Dubbo系列四之dubbo协议数据收发细节

2024-08-09 11:33:31 浏览数 (2)

1 Consumer编码

由之前的文章可知,Consumer最终在生成DubboInvoker时,会生成对应的客户端连接,如下

代码语言:javascript复制
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    checkDestroyed();
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

看到通过getClients(url)生成客户端,最终通过Transporters的connnect方法获取连接,如下

代码语言:javascript复制
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter(url).connect(url, handler);
}

public static Transporter getTransporter(URL url) {
    return url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

可以看出同样首先是获取适配类,通过适配类的connect方法获取连接,生成的适配类源码如下

代码语言:javascript复制
public class Transporter$Adaptive implements org.apache.dubbo.remoting.Transporter {

    public org.apache.dubbo.remoting.Client connect(org.apache.dubbo.common.URL arg0, org.apache.dubbo.remoting.ChannelHandler arg1) throws org.apache.dubbo.remoting.RemotingException {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.remoting.Transporter) name from url ("   url.toString()   ") use keys([client, transporter])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.remoting.Transporter.class);
        org.apache.dubbo.remoting.Transporter extension = (org.apache.dubbo.remoting.Transporter) scopeModel.getExtensionLoader(org.apache.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.connect(arg0, arg1);
    }

    public org.apache.dubbo.remoting.RemotingServer bind(org.apache.dubbo.common.URL arg0, org.apache.dubbo.remoting.ChannelHandler arg1) throws org.apache.dubbo.remoting.RemotingException {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg0;
        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.remoting.Transporter) name from url ("   url.toString()   ") use keys([server, transporter])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.remoting.Transporter.class);
        org.apache.dubbo.remoting.Transporter extension = (org.apache.dubbo.remoting.Transporter) scopeModel.getExtensionLoader(org.apache.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.bind(arg0, arg1);
    }
}

如上,在其connect方法中获取对应的Extension,由于设置的是netty,所有这里获取到的NettyTransporter ,最后在NettyTransporter中通过connect方法新建了一个NettyClient

代码语言:javascript复制
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
    return new NettyClient(url, handler);
}

在NettyClient的构造器中什么也没干,直接调用父类AbstractClient的构造器方法,如下,核心方法调用了父类AbstractEndpoint的构造器方法,doOpen以及doConnect,下面依次来来看看

代码语言:javascript复制
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    ......
    try {
        doOpen();
    } catch (Throwable t) {
        ......
    }

    try {
        // connect.
        connect();
        ......
    } catch (RemotingException t) {
        ......
    } catch (Throwable t) {
       ......
    }
}

super(url, handler),主要是通过父类AbstractEndpoint的getChannelCodec方法获取对应的编解码器,如下

代码语言:javascript复制
public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler);
    this.codec = getChannelCodec(url);
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}

protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY);
    if (StringUtils.isEmpty(codecName)) {
        // codec extension name must stay the same with protocol name
        codecName = url.getProtocol();
    }
    FrameworkModel frameworkModel = getFrameworkModel(url.getScopeModel());
    if (frameworkModel.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return frameworkModel.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else if (frameworkModel.getExtensionLoader(Codec.class).hasExtension(codecName)) {
        return new CodecAdapter(frameworkModel.getExtensionLoader(Codec.class)
            .getExtension(codecName));
    } else {
        return frameworkModel.getExtensionLoader(Codec2.class).getExtension("default");
    }
}

由于默认是dubbo协议,所以这里获取到的是DubboCountCodec,如下

代码语言:javascript复制
public final class DubboCountCodec implements Codec2 {

    private final DubboCodec codec;

    public DubboCountCodec(FrameworkModel frameworkModel) {
        codec = new DubboCodec(frameworkModel);
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof MultiMessage) {
            MultiMessage multiMessage = (MultiMessage) msg;
            for (Object singleMessage : multiMessage) {
                codec.encode(channel, buffer, singleMessage);
            }
        } else {
            codec.encode(channel, buffer, msg);
        }
    }

    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            } else {
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }
}

这里获取的DubboCountCodec也不是最终的编解码器,从上面的代码中可以看到最终是通过DubboCodec来处理编解码

doOpen(),主要是在获取到编解码器之后再初始化客户端的Bootstrap

