由于我觉得服务端相对简单一点,所以先分析服务端接受请求流程的代码,再分析客户端调用的代码。本文重点解析的是调用以及调用需要初始化的处理器handler等,对于与注册中心的交互解析相对比较少。
一.服务端初始化handler
这里主要介绍主要的流程,并且默认dubbo协议
1.handler整理流程
要理清楚服务端调用流程最重要的就是要理清楚服务端的所有处理请求的handler,我们先来看看它的handler是如何初始化,并且逐级传递的
我先上一个handler的初始化的流程,然后再上一个handler调用的流程。
handler初始化的流程
ExchangeHandlerAdapter(dubboProtocal的中的匿名内部类)–>HeaderExchangeHandler–>DecodeHandler–>AllChannelHandler–>HeartbeatHandler–>MultiMessageHandler–>NettyServer–>NettyServerHandler(这里就是netty处理的handler)
handler调用的流程
handler调用的流程刚好和上面相反,从netty的handler开始向上传递,如果了解netty的应该不难理解,下面我们详细分析handler创建过程
2.handler初始化流程
我们直接从DubboProtocol分析,如果想了解其他的流程,可以阅读服务暴露的整个流程
从DubboProtocol#export()开始分析
代码语言:javascript复制public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
//1.这个exportor需要有个印象,包括下面的exporterMap,因为调用的时候会通过这两个获取Invoker
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
//......省略部分代码
//2.重点看这里
openServer(url);
optimizeSerialization(url);
return exporter;
}
接着看…
代码语言:javascript复制private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
//1.通过双重校验,操作缓存,如果缓存里面有,则直接获取,否则创建server并放入缓存
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//2.重点看createServer()方法
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
比较简单,看注释就行,咱们接着看createServer(),从下面开始要传递handler了
代码语言:javascript复制private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " str ", url: " url);
}
ExchangeServer server;
try {
//1.重点看这里requestHandler,是第一个传递的handler
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
//......省略部分代码
return server;
}
这个requestHandler是我们向下传递的第一个handler,它是dubboProtocol类中的一个匿名内部类,我们简单看一下
代码语言:javascript复制private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
(message == null ? null : (message.getClass().getName() ": " message))
", channel: consumer: " channel.getRemoteAddress() " --> provider: " channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
//1.拿到服务的代理对象
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " inv.getMethodName()
" not found in callback service interface ,invoke will be ignored."
" please update the api interface. url is:"
invoker.getUrl()) " ,invocation is :" inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//2.开始调用
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
ok,第一个handler已经实例化好了,接着我们继续Exchangers.bind(url, requestHandler)接着看
代码语言:javascript复制public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//1.将handler继续往下传递
return getExchanger(url).bind(url, handler);
}
很简单,做了一些简单的判断后,直接继续向下传递。这里getExchanger()获取的到底是实例化了Exchanger的哪个子类呢?dubbo整个源码中充满了这种SPI机制来扩展以保证灵活性,可以参考dubbo spi机制原理,或者你先跳过,我这里先交大家一个简单的方法如果快速获取默认的SPI接口的实现,比如这个Exchanger
代码语言:javascript复制public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
如果大家看到这种形式的获取SPI接口的实例,我们直接点进去Exchanger接口内
代码语言:javascript复制//1.看这个SPI标注的内容是什么,这里的HeaderExchanger.NAME=header
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
@Adaptive({
Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({
Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
我们找到@SPI注释的值是header,然后我们进入dubbo源码的如下部分查找
所以这里我们默认的是HeaderExchanger,我们接着HeaderExchanger#bind()
代码语言:javascript复制public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//1.这里比较重要了,一共创建了多个handler
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
首先这里的handler是我们上面传递的ExchangerHandlerAdapter,然后创建将它传递到HeaderExchangeHandler,最后传递到DecodeHandler。然后这些handler都是昨晚自己的工作后,委托它里面的handler继续处理。
所以这里向下传递的是DecodeHandler,我们继续往下看Transporters#bind()
代码语言:javascript复制public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
//1.传过来的是DecodeHandler,长度是1
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//2.继续看这里,getTransporter()也是SPI扩展,暂时按照上面的方法,找到默认的Transporter
return getTransporter().bind(url, handler);
}
按照上面的方法,找到的Transporter是netty4里面的NettyTransporter,那么我们就直接看NettyTransporter#bind(),此时的handler以然是DecodeHandler
代码语言:javascript复制public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
这里没有逻辑,继续接着看
代码语言:javascript复制public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
这里需要人真看了,这里会对上面传过来的handler进一步包装,然后调用父类的构建方法,先看怎么包装handler的,我们看ChannelHandlers.wrap(handler,url)
代码语言:javascript复制public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
代码语言:javascript复制protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
先看最里面的SPI,同样的方法,我们找默认的Dispatcher,找到的是AllDispatcher,然后我们看它的dispatcher()方法
代码语言:javascript复制public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
传入的是handler参数是DecodeHandler,所以这里就是DecodeHandler外面再套了一层AllChannelHandler。再回到上面,依次外面再套了HeartbeatHandler和MultiMessageHandler所以到这里handler的嵌套如下:
我们回到上面NettyServer的构造方法中,它调用父类的构造方法,然后此时传递的handler是MultiMessageHandler
代码语言:javascript复制public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//1.这里要注意了,handler已经保存在对象的内部了,后面处理事件时会用调用到
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
//2.典型的模板设计模式,上面是获取ip和port等参数,这里是建立连接,不同的子类连接方式不同
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " getClass().getSimpleName() " bind " getBindAddress() ", export " getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " getClass().getSimpleName()
" on " getLocalAddress() ", cause: " t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
我们接着看doOpen()
代码语言:javascript复制protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
//1.这个handler是netty处理事件的回调handler,另外注意这里传递了一个this作为参数(在NettyServerHandler里面是handler)
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
//调用bind就绑定了ip和端口,等待accept连接了,连接后就能处理请求了
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
上面的代码非常的熟悉,看过或者用过netty开发开发相信都不陌生,这是netty开发的固定写法。到这里其实服务端的准备工作已经完成了,应用也启动完成了,接下来就是等待建立连接和处理请求了。接下来我们看处理请求请求的流程
二.服务端处理请求
接下来,我们接着上面,看看服务端的处理请求,其实跟到这里,大家基本上可自行分析处理请求的流程了,无非就是handler的各种时间处理了
我们从netty处理事件的回调的handler开始分析,当服务端收到事件时,我们会依次调用channel中的handler组成的链依次调用,我们这里先不看编码解吗的handler,直接看nettyServerHandler,它既是入站处理器,又是出站处理器(继承了ChannelDuplexHandler),当我们收到接口请求时,会触发channelRead()方法
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.封装了自己的nettyChannel,并且有一个map保存channel和nettyChannel的映射关系
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
//2.这个handler就是从构造方法中传递过来的,在NettyServer中传的是this
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
直接委托给handler处理,注意这个handler是NettyServer,我们接着看NettyServer#received()这里是调用到了它的父类AbstractPeer的received方法
代码语言:javascript复制public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
依然基本上不做任何的处理,继续委托给handler处理,注意 此时的handler是我们前面说的MultiMessageHandler,它通过构造NettyServer的构造方法传递过来的,所以我们这里调用的是MultiMessageHandler#handler()
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
这里不管是走到哪个分支逻辑,都是继续委托给它里面的handler处理,经过前面的分析,MultiMessageHandler里面一层是HeartbeatHandler,我们看它的received()方法
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
//1.如果客户端发过来的心跳消息,则回复一个心跳相应
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " channel.getRemoteAddress()
", cause: The channel has no data-transmission exceeds a heartbeat period"
(heartbeat > 0 ? ": " heartbeat "ms" : ""));
}
}
}
return;
}
//2.如果是心跳相应,则不处理
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " Thread.currentThread().getName());
}
return;
}
//3.继续调用handler
handler.received(channel, message);
}
这个handler一看就是处理心跳的handler,看到这里大家也大概可以看到这种类似责任链的模式了。我们从上面的分析可以得出heartbeatHandler里面的handler是AllChannelHandler,我们来看它的received()方法
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
//1.获取线程池
ExecutorService executor = getExecutorService();
try {
//2.往线程池扔任务
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//......省略代码
}
}
上面这个方法比较重要了,好好唠唠
1.首先说一下获取线程池,听说这是面试点(会问默认的使用那种线程池,核心线程数默认是多少),我们来看看
代码语言:javascript复制public ExecutorService getExecutorService() {
return executor == null || executor.isShutdown() ? SHARED_EXECUTOR : executor;
}
这个executor在构造方法中就通过SPI扩展的方式创建了
代码语言:javascript复制public WrappedChannelHandler(ChannelHandler handler, URL url) {
//......省略代码
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
根据我们之前的方法看下默认是什么线程池—–>FixedThreadPool,看下它的构造方法
代码语言:javascript复制public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
上面的构造方法,大家进去看默认值,可以看到核心线程数是200,另外如果设置的队列的长度是0的话(默认是0),那么使用的同步队列是SynchronousQueue,否则是LinkedBlockingQueue,并且长度就是用户设置的长度
另外到这里服务端处理事件总共用了两个线程池哦,第一个是netty的workEventLoopGroup,另外一个就是这里了,工作过程大致如下:
这样可以很好的保证eventloop中的线程不会被阻塞,从而达到更大的并发…
回到上面,我们将请求放到线程池中处理,显然这里创建的任务是ChannelEventRunnable,并且以然会将handler依次向下传递,我们看ChannelEventRunnable的run方法
代码语言:javascript复制public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " state " operation error, channel is " channel
", message is " message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " state " operation error, channel is " channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " state " operation error, channel is " channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " state " operation error, channel is " channel
", message is " message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " state " operation error, channel is " channel
", message is: " message ", exception is " exception, e);
}
break;
default:
logger.warn("unknown state: " state ", message is " message);
}
}
}
这个方法很简单,根据不同的类型请求做不同的处理,我们这里仅仅看received,并且这里的handler是allChannelHandler里面的handler了,根据我们前面分析是DecodeHandler,我们继续看DecodeHandler里面的received
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
这个方法就是将客户端传过来的内容根据不同的请求,进行转换成不同的对象,接着继续调用handler,此时这个handler是HeaderExchangeHandler
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
//1.需要返回结果,双向通信
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
//2.不需要返回结果,单向通信
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " message " in channel: " channel ", url: " channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
默认是需要返回结果的,我们看handleRequest()方法
代码语言:javascript复制void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
//......省略部分代码
return;
}
// find handler by message class.
Object msg = req.getData();
try {
//1.获取结果
CompletionStage<Object> future = handler.reply(channel, msg);
//2.当获取到结果后,回调,将结果通过channel通道写回给客户端
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
//3.写回给客户端
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " channel ", msg is " e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
上面通过调用handler获取结果,然后通过通道写回给客户端,此时的handler是我们DubboProtocol匿名ExchangeHandlerAdapter内部类,我们去看看
代码语言:javascript复制public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
(message == null ? null : (message.getClass().getName() ": " message))
", channel: consumer: " channel.getRemoteAddress() " --> provider: " channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
//1.获取invoker
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
//......省略部分代码
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//2.调用
Result result = invoker.invoke(inv);
//3.返回
return result.completionFuture().thenApply(Function.identity());
}
1.获取invoker,在前面分析过,当创建exporter时,将invoker传递到exporter里面了,然后用一个exporterMap存起来了exporter,这里就是通过map拿到exporter,然后拿到invoker
2.此处的invoker是一个匿名内部类,其父类是AbstractProxyInvoker,创建的代码在JavassistProxyFactory#getInvoker(),这里的JavassistProxyFactory也是SPI获取的默认的
代码语言:javascript复制public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
//1.最终调用Wrapper的invokemethod方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
我们接着上面看,先是调用invoker.invoke()方法,其实就是父类AbstractProxyInvoker的invoke方法
代码语言:javascript复制public Result invoke(Invocation invocation) throws RpcException {
try {
//1.模板方法,调用的是子类的方法,也就是匿名内部类的方法,就是上面的doInvoke()方法
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result);
});
return asyncRpcResult;
} catch (InvocationTargetException e) {
//省略部分代码...
}
}
最终会调用到匿名内部类的doInvoke()方法,然而上面的doInvoke()其实就是调用wrapper.invokeMethod()方法,wrapper是通过字节码生成的,我下面贴一下生成的具体方法的内容
代码语言:javascript复制public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{
com.taolong.dubbo.api.HelloService w;
try{
w = ((com.taolong.dubbo.api.HelloService)$1);
}
catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "sayHello".equals( $2 ) && $3.length == 1 ) {
return ($w)w.sayHello((java.lang.String)$4[0]);
}
}
catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method "" $2 "" in class com.taolong.dubbo.api.HelloService.");
}
可以看到最终会调用到第一个参数的sayHello方法,而第一个参数就是我们自己的业务类proxy对应我工程中的HelloserviceImpl实例,整个服务端的调用过程大致如下了,这里仅仅分析了主干流程,还有其他的比如编解码,大家可以自行分析…
三.客户端服务引用
为什么我不直接就开始分析调用流程呢?因为调用过程中创建的各种对象,以及包装对象的创建不明白的话,很难让人看明白,所以先分析调用请求的对象的创建,这样才能更好的将流程串起来。但是不是重要的内容可能不会重点分析。而且有一些内容与前面的服务端的初始化是一样的,比如后面那些handler(MultiMessageHandler、HeartbeatHandler、AllChannelHandler…)。我们先从ReferenceBean开始吧…
先看afterPropertiesSet(),为什么是这个方法,这涉及到spring相关的内容,因为它实现了InitializingBean,当spring创建这个bean时就会调用这个方法,我这里就简单带过,感兴趣的可以去学习下spring,也可以阅读我相关的文章
代码语言:javascript复制public void afterPropertiesSet() throws Exception {
//......这个方法就不细看了,就是繁琐的获取配置信息
}
获取到配置信息后,一定会调用getObject(),这是为什么呢?这又是spring相关的知识,因为ReferenceBean实现了FactoryBean接口,spring创建这种类,最终获取的是调用getObject()放回的值,放到spring容器中
代码语言:javascript复制public Object getObject() {
return get();
}
代码语言:javascript复制public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" url ") has already destroyed!");
}
if (ref == null) {
//直接看这里啦
init();
}
return ref;
}
代码语言:javascript复制private void init() {
if (initialized) {
return;
}
//1....这上面的代码省略掉了,反正也是初始化一些值,比如从配置文件设置的,以及默认的,还有获取ip地址的......都会放到map中传递
//2.重点看这个ref,这个就是我们需要的代理类,就是用它发起远程调用的
ref = createProxy(map);
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;
}
代码语言:javascript复制private T createProxy(Map<String, String> map) {
//jvm内部调用,相当于同一个进程调用,没啥用...
if (shouldJvmRefer(map)) {
//省略....
} else {
urls.clear(); // reference retry init will add url to urls, lead to OOM
if (url != null && url.length() > 0) {
// user specified URL, could be peer-to-peer address, or register center's address.
//......省略
} else {
// assemble URL from register center's configuration
// if protocols not injvm checkRegistry
//......省略
}
if (urls.size() == 1) {
//1.直接看单个注册中心的,此时的注册中心的url个数为1
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
//多注册中心的不看...
}
//......省略
// create service proxy
//2.将上面生成的invoker封装成一个代理类......其实内部远程调用就是invoker
return (T) PROXY_FACTORY.getProxy(invoker);
}
这里我们重点分析注释1和注释2的代码,由于注释1的代码是重点,而且流程最长,所以我们先看注释2的部分,这是假设注释1的部分已经生成了一个invoker,看看它是如何封装成一个代理对象,最后交给spring容器管理的呢?
PROXY_FACTORY是使用SPI机制的,我们找到默认的ProxyFactory是JavassistProxyFactory,那么我们看它的getProxy()方法
代码语言:javascript复制public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
先看Proxy.getProxy()这个方法,看看它生成的是什么
代码语言:javascript复制public static Proxy getProxy(Class<?>... ics) {
return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}
代码语言:javascript复制public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > MAX_PROXY_COUNT) {
throw new IllegalArgumentException("interface limit exceeded");
}
//......省略
// get cache by class loader.
final Map<String, Object> cache;
synchronized (PROXY_CACHE_MAP) {
cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());
}
Proxy proxy = null;
//同步,其他线程在生成的字节码的时候,就等待,并等待唤醒
synchronized (cache) {
do {
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null) {
return proxy;
}
}
if (value == PENDING_GENERATION_MARKER) {
try {
cache.wait();
} catch (InterruptedException e) {
}
} else {
cache.put(key, PENDING_GENERATION_MARKER);
break;
}
}
while (true);
}
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<>();
List<Method> methods = new ArrayList<>();
//遍历接口
for (int i = 0; i < ics.length; i ) {
if (!Modifier.isPublic(ics[i].getModifiers())) {
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg)) {
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
}
ccp.addInterface(ics[i]);
//遍历接口中的方法
for (Method method : ics[i].getMethods()) {
String desc = ReflectUtils.getDesc(method);
if (worked.contains(desc)) {
continue;
}
if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) {
continue;
}
worked.add(desc);
int ix = methods.size();
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j ) {
code.append(" args[").append(j).append("] = ($w)$").append(j 1).append(";");
}
code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
if (!Void.TYPE.equals(rt)) {
code.append(" return ").append(asArgument(rt, "ret")).append(";");
}
methods.add(method);
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}
if (pkg == null) {
pkg = PACKAGE_NAME;
}
// create ProxyInstance class.
String pcn = pkg ".proxy" id;
ccp.setClassName(pcn);
//增加的属性
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " InvocationHandler.class.getName() " handler;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{
InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// create Proxy class.
String fcn = Proxy.class.getName() id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
//2.重点看这个方法,后面调用这个方法
ccm.addMethod("public Object newInstance(" InvocationHandler.class.getName() " h){ return new " pcn "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if (ccp != null) {
ccp.release();
}
if (ccm != null) {
ccm.release();
}
synchronized (cache) {
if (proxy == null) {
cache.remove(key);
} else {
//还用了weakReference
cache.put(key, new WeakReference<Proxy>(proxy));
}
cache.notifyAll();
}
}
return proxy;
}
这个方法很长,其实我们不用太过关注,因为他就是通过拼接的方法生成Class,然后实例化对象,这里我们记住它拼接了一个newInstance()方法,后面要用
这里生成了一个Proxy对象,我们接着往上面看,最终调用这个Proxy的newInstance(new InvokerInvocationHandler(invoker)),我们回到上面的字节码生成的方法里面,它增加的方法就是newInstance,下面是我debug的界面,它通过new生成了一个对象,class的路径是:org.apache.dubbo.common.bytecode.proxy0
我根据官网的提示,用arthas生成了这个类的字节码信息,内容如下:
这就是我们最终生成的代理对象了,也是我们spring管理的对象,当调用它的业务方法(sayHello)时,其实就是将请求转发给它内部的handler.invoke(),而handler是我们传进去的invocationHandler,即(T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));我们接着看这个类的invoke方法
InvocationHandler#invoke()
代码语言:javascript复制public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
很好理解,toString,hashCode,equals之外的方法都是直接继续交给它里面的invoke.invoke()处理,而这个invoke对象,就是我们上面假设生成的invoke,其实它的生成过程才是最重要的,前面我们只是假设它已经生成了,现在我们来看看它具体怎么生成的…因为所有的业务请求都是调用它的invoke方法。我们继续回到
ReferenceConfig#createProxy()里面
代码语言:javascript复制if (urls.size() == 1) {
//1.为了简单起见,我们只分析单注册中心,单协议......多协议多注册中心类似,无非就是多了个循环遍历
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
}
这里的REF_PROTOCOL是一个SPI的机制,此时url的协议是registry,所以得到的Protocol是registry,但是,重点来了,SPI在这里继续做了扩展(之前在服务端调用时候我没有说,这里好好看,是重点),SPI在生成类是做了是否包装的判断,逻辑如下:
如果他的子类的构造方法里面参数是它的接口类型,dubbo的spi会自动创建它的包装类型,并且把当前的实例传进去
具体的代码如下,感兴趣的可以自行查阅
代码语言:javascript复制private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
//省略......
//是否包装的逻辑判断在这里
else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
//......省略
}
}
代码语言:javascript复制private boolean isWrapperClass(Class<?> clazz) {
try {
//获取它的构造函数,参数是该类型自己(接口类型)
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
代码语言:javascript复制private void cacheWrapperClass(Class<?> clazz) {
if (cachedWrapperClasses == null) {
cachedWrapperClasses = new ConcurrentHashSet<>();
}
//加入到集合中......
cachedWrapperClasses.add(clazz);
}
代码语言:javascript复制public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
//看这里
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
代码语言:javascript复制private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
//会遍历刚才的拿包装的集合
for (Class<?> wrapperClass : wrapperClasses) {
//注入......
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " name ", class: "
type ") couldn't be instantiated: " t.getMessage(), t);
}
}
上面讲了那么多,想说的意思是dubbo在创建RegistryProtocol时,其实外面还包装了几次,具体包装了几次,需要看它的spi配置文件的所有类的构造函数是否有带参数并且参数类型是Protocol类型,如果是则按顺序包装,后面创建DubboProtocol也是这样,我们看下它的配置文件
我已经看了,后面带wrapper结尾的都有构造函数,并且参数是Protocol,不信去看看
代码语言:javascript复制public QosProtocolWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
综上所示,此时的ReferenceConfig中生成的invoke应该是如下形式:
看它的refer方法
代码语言:javascript复制public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//1.通过其他方式操作服务,qos运维相关,其实是开启netty服务,接受请求,感兴趣可自己阅读
startQosServer(url);
//2.交给它里面protocol操作
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}
由于篇幅的问题,startQosServer不解析,接着往下看,会调用ProtocolListenerWrapper#refer()
代码语言:javascript复制public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//1.接下来调用的是ProtocolFilterWrapper#refer
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, INVOKER_LISTENER_KEY)));
}
接下来调用的是ProtocolFilterWrapper#refer
代码语言:javascript复制public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//1.registy时走这里
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
//2.后面还有dubbo时,就会创建Filter链了,这也是dubbo的特色,后面到这里的再分析
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
此时调用到RegistryProtocol#refer
代码语言:javascript复制public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//1.重点看这里吧
return doRefer(cluster, registry, type, url);
}
代码语言:javascript复制private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//1.订阅zk上的节点,发生变化时改变
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY "," CONFIGURATORS_CATEGORY "," ROUTERS_CATEGORY));
//2.生成invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
1.获取zookeeper上provider节点内容,并且通过netty建立连接
2.创建invoker,调用invoker方法时,里面包含服务的选择…
先看2吧,相对简单,其实就是创建一个invker,这里也是spi,默认就是FailoverCluster
代码语言:javascript复制public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
我们接着注释1看,这里引入了RegistryDirectory,其实很好理解,就是维护这些节点的变化
感觉流程还很长,由于后面也不难,我就不一一分析了,后面的部分我下面画个流程图
大概的流程如上图,主要包括两个方便,我稍作解析
1.获取zookeeper节点上的信息,得到provider的地址
2.通过url使用netty建立远程连接(这个流程和服务端bind是对称的,比较简单,包括里面的handler的初始化也是一样,大家可以去看server的初始化)
具体的流程细节大家可根据流程图自行阅读
四.客户端发起调用请求
限于篇幅,调用的流程,我也画一个简单的流程图,然后重点解析流程图里面一些重要的步骤
简单大致流程图如上所述,这里仅仅是发送请求,下面解析下重要的地方
1.Filter链
流程2和流程3我们看到有filter.invoke的调用
这是因为我们在通过Protocol创建Invoker的时候,上面说过,它会在外面包装QosProtocolWrapper,ProtocolListenerWrapper,ProtocolFilterWrapper,而这个filter的逻辑就是在ProtocolFilterWrapper里面的,方便我们在调用的过程中做扩展,比如我想把一些用户的信息传递到服务提供方,方便使用,就可以向下面一样使用,相信大家都见过
这是消费端代码:
代码语言:javascript复制@Activate(group = CommonConstants.CONSUMER)
public class LoginAccountConsumerContextFilter extends ContextFilter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
SysAccountModel accountModel = SessionLocal.getUserDetails();
if (accountModel != null) {
String accountModelJsonString = JSON.toJSONString(accountModel);
invocation.setAttachment(DubboFilterHelper.ACCOUNT_MODEL_FILTER_KEY, accountModelJsonString);
}
return invoker.invoke(invocation);
}
}
这是服务端代码:
代码语言:javascript复制@Activate(group = CommonConstants.PROVIDER)
public class LoginAccountProviderContextFilter extends ContextFilter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
SysAccountModel sysAccountModel = DubboFilterHelper.sysAccountModelFromFilter(invocation);
if (sysAccountModel != null) {
LoginAccountContext.setCurrentLoginAccountModel(sysAccountModel);
Result result = invoker.invoke(invocation);
LoginAccountContext.removeCurrentLoginAccountModel();
return result;
}
return invoker.invoke(invocation);
}
}
可以编写多个filter,他会形成filter链,进行依次调用。另外服务端也有这个filter,我上面忘了分析,大家要注意
2.异步调用
我们知道dubbo是通过netty框架进行网络调用,所以我们调用的时候只需要往channel通道里面写请求,那么当服务端处理完成后,也是往channel中write结果,此时对客户端来说是异步的,他是如何获取这个结果的呢?channel中那么多服务端发来的信息,客户端是如何找到是自己发送请求的相应结果呢?
dubbo 的作法就是每个请求有一个唯一的id,将这个id和future绑定放到map中,并且把id传输到服务端,服务端相应时也携带这个id。这样就能通过id在map中找到future,通过future拿到结果,代码如下:
HeaderExchangeChannel#request
代码语言:javascript复制public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " request ", cause: The channel " this " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//1.创建future时,放到map中
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
//2.发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
代码语言:javascript复制public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
//看构造函数
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// timeout check
timeoutCheck(future);
return future;
}
代码语言:javascript复制private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
//在这里放到map中的,FUTURES是静态的
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
我们来看客户端接受响应,具体的流程与服务端received流程一样,只是一个类型是Request,一个类型是Response,我们来看HeaderExchangeHandler
代码语言:javascript复制public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
//......
} else if (message instanceof Response) {
//1.接受响应
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
//......
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
代码语言:javascript复制static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
DefualtFuture#received()
代码语言:javascript复制public static void received(Channel channel, Response response, boolean timeout) {
try {
//1.通过id,拿到future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
//2.设置操作完成
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
", response " response
(channel == null ? "" : ", channel: " channel.getLocalAddress()
" -> " channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
这样我们客户端就能够拿到相应的结果了…
3.线程派发
另外客户端也有线程派发和服务端一样…,大家可以看服务端相关的解析
这篇文章,其实包括了dubbo大部分的主干的流程了,希望会对大家有帮助,有时间我会继续写dubbo相关的文章,比如:
dubbo spi
负载均衡
服务暴露
服务引入
…
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/111195.html原文链接:https://javaforall.cn