系列文章
dubbo 源码 v2.7 分析:结构、container 入口及线程模型
dubbo 源码 v2.7 分析:SPI 机制
dubbo 源码 v2.7 分析:核心机制(一)
dubbo 源码 v2.7 分析:核心机制(二)
关注公众号:程序员架构进阶,每天实时获取更新,上百份面试资料和其他福利等你拿~
一 摘要
前面我们介绍了dubbo的核心机制,今天将开始分析远程调用流程。毕竟,作为一个rpc框架,远程调用是理论的核心内容。通过对dubbo相关实现的探究,深入了解rpc原理及可能的问题。
二 rpc通信核心问题
2.1 rpc核心问题
1、远程调用,使用什么传输协议
2、远程传输数据,不能直接原格式传输,只能通过流数据传输,序列化和反序列化协议选择?
3、发起请求方式?
4、请求处理方式?同步/异步,长连接/短链接...
除此之外,还有超时时间、异常处理、缓存、负载均衡策略、注册中心、网关等等相关问题,都需要解决。我们先关注前面四个。
2.2 rpc框架构成
1)服务提供者,运行在服务器端,提供服务接口定义与服务实现类。
2)服务中心,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。
3)服务消费者,运行在客户端,通过远程代理对象调用远程服务。
三 序列化和反序列化
3.1 相关概念
3.1.1 定义
先说一个老生常谈的问题,什么是序列化和反序列化?简单来说,定义如下:
序列化(Serialization):将数据结构或对象转换成二进制串的过程;
反序列化(Deserialization):将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。
3.1.2 数据结构、对象与二进制串
不同的开发语言中,数据结构、对象和二进制串的表示方式略有不同。(内容来自美团技术团队的文章,序列化和反序列化)
1)数据结构和对象:
对于类似Java这种完全面向对象的语言,开发者操作的都是对象(Object),来自于类的实例化。在Java语言中最接近数据结构的概念,就是POJO(Plain Old Java Object)或者Java Bean--那些只有setter/getter方法的类。而在C 这种半面向对象的语言中,数据结构和struct对应,对象和class对应。
2)二进制串:
序列化所生成的二进制串指的是存储在内存中的一块数据。C 语言具有内存操作符,所以二进制串的概念容易理解,例如,C 语言的字符串可以直接被传输层使用,因为其本质上就是以’ ’结尾的存储在内存中的二进制串。在Java语言里面,二进制串的概念容易和String混淆。实际上String 是Java的一等公民,是一种特殊对象(Object)。对于跨语言间的通讯,序列化后的数据当然不能是某种语言的特殊数据类型。二进制串在Java里面所指的是byte[],byte是Java的8中原生数据类型之一(Primitive data types)。
3.2 为什么要序列化?
这是与远程通信的过程相关的,网络传输的数据必须是二进制数据,但通常调用方请求的入参有可能是对象(各种面向对象语言),而对象是不能直接在网络中传输的,所以需要把它转成可传输的二进制串,并且要求转换算法是可逆的,这个过程也就是“序列化”过程。这样,服务提供方就可以正确地从输入的二进制数据中分割出不同的请求参数,同时根据请求类型和序列化类型,把二进制的消息体逆向还原成请求对象,这个过程也就是“反序列化”。
回顾一下rpc请求过程:
添加描述
另外,作为一个框架,通用性是必须考虑的要素之一;而且,作为跨机器的请求,服务提供方和调用方也很可能使用的不是一种编程语言,那么保证两端的数据结构能够相互理解就是必须解决的一个问题,这也是序列化和反序列化的意义所在。
四 dubbo的通信过程
以dubbo官方提供的服务方和客户端demo作为生产和消费两端,对应的请求过程如下图所示:
添加描述
可见,dubbo采用的是Netty(TCP)协议作为默认的通信协议;通过Netty的NIO来进行数据传输,并使用了它的响应机制。
五 Dubbo通信分析
先回顾一下OSI模型,下图左侧是7层结构,右侧是对该层的解释:
添加描述
其中,传输层是因特网协议套件和OSI模型中的网络堆栈中的协议的分层体系结构中的方法的概念划分。该层的协议为应用程序提供主机到主机的通信服务。如UDP、TCP。并且提供面向连接的通信,可靠性,流量控制和多路复用等服务。
在dubbo中,Transporter就是对传输层的实现。它对于提供了dubbo服务间通讯的支持。有了它,各个服务就可以进行网络通信了,不再是信息孤岛了。
5.1 相关代码结构
dubbo远程通信相关代码在org.apache.dubbo.remoting包下。
- transport为网路传送层,抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server、Codec。
- exchange是信息交换层,封装请求响应模式,以Request为中心,Response为中心, 扩展接口为Exchanger、ExchangeChannel、ExchangeClient、ExchangeServer。
添加描述
5.2 通信协议及框架
从上图可见,dubbo支持了mina, netty(3 & 4版本), grizzly三种通信框架,默认配置常量在org.apache.dubbo.remoting.Constants中,这是一个常量接口,并未定义任何抽象方法。里面的常量包括一些关键的key(DISPATCHER_KEY,BIND_IP_KEY,BIND_PORT_KEY等等),默认的transporter、默认的远程客户端、默认的二进制协议等等。
添加描述
5.3 Transport分析
5.3.1 Transporter接口
代码语言:javascript复制@SPI("netty")public interface Transporter {
/**
* Bind a server.
*
* @param url server url
* @param handler
* @return server
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
/**
* Connect to a server.
*
* @param url server url
* @param handler
* @return client
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;}
通过这个接口定义可以知道:
1)dubbo是C/S架构
2)Transport提供了创建服务端和客户端的功能;
3)默认的Transporter是NettyTransporter,@SPI("netty")注解中标记;
4)可以通过server/client、transport来配置server/client的类型,目前支持的类型有netty、mina等
注:3)中的netty,指的是netty4(2.5.6版本以后,之前的版本是netty3),也就是对应transport.netty4包下的内容,这是因为netty4包下的NettyTransporter中,name是netty,SPI是通过name来获取对应的Transporter的:
添加描述
而 netty包下标记的NAME为netty3:
添加描述
5.3.2 NettyServer
1、属性和构造方法
继承自AbstractServer,且实现了Server接口:
添加描述
2、构造方法和AbstractServer
构造方法的主要动作都是在AbstractServer中完成的(继承关系从子类到父类:AbstractServer->AbstractEndpoint->AbstractPeer),在这里设置了ip、端口、空闲时间等参数,并调用了doOpen()方法,这是定义在AbstractServer的抽象方法,所以需要子类实现:
代码语言:javascript复制public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " getClass().getSimpleName() " bind " getBindAddress() ", export " getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " getClass().getSimpleName()
" on " getLocalAddress() ", cause: " t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
AbstractPeer.java,除提供构造方法外,还提供了消息发送方法,以及一些handler:
代码语言:javascript复制 public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
@Override
public ChannelHandler getChannelHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
} else {
return handler;
}
}
/**
* @return ChannelHandler
*/
@Deprecated
public ChannelHandler getHandler() {
return getDelegateHandler();
}
/**
* Return the final handler (which may have been wrapped). This method should be distinguished with getChannelHandler() method
*
* @return ChannelHandler
*/
public ChannelHandler getDelegateHandler() {
return handler;
}
Transport初始化是通过Transporters这个工具类进行初始化的,查看源码,有以下两个bind方法,支持传参url和ChannelHandler,并且支持传入多个Handler:
代码语言:javascript复制 public static Server bind(String url, ChannelHandler... handler) throws RemotingException { return bind(URL.valueOf(url), handler); } public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
3、doOpen
大部分方法是在AbstractServer中实现,NettyServer中最重要的方法就是这个doOpen方法,对比netty server配置和启动demo可知,就是Netty配置和启动的一个标准流程:
代码语言:javascript复制 @Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
4、Transporter的上层调用关系——Exchanger
继续看上层调用,我们知道了Transporter通过了bind方法,来绑定url和ChannelHandler,那么是由谁来执行绑定,又是怎样与上层联系起来的呢?
通过查看bind方法调用关系,可以看到Transporters.bind()方法,是通过它来进行的绑定调用:
代码语言:javascript复制public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
return bind(URL.valueOf(url), handler);}public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);}
而在此之上,是HeaderExchanger调用了bind,并且是在HeaderExchangeServer的构造方法中传入作为构造参数:
代码语言:javascript复制public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}}
5、DubboProtocol
再继续向上溯源,就到了DubboProtocol,在createServer方法中可以看到这个调用过程:
代码语言:javascript复制private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " str ", url: " url);
}
ExchangeServer server;
try {
//这里就是Exchangers.bind的调用关系
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " str);
}
}
return server;
}
这是一个private方法,在上面是openServer(URL url),这同样是一个private方法:
代码语言:javascript复制private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
继续向上,export(Invoker<T> invoker),终于是public,这个方法被上层的ServiceConfig和 ServiceConfig使用:
代码语言:javascript复制@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" url.getParameter(INTERFACE_KEY)
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;}
六 调用序列
再回顾一下dubbo的架构图,注意左边服务提供方暴露服务的蓝色初始化链:
可见,这就是我们刚刚追踪的调用过程,这部分的时序图如下:
添加描述
总结
本文详细阐述了dubbo的通信过程,并介绍了支持的序列化协议,和默认的使用配置。通信过程是分析的重点,这让我们更加深入理解了总架构图,也能够据此对rpc框架『窥一斑而知全豹』。再后面的文章中,会详细介绍序列化协议部分,有经验的小伙伴可能清楚,序列化在rpc框架中的重要地位,从安全性到性能,都是值得深入探讨的话题。
另外:有一点需要注意(From dubbo官网),Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。