代码语言:javascript复制
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = createNettyClientHandler();
    bootstrap = new Bootstrap();
    initBootstrap(nettyClientHandler);
}
protected void initBootstrap(NettyClientHandler nettyClientHandler) {
    bootstrap.group(EVENT_LOOP_GROUP.get())
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
        .channel(socketChannelClass());

    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
    SslContext sslContext = SslContexts.buildClientSslContext(getUrl());
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (sslContext != null) {
                ch.pipeline().addLast("negotiation", new SslClientTlsHandler(sslContext));
            }

            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                .addLast("decoder", adapter.getDecoder())
                .addLast("encoder", adapter.getEncoder())
                .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                .addLast("handler", nettyClientHandler);

            String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);
            if (socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {
                int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

可以看到在channel的pipeline中,如果没有ssl的话,那第一步就是添加编解码器,这里添加的时候又通过NettyCodecAdapter封装成继承了Netty编解码类的codec,分别是InternalEncoder和InternalDecoder

代码语言:javascript复制
final public class NettyCodecAdapter {

    private final ChannelHandler encoder = new InternalEncoder();

    private final ChannelHandler decoder = new InternalDecoder();

    private final Codec2 codec;

    private final URL url;

    private final org.apache.dubbo.remoting.ChannelHandler handler;

    public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
    }

    public ChannelHandler getEncoder() {
        return encoder;
    }

    public ChannelHandler getDecoder() {
        return decoder;
    }

    private class InternalEncoder extends MessageToByteEncoder {

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            boolean encoded = false;
            if (msg instanceof ByteBuf) {
                out.writeBytes(((ByteBuf) msg));
                encoded = true;
            } else if (msg instanceof MultiMessage) {
                for (Object singleMessage : ((MultiMessage) msg)) {
                    if (singleMessage instanceof ByteBuf) {
                        ByteBuf buf = (ByteBuf) singleMessage;
                        out.writeBytes(buf);
                        encoded = true;
                        buf.release();
                    }
                }
            }

            if (!encoded) {
                ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
                Channel ch = ctx.channel();
                NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
                codec.encode(channel, buffer, msg);
            }
        }
    }

    private class InternalDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

            ChannelBuffer message = new NettyBackedChannelBuffer(input);

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            // decode object.
            do {
                int saveReaderIndex = message.readerIndex();
                Object msg = codec.decode(channel, message);
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //is it possible to go here ?
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        }
    }
}

InternalEncoder和InternalDecoder分别继承了Netty的MessageToByteEncoder和ByteToMessageDecoder

最后再来看看connect(),最终是通过调用NettyClient的doConnect连新建连接,详细步骤下面代码中有注释

代码语言:javascript复制
private void doConnect(InetSocketAddress serverAddress) throws RemotingException {
    long start = System.currentTimeMillis();
    ChannelFuture future = bootstrap.connect(serverAddress);
    try {
        // 根据链接超时时间等待对应的时间
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);

        if (ret && future.isSuccess()) {
            // 获取到新建立连接的Channel
            Channel newChannel = future.channel();
            try {
                // 如果存在老的Channel,则将老的channel关掉
                // 并且从NettyChannel的ChannelMap中删除
                Channel oldChannel = NettyClient.this.channel;
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel "   oldChannel   " on create new netty channel "   newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // 如果当前client是关闭状态,那么需要关闭新建立的链接
                // 并且从NettyChannel的ChannelMap中删除
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel "   newChannel   ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.cause() != null) {

            Throwable cause = future.cause();

            // 6-1 Failed to connect to provider server by other reason.

            RemotingException remotingException = new RemotingException(this, "client(url: "   getUrl()   ") failed to connect to server "
                  serverAddress   ", error message is:"   cause.getMessage(), cause);

            logger.error(TRANSPORT_FAILED_CONNECT_PROVIDER, "network disconnected", "",
                "Failed to connect to provider server by other reason.", cause);

            throw remotingException;

        } else {

            // 6-2 Client-side timeout

            RemotingException remotingException = new RemotingException(this, "client(url: "   getUrl()   ") failed to connect to server "
                  serverAddress   " client-side timeout "
                  getConnectTimeout()   "ms (elapsed: "   (System.currentTimeMillis() - start)   "ms) from netty client "
                  NetUtils.getLocalHost()   " using dubbo version "   Version.getVersion());

            logger.error(TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "",
                "Client-side timeout.", remotingException);

            throw remotingException;
        }
    } finally {
        // 判断下链接是否建立,这里判断了之后啥也没做
        if (!isConnected()) {
            //future.cancel(true);
        }
    }
}

