Dubbo源码之网络通信

2019-12-12 17:54:21 浏览数 (1)

介绍了Dubbo通信流程,跟着源码调试过来的,如果有问题还请各位大佬指出

服务暴露将做哪些事情?

  1. 注册ZK,监听动态配置节点
  2. 开启Server端
  3. 创建代理服务
  4. Exporter -> Invoker -> proxyService

服务引用将做哪些事情?

  1. 注册ZK,监听动态配置节点、providr节点、路由节点
  2. 开启Client端
  3. 创建代理服务
  4. proxyService -> Invoker

客户端请求

代码语言:javascript复制
ConsumerProxyService -> Invoker【DubboInvoker】 -> Exchanger【HeaderExchangeClient】 -> Transporter【NettyClient】 -> 编码 -> SEND-TO-SERVER (创建了DefaultFuture,Request带唯一标识)

服务端响应

代码语言:javascript复制
解码 -> Transporter【NettyServer】-> 系列Handlers -> 线程池 -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback 

Exchanger

Exchangers

门面类,提供各种便捷方法,先通过SPI获取Exchanger,然后调用Exchanger的相关方法创建ExchangeServerExchangeClient

Exchanger

SPI接口,默认实现类HeaderExchanger,提供了两个快捷方法创建ExchangeServerExchangeClient

代码语言:javascript复制
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

ExchangeServer

Server端使用,默认实现类HeaderExchangeServer,内部调用Transporter开启Server服务

代码语言:javascript复制
public interface ExchangeServer extends Server {
    Collection<ExchangeChannel> getExchangeChannels();

    ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}

ExchangeClient

Client端使用,默认实现类HeaderExchangeClient,核心request方法,内部调用Transporter发送请求

代码语言:javascript复制
public interface ExchangeClient extends Client, ExchangeChannel {
}

ExchangeChannel

默认实现类 HeaderExchangeChannel,作为HeaderExchangeClient的一个属性

Transporter

Transporters

门面类,提供各种便捷方法,先通过SPI获取Transporter,然后调用Transporter的相关方法创建ServerClient

Transporter

SPI接口,默认实现类NettyTransporter,提供了两个快捷方法创建ServerClient

代码语言:javascript复制
@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

Server

Server端使用,默认实现类NettyServer,用于开启Server服务,核心方法doOpen

代码语言:javascript复制
public class NettyServer extends AbstractServer implements Server {
}

Client

Client端使用,默认实现类NettyClient,核心request方法用于发送请求,doOpen用于与服务端建立连接

代码语言:javascript复制
public class NettyClient extends AbstractClient {
}

服务端启动服务

代码语言:javascript复制
DubboProtocol#export =>
DubboProtocol#openServer => 
DubboProtocol#createServer =>
Exchangers#bind => 
NettyServer#doOpen

最终,在NettyServer#doOpen中通过Netty开启了一个Server端

代码语言:javascript复制
DubboProtocol#createServer
    => Exchangers#bind(url, requestHandler)
        => HeaderExchanger#bind(url, requestHandler)
            => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
                
// Transporters#bind 语句可以拆解为 
Transporters#bind
    => NettyTransporter#bind(url, handler)
        => return new NettyServer(url, handler)
            =>  NettyServer#doOpen【NettyServer构造函数中调用了doOpen方法】

NettyServer中的hander属性,最终指向的是new DecodeHandler(new HeaderExchangeHandler(handler))。最终Server端返回HeaderExchangeServer,然后在NettyServer的构造函数中,对handle其实还做了一些封装

代码语言:javascript复制
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {}

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

所以,最终NettyServer中的hander属性指向MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler

客户端连接服务

调用链太长了,而且隐藏的非常深,重点省略了一些,在应用启动时为Reference对象生成Invoker时创建的

代码语言:javascript复制
RegistryProtocol#doRefer =>
RegistryDirectory#subscribe =>
RegistryDirectory#toInvokers => 
ProtocolFilterWrapper#refer =>
AbstractProtocol#refer =>
DubboProtocol#protocolBindingRefer =>
DubboProtocol#getClients =>
DubboProtocol#getSharedClient =>
DubboProtocol#buildReferenceCountExchangeClientList =>
DubboProtocol#buildReferenceCountExchangeClient =>
DubboProtocol#initClient =>
Exchangers#connect =>
HeaderExchanger#connect =>
Transporters#connect =>
NettyTransporter#connect =>
NettyClient#<init> =>
NettyClient#doOpen

最终,在NettyClient#doOpen中通过Netty与Server建立连接

代码语言:javascript复制
Exchangers#connect
    => HeaderExchanger#connect(url, handler)
        => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
                
// Transporters#connect 语句可以拆解为 
Transporters#connect
    => NettyTransporter#connect(url, handler)
        => return new NettyClient(url, handler)
            =>  NettyClient#doOpen【NettyClient构造函数中调用了doOpen方法】