再来看看上面代码最后一步isConnected方法干了啥

代码语言:javascript复制
public boolean isConnected() {
    // 获取channel,返回isConnected,这里获取的channel是一个NettyChannel实例,对原生的channel做了
    // 一层封装
    Channel channel = getChannel();
    if (channel == null) {
        return false;
    }
    return channel.isConnected();
}

在看看getChannel()方法

代码语言:javascript复制
protected org.apache.dubbo.remoting.Channel getChannel() {
    Channel c = channel;
    if (c == null) {
        return null;
    }
    // 这里其实就是新建了一个NettyChannel,并且将原生的channel作为Key,
    // NettyChannel为value放在了全局的ChannelMap中
    return NettyChannel.getOrAddChannel(c, getUrl(), this);
}

新建NettyChannel的代码如下,在新建时设置了当前协议的编解码器和writequeue

代码语言:javascript复制
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
    super(url, handler);
    if (channel == null) {
        throw new IllegalArgumentException("netty channel == null;");
    }
    this.channel = channel;
    this.writeQueue = Netty4BatchWriteQueue.createWriteQueue(channel);
    this.codec = getChannelCodec(url);
    this.encodeInIOThread = getUrl().getParameter(ENCODE_IN_IO_THREAD_KEY, DEFAULT_ENCODE_IN_IO_THREAD);
}

这里先提出一个问题,在上文中channel的pipeline初始化时已经添加了编解码器,为啥封装的NettyChannel还要设置编解码器?下面就来看看Consumer端是如何将请求发出去的,这里紧接着Dubbo系列三这篇文章中Consumer发送请求结尾处发送请求走到HeaderExchangeChannel.request方法,在这个方法中调用了NettyClient的send方法,如下

代码语言:javascript复制
public void send(Object message, boolean sent) throws RemotingException {
    if (needReconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:"   getUrl());
    }
    channel.send(message, sent);
}

这里先通过getChannel()方法获取到Channel,这里的Channel就是刚才所封装的NettyChannel,再通过其send方法将数据发送出去,如下

代码语言:javascript复制
public void send(Object message, boolean sent) throws RemotingException {
    // 先调用父类的send方法,啥也没干,就打了一行日志
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        Object outputMessage = message;
        // 先判断是不是需要在IO线程中进行编码,如果不是,那么就在当前的业务线程中编码
        // 这里解释下刚才的问题,之所以要在NettyChannel设置编解码器,就是为了提前编码,
        // 这样在Netty的IO线程中就不需要再编码了,直接发送数据
        if (!encodeInIOThread) {
            ByteBuf buf = channel.alloc().buffer();
            ChannelBuffer buffer = new NettyBackedChannelBuffer(buf);
            codec.encode(this, buffer, message);
            outputMessage = buf;
        }
        // 然后将要发送的数据添加到writeQueue中,一个channel对应一个writeQueue,所以dubbo也不是直接
        // 就把数据发出去,而是先添加到writeQueue中,方便后续批量发送的逻辑处理
        // 这里同时添加了一个监听器,当数据发送后(成功或者失败)进行回调,代码跟进去其实啥也没干,是个空实现
        ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!(message instanceof Request)) {
                    return;
                }
                ChannelHandler handler = getChannelHandler();
                if (future.isSuccess()) {
                    handler.sent(NettyChannel.this, message);
                } else {
                    Throwable t = future.cause();
                    if (t == null) {
                        return;
                    }
                    Response response = buildErrorResponse((Request) message, t);
                    handler.received(NettyChannel.this, response);
                }
            }
        });

        if (sent) {
            // wait timeout ms
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        removeChannelIfDisconnected(channel);
        throw new RemotingException(this, "Failed to send message "   PayloadDropper.getRequestWithoutData(message)   " to "   getRemoteAddress()   ", cause: "   e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message "   PayloadDropper.getRequestWithoutData(message)   " to "   getRemoteAddress()
              "in timeout("   timeout   "ms) limit");
    }
}

再来看看writeQueue.enqueue(outputMessage)方法,直接通过channel.newPromise()返回了一个Promise,用于监听

代码语言:javascript复制
public ChannelFuture enqueue(Object message) {
    return enqueue(message, channel.newPromise());
}

public ChannelFuture enqueue(Object message, ChannelPromise channelPromise) {
    MessageTuple messageTuple = new MessageTuple(message, channelPromise);
    super.enqueue(messageTuple, eventLoop);
    return messageTuple.channelPromise;
}

以上代码中又调用父类的super.enqueue(messageTuple, eventLoop)方法,如下

代码语言:javascript复制
public void enqueue(T message, Executor executor) {
    queue.add(message);
    scheduleFlush(executor);
}

可以看到是先将待发送的数据加入到一个队列当中,然后再调用scheduleFlush(executor)处理,这里的入参executor是一个netty的eventloop,eventloop不仅可以作为netty的IO线程用,也可以作为普通的线程使用来提交异步任务,和普通线程池使用没啥区别,看下scheduleFlush(executor)方法的逻辑

代码语言:javascript复制
// 向eventloop中提交了一个异步任务,任务为run方法的逻辑
protected void scheduleFlush(Executor executor) {
    if (scheduled.compareAndSet(false, true)) {
        executor.execute(() -> this.run(executor));
    }
}
// 一个item其实就是一个MessageTuple,MessageTuple包含了要发送的数据以及对应的Promise
// 以下代码的逻辑就是依次从queue中获取待发送的数据,如果数据只有一条直接调用flush将数据发送出去
// 如果数据不止一条就添加到MultiMessage中,同时将数据的promise也添加到MultiMessage中
// 如果添加的数据条数达到限制(默认是128条),就将数据批量发送出去
private void run(Executor executor) {
    try {
        Queue<T> snapshot = new LinkedList<>();
        T item;
        while ((item = queue.poll()) != null) {
            snapshot.add(item);
        }
        int i = 0;
        boolean flushedOnce = false;
        while ((item = snapshot.poll()) != null) {
            if (snapshot.size() == 0) {
                flushedOnce = false;
                break;
            }
            if (i == chunkSize) {
                i = 0;
                flush(item);
                flushedOnce = true;
            } else {
                prepare(item);
                i  ;
            }
        }
        if (!flushedOnce && item != null) {
            flush(item);
        }
    } finally {
        scheduled.set(false);
        if (!queue.isEmpty()) {
            scheduleFlush(executor);
        }
    }
}

再来看看flush(item)的逻辑

代码语言:javascript复制
protected void flush(MessageTuple item) {
    prepare(item);
    Object finalMessage = multiMessage;
    if (multiMessage.size() == 1) {
        finalMessage = multiMessage.get(0);
    }
    // 这里就真正调用netty的channel的writeAndFlush方法将数据发送出去,同时添加了一个Listener
    // 将发送结果设置到所有已发送数据的promise当中
    channel.writeAndFlush(finalMessage).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            ChannelPromise cp;
            while ((cp = promises.poll()) != null) {
                if (future.isSuccess()){
                    cp.setSuccess();
                } else {
                    cp.setFailure(future.cause());
                }
            }
        }
    });
    this.multiMessage.removeMessages();
}

数据发送出去以后就走到InternalEncoder的encode方法,由于我设置的是在IO线程中编解码,所以这里会对数据进行编码

代码语言:javascript复制
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    boolean encoded = false;
    // 如果数据是单条ByteBuf,直接写入out即可
    if (msg instanceof ByteBuf) {
        // 这里的ByteBuf为什么不需要手动release,因为在父类MessageToByteEncoder的write方法会release
        out.writeBytes(((ByteBuf) msg));
        encoded = true;
    } else if (msg instanceof MultiMessage) {
        // 如果数据是MultiMessage,则遍历将数据写入out
        for (Object singleMessage : ((MultiMessage) msg)) {
            if (singleMessage instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) singleMessage;
                out.writeBytes(buf);
                encoded = true;
                // 为啥手动调了一次release,因为如果是MultiMessage并且是ByteBuf,说明在业务线程已经编码
                // 过了,所以这里的buf是通过channel.alloc().buffer()手动新建出来的,这里就要手动release
                buf.release();
            }
        }
    }
    // 如果没有编码过,这里就先编码再写数据
    if (!encoded) {
        ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
        Channel ch = ctx.channel();
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        codec.encode(channel, buffer, msg);
    }
}

接着看下codec.encode(channel, buffer, msg)方法,由于本地只发了一次请求,所以最终会走到codec的encodeRequest方法,如下