NettyClient中的hander属性,最终指向的是new DecodeHandler(new HeaderExchangeHandler(handler))。最终Client端返回HeaderExchangeClient,其中的client属性也对NettyClient做了包装处理

不过在DubboProtocol#buildReferenceCountExchangeClient方法中对HeaderExchangeClient包装了一层,最终Invoker中的Client类型是ReferenceCountExchangeClient

代码语言:javascript复制
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}

ReferenceCountExchangeClientHeaderExchangeClient没什么区别,只不过包装了一层,然后还有一个比较重要的属性referenceCount,用于记录客户端的个数?

客户端发送请求

代码语言:javascript复制
调用方代理类 ->
InvokerInvocationHandler#invoke ->
MockClusterInvoker#invoke ->
AbstractClusterInvoker#invoke【获取LoadBalance】 -> 
FailoverClusterInvoker#doInvoke【处理重试次数】 ->
ProtocolFilterWrapper#invoke【处理Filter链路】 ->
AbstractInvoker#invoke【设置Attachments参数】 ->
DubboInvoker#doInvoke【Exchange交接层】 ->
ReferenceCountExchangeClient#request ->
HeaderExchangeClient#request ->
HeaderExchangeChannel#request【return CompletableFuture】 ->
AbstractPeer#send ->
AbstractClient#send ->
NettyChannel#send ->
Channel#writeAndFlush【发消息给服务端】

DubboInvoker#doInvoke开始与Exchange层交互,核心代码如下

代码语言:javascript复制
protected Result doInvoke(final Invocation invocation) throws Throwable {
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    // return = false,即oneWay ,可以减少不必要的Future对象创建
    if (isOneway) {
        // send=true,即客户端发送之后再返回,否则直接返回
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
        CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
        asyncRpcResult.subscribeTo(responseFuture);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
        return asyncRpcResult;
    }
}
代码语言:javascript复制
ReferenceCountExchangeClient#request => 
HeaderExchangeClient#request =>  
HeaderExchangeChannel#request
代码语言:javascript复制
// HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request "   request   ", cause: The channel "   this   " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

在这个方法中,有以下几个需要注意的点:

  1. Request构造函数内部,会为Request生成一个递增唯一的ID,用于标识该请求
  2. channel#send调用过程中,涉及到NettyChannel#getOrAddChannel方法的调用,NettyChannel中有一个ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP缓存,用于维护io.netty.channel.ChannelNettyChannel 的关系
  3. channel#send调用过程中,最终会调用到NettyChannel#send方法,该方法真正的将消息发给Server端
  4. 返回的DefaultFuture是一个CompletableFuture
代码语言:javascript复制
// NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        // 将消息发给Server
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // 如果配置了 send=true 参数,客户端需要等待消息发出之后再返回
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message "   message   " to "   getRemoteAddress()   ", cause: "   e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message "   message   " to "   getRemoteAddress()
                  "in timeout("   timeout   "ms) limit");
    }
}

从上面消息发送的流程中,好像没有看到对消息的编码工作,那是因为在Netty客户端初始化的时候,已经设置了编解码器

代码语言:javascript复制
// NettyClient.java 
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);
    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

先经过编码器,即InternalEncoder#encode方法,InternalEncoder实现了MessageToByteEncoder接口,该方法内部调用了Codec2的相关方法,而Codec2是一个SPI接口,默认实现DubboCodec

代码语言:javascript复制
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}

服务端响应请求

上面提到了NettyServer中的hander属性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler -> HeaderExchangeHandlerNettyServer开启Server端的代码如下

代码语言:javascript复制
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
  1. 先经过解码器,即InternalDecoder#decode方法,InternalDecoder实现了ByteToMessageDecoder接口,该方法内部调用了Codec2的相关方法,而Codec2是一个SPI接口,默认实现DubboCodec
代码语言:javascript复制
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}
  1. MultiMessageHandler用于处理数组消息,如果是消息是MultiMessage类型,MultiMessage实现了Iterable数组,则遍历调用handle的received方法;否则直接调用下一个handle的received方法
  2. AllChannelHandler收到消息,将 channel handler message封装成state为ChannelState.RECEIVED类型的ChannelEventRunnable对象,然后交给线程池执行
  3. ChannelEventRunnable#run方法中判断state为ChannelState.RECEIVED类型,直接执行下一个handler的received方法,即DecodeHandler,这个过程是由线程池执行
  4. DecodeHandler#received方法中,如果消息是Decodeable类型,对整个消息进行解码;如果消息是Request类型,对Request.getData()进行解码;如果消息是Response类型,对Response.getResult()进行解码
  5. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#replyrequestHandlerDubboProtocol中的一个属性,ExchangeHandlerAdapter类型
  6. HeaderExchangeHandler#handleRequest中会创建一个Response对象,它的ID属性值,就是Request对象的ID值,这样请求和响应就关联起来了
  7. requestHandler#reply方法中,从exporterMap缓存中获取对应的DubboExporter对象,然后从DubboExporter获取Invoker,最后执行Invoker#invoke方法,然后返回一个CompletableFuture对象
  8. HeaderExchangeHandler#handleRequest方法中接收返回的CompletableFuture对象,对它添加回调处理,在回调中将返回结果封装到Response对象中,然后通过channel将Response发出
代码语言:javascript复制
// ChannelEventRunnable.java
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            // RECEIVED 类型,直接执行下一个handle的received方法,即 DecodeHandler
            handler.received(channel, message);
        } catch (Exception e) {}
    } else {
        switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
           } catch (Exception e) {}
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
           } catch (Exception e) {}
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {}
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
           } catch (Exception e) {}
            break;
        default:
            logger.warn("unknown state: "   state   ", message is "   message);
        }
    }

}
代码语言:javascript复制
InternalDecoder#decode
    => NettyServerHandler#channelRead
        => AbstractPeer#received
            => MultiMessageHandler#received
                => HeartbeatHandler#received
                    => AllChannelHandler#received
                    
                    ------------------ 异步执行,放到线程池 ----------------------
                    => ChannelEventRunnable#run
                        => DecodeHandler#received
                            => DecodeHandler#decode
                                => DecodeableRpcInvocation#decode
                        => HeaderExchangeHandler#received
                            => HeaderExchangeHandler#handleRequest
                                => DubboProtocol.requestHandler#reply
                    ------------------ 异步执行 -----------------------

                                    ----------------扩展点-------------------
                                    => ProtocolFilterWrapper.invoke
                                    => EchoFilter.invoke
                                        => ClassLoaderFilter.invoke
                                        => GenericFilter.invoke
                                            => TraceFilter.invoke
                                            => MonitorFilter.invoke
                                                => TimeoutFilter.invoke
                                                => ExceptionFilter.invoke
                                                    => InvokerWrapper.invoke
                                    -----------------扩展点-------------------
                                                        => AbstractProxyInvoker#invoke
                                                            => JavassistProxyFactory.AbstractProxyInvoker#doInvoke
                                                                => 代理类#invokeMethod
                                                                    => 真正的service方法


                            //把接收处理的结果,数据发回consumer  future#whenComplete                                                              
                            => channel.send(response)
                                => HeaderExchangeChannel
                                    => NettyChannel.send
                                        => NioSocketChannel#writeAndFlush(message)                                                  

服务端发送结果

代码语言:javascript复制
HeaderExchangeChannel#send =>
NettyChannel#send => 
NioSocketChannel#writeAndFlush(message) 

客户端响应结果

在客户端启动的时候,入参handler和服务端的handler是同一个

代码语言:javascript复制
// DubboProtocol#initClient
Exchangers.connect(url, requestHandler);

// HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect =>
    NettyTransporter#connect
        return NettyClient

NettyClient构造函数中,对handler做了包装

代码语言:javascript复制
ChannelHandlers.wrap(handler, url)

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();
    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

所以,最终NettyClient中的handler属性指向 MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler ,和服务端处理流程一样一样

  1. 接收消息,经过MultiMessageHandlerHeartbeatHandler 处理,到达 AllDispatcher
  2. AllChannelHandler中将消息封装成new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)类型,交由线程池执行
  3. 线程池执行任务,经过DecodeHandler到达HeaderExchangeHandler
  4. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#receivedDefaultFuture中维护了一个请求ID和DefaultFuture的映射关系,Request和Response通过请求ID可以一一对应
代码语言:javascript复制
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                t.cancel();
            }
            future.doReceived(response);
        } else {
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
  1. 通过response.Id获取DefaultFuture
  2. 执行CompletableFuture#complete方法可以让 执行了CompletableFuture#get的用户线程得到响应,获取结果返回。至此整个调用过程完成

同步转异步

可是我们在代码中很多时候都是同步调用,很少自己去调用CompletableFuture#get方法,这一部分逻辑又是怎么处理的。在DubboInvoker#doInvoke方法中,返回的是一个AsyncRpcResult

代码语言:javascript复制
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // return = false,即oneWay ,可以减少不必要的Future对象创建
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {c
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            // 订阅 responseFuture ,当 responseFuture 完成的之后,执行 asyncRpcResult 的complete方法, 这样用户线程就可以响应了
            asyncRpcResult.subscribeTo(responseFuture);

            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
    }
}

AsyncToSyncInvoker

AsyncToSyncInvoker#invoke方法中,会判断是同步调用还是异步调用,如果是同步调用,将调用AsyncRpcResult#get方法阻塞用户线程,以达到同步效果

代码语言:javascript复制
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // 如果是同步调用,调用 asyncResult#get 阻塞用户线程
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "   invocation.getMethodName()   ", provider: "   getUrl()   ", cause: "   e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

0 人点赞