代码语言:javascript复制
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    // 获取序列化方式,默认是fastjson2
    Serialization serialization = getSerialization(channel, req);
    // header length,16 byte
    byte[] header = new byte[HEADER_LENGTH];
    // header写入magic number,2 byte
    Bytes.short2bytes(MAGIC, header);
    // header设置request和序列化方式标志,1 byte
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }
    // header设置requestId,8 byte
    Bytes.long2bytes(req.getId(), header, 4);
    // 设置ByteBuf的writerIndex为当前的writerIndex HEADER_LENGTH,主要为了保证后续的数据写入从
    // header后面开始写,给header 的写入留下足够的空间
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex   HEADER_LENGTH);
    // 通过ChannelBufferOutputStream编码数据,并且将数据写入buffer
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    if (req.isHeartbeat()) {
        // heartbeat request data is always null
        bos.write(CodecSupport.getNullBytesOf(serialization));
    } else {
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
    }
    // 这两个方法啥也没做
    bos.flush();
    bos.close();
    // header写入数据的长度,4 byte,所以这里Header其实一共只用了15 byte,还有一个byte空着的
    int len = bos.writtenBytes();
    checkPayload(channel, req.getPayload(), len);
    Bytes.int2bytes(len, header, 12);

    // 首先重置writerIndex为header之前的index,然后再写入header,最后设置
    // buffer的writerIndex为savedWriteIndex HEADER_LENGTH 数据的长度
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex   HEADER_LENGTH   len);
}

从上面可以看出dubbo协议的设计还是比较简单的,header的前4个字节存储魔数,request和序列化标记等(其实只用了三个字节,留了一个字节备用),中间8个字节保存RequestId或者ResponseId,最后4个字节保存数据包的长度,然后再跟上真实的数据字节

2 Provider解码

provider接收到请求后第一步就是解码,看看InternalDecoder

代码语言:javascript复制
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        // decode object.
        do {
            int saveReaderIndex = message.readerIndex();
            Object msg = codec.decode(channel, message);
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                //is it possible to go here ?
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}

decode的逻辑比较简单,就是当Bytebuf可读时循环调用codec.decode(channel, message)解析Bytebuf,如果数据不够一个完整的数据包就退出循环,下面看下codec.decode(channel, message)方法

代码语言:javascript复制
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}

同样也是循环解析,如果有多条消息,则封装成一个MultiMessage返回,再往下看看真正decode的代码

代码语言:javascript复制
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // 如果header不是以魔数开头,需要调用父类的decode方法处理
    // 主要是为了处理一些特殊的命令行
    if (readable > 0 && header[0] != MAGIC_HIGH
        || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i  ) {
            if (header[i] == MAGIC_HIGH && header[i   1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length   i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // 如果可读数小于header的长度,返回需要更多数据
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);

    // 如果收到的是响应且数据包大小超过限制,则直接构造一个Response返回,并设置错误信息
    Object obj = finishRespWhenOverPayload(channel, len, header);
    if (null != obj) {
        return obj;
    }
    // 如果数据包长度不够,同样返回需要更多数据
    int tt = len   HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        // 解析数据包
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream "   is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e);
            }
        }
    }
}

// decodeBody其实就是根据数据类型解析,如果是请求就构造一个Request,按照Request的格式解析数据包
// Reponse和Event一样的逻辑
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(true);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        try {
            if (status == Response.OK) {
                Object data;
                if (res.isEvent()) {
                    byte[] eventPayload = CodecSupport.getPayload(is);
                    if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                        // heart beat response data is always null;
                        data = null;
                    } else {
                        data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                    }
                } else {
                    data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
                }
                res.setResult(data);
            } else {
                res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
            }
        } catch (Throwable t) {
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request.
        Request req;
        try {
            Object data;
            if ((flag & FLAG_EVENT) != 0) {
                byte[] eventPayload = CodecSupport.getPayload(is);
                if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                    // heart beat response data is always null;
                    req = new HeartBeatRequest(id);
                    ((HeartBeatRequest) req).setProto(proto);
                    data = null;
                } else {
                    req = new Request(id);
                    data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
                }
                req.setEvent(true);
            } else {
                req = new Request(id);
                data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
            }
            req.setData(data);
        } catch (Throwable t) {
            // bad request
            req = new Request(id);
            req.setBroken(true);
            req.setData(t);
        }
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        return req;
    }
}

这样解析到数据之后就交给业务handler处理,处理完成之后将结果encode一遍返回给消费端,消费端又decode获取结果

0 人点赞