《Dubbo进阶一》——RPC协议底层原理

2022-08-30 20:58:56 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

一 RPC协议简介

在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。

简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议。

dubbo 支持的RPC协议列表

名称

实现描述

连接描述

使用场景

dubbo

传输服务: mina, netty(默认), grizzy; 序列化: dubbo, hessian2(默认), java, fastjson。 自定义报文

单个长连接NIO;异步传输

1.常规RPC调用 2.传输数据量小 3.提供者少于消费者

rmi

传输:java rmi 服务; 序列化:java原生二进制序列化

多个短连接; BIO同步传输

1.常规RPC调用 2.与原RMI客户端集成 3.可传少量文件 4.不支持防火墙穿透

hessian

传输服务:servlet容器; 序列化:hessian二进制序列化

基于Http 协议传输,依懒servlet容器配置

1.提供者多于消费者 2.可传大字段和文件 3.跨语言调用

http

传输服务:servlet容器; 序列化:http表单

依懒servlet容器配置

1、数据包大小混合

thrift

与thrift RPC 实现集成,并在其基础上修改了报文头

长连接、NIO异步传输

(PS:本文只探讨dubbo协议)

二 协议的基本组成

  1. IP:服务提供者的地址
  2. 端口:协议指定开放端口
  3. 运行服务 (1)netty (2)mima (3)rmi (4)servlet容器(Jetty、Tomcat、Jboss)
  4. 协议报文编码
  5. 序列化方式 (1)Hessian2Serialization (2)DubboSerialization (3)JavaSerialization (4)JsonSerialization

三 Duboo的RPC协议报文

先看下http协议报文格式

同样,Dubbo也有自己的报文格式

以head request body或head response body的形式存在

  • head 1标志位:表明是请求还是响应还是事件 2status:表明状态是OK还是不OK
  • request body 1Dubbo版本号 2接口路径 3接口版本 4方法名称 5参数类型 6参数值
  • response body 1结果标志(无结果、有结果、异常) 2结果

协议的编解码过程:

四 源码探究

以明晰编码解码和序列化反序列化为目的探究源码。其实就是如上图所示的协议的编解码过程。

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec是很重要的一个类,无论是request还是response,还有编码解码都在这里类进行调度。

DubboCodec:

其中重点关注三个方法 decodeBody():解码(请求或响应)以及序列化和反序列化 encodeRequestData():编码请求(发生在Consumer) encodeResponseData():编码响应(发生在Provider)

1.编码序列化request

发生在Consumer发请求之前 encodeRequestData()

代码语言:javascript复制
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation)data;
        out.writeUTF(inv.getAttachment("dubbo", DUBBO_VERSION));
        out.writeUTF(inv.getAttachment("path"));
        out.writeUTF(inv.getAttachment("version"));
        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for(int i = 0; i < args.length;   i) {
                out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
            }
        }

        out.writeObject(inv.getAttachments());
    }

参数ObjectOutput是序列化接口,具体调用什么实现类有配置决定,如没有则默认是hessian2。能用的子类(序列化方式)如下

RpcInvocation拿到datadata是请求的基本内容,也就是第三部分所说的request body的六个模块:Dubbo版本号、接口路径、接口版本、方法名称、参数类型、参数值。 writeUTF()将版本号、接口路径、接口版本、方法名和参数称写进序列化类。 最后的writeObject() 通过配置的序列化方式调用相应的实现类进行序列化,如在protocol配置了serialization=“fastjson”,将调用FastJsonObjectOutput实现类的writeObject()

编码序列化request完成

2.编码序列化response

发生在Provider发出响应之前。 encodeResponseData

代码语言:javascript复制
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result)data;
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte((byte)2);
            } else {
                out.writeByte((byte)1);
                out.writeObject(ret);
            }
        } else {
            out.writeByte((byte)0);
            out.writeObject(th);
        }

    }

过程与编码序列化request类似且较为简单,不再多说。

3.解码反序列化request和response

解码反序列化request发生在Provider;解码反序列化response发生在Consumer。两个方法在同个方法中,就一起讲了。

代码语言:javascript复制
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
       byte flag = header[2];
       byte proto = (byte)(flag & 31);
       Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
       long id = Bytes.bytes2long(header, 4);
       if ((flag & -128) == 0) {
           Response res = new Response(id);
           if ((flag & 32) != 0) {
               res.setEvent(Response.HEARTBEAT_EVENT);
           }

           byte status = header[3];
           res.setStatus(status);
           if (status == 20) {
               try {
                   Object data;
                   if (res.isHeartbeat()) {
                       data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else if (res.isEvent()) {
                       data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else {
                       DecodeableRpcResult result;
                       if (channel.getUrl().getParameter("decode.in.io", true)) {
                           result = new DecodeableRpcResult(channel, res, is, (Invocation)this.getRequestData(id), proto);
                           result.decode();
                       } else {
                           result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(this.readMessageData(is)), (Invocation)this.getRequestData(id), proto);
                       }

                       data = result;
                   }

                   res.setResult(data);
               } catch (Throwable var13) {
                   if (log.isWarnEnabled()) {
                       log.warn("Decode response failed: "   var13.getMessage(), var13);
                   }

                   res.setStatus((byte)90);
                   res.setErrorMessage(StringUtils.toString(var13));
               }
           } else {
               res.setErrorMessage(this.deserialize(s, channel.getUrl(), is).readUTF());
           }

           return res;
       } else {
           Request req = new Request(id);
           req.setVersion("2.0.0");
           req.setTwoWay((flag & 64) != 0);
           if ((flag & 32) != 0) {
               req.setEvent(Request.HEARTBEAT_EVENT);
           }

           try {
               Object data;
               if (req.isHeartbeat()) {
                   data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
               } else if (req.isEvent()) {
                   data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
               } else {
                   DecodeableRpcInvocation inv;
                   if (channel.getUrl().getParameter("decode.in.io", true)) {
                       inv = new DecodeableRpcInvocation(channel, req, is, proto);
                       inv.decode();
                   } else {
                       inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(this.readMessageData(is)), proto);
                   }

                   data = inv;
               }

               req.setData(data);
           } catch (Throwable var14) {
               if (log.isWarnEnabled()) {
                   log.warn("Decode request failed: "   var14.getMessage(), var14);
               }

               req.setBroken(true);
               req.setData(var14);
           }

           return req;
       }
   }

需要注意的是来到这个方法表明请求头已经处理好,现在是处理body。 flag通过header拿到标志位。 第一个if语句(flag & -128) == 0,实际上是在判断是request还是response,若为true为response,也就是Consumer要解码反序列化从Provider发来的响应;若为false为request,也就是Provider要解码反序列化从Consumer发来的请求。

(1)解码反序列化request

(flag & -128) == 0为false时,进入else执行体,在服务端进行操作。 if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。 if (req.isHeartbeat())判断是否时一个心跳事件,else if (req.isEvent())判断是否时一个事件 排除了这两个之后就是真正的request。 inv拿到request相关参数,inv.decode()进行解码和反序列化。 调用DecodeableRpcInvocation的decode()方法如下

代码语言:javascript复制
public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        this.setAttachment("dubbo", in.readUTF());
        this.setAttachment("path", in.readUTF());
        this.setAttachment("version", in.readUTF());
        this.setMethodName(in.readUTF());

        try {
            String desc = in.readUTF();
            Object[] args;
            Class[] pts;
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];

                for(int i = 0; i < args.length;   i) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception var9) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: "   var9.getMessage(), var9);
                        }
                    }
                }
            }

            this.setParameterTypes(pts);
            Map<String, String> map = (Map)in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = this.getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }

                ((Map)attachment).putAll(map);
                this.setAttachments((Map)attachment);
            }

            for(int i = 0; i < args.length;   i) {
                args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            this.setArguments(args);
            return this;
        } catch (ClassNotFoundException var10) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", var10));
        }
    }

其中ObjectInput选择的序列化方式实现子类依然时根据配置文件来的,只有与客户端序列化的方式一样才能反序列化成功。接下来是逐个readUTF()解码request body的模块。try代码块里的readUTF()解码出参数类型和参数值。最后将dubbo的隐式参数也一同设置进去Map<String, String> map = (Map)in.readObject(Map.class),到这里DecodeableRpcInvocation拿到所有相关参数,后续可以进行业务操作。 解码反序列化request完成

(2)解码反序列化response

(flag & -128) == 0为true时,进入if执行体,在客户端进行操作。 if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。 status从header拿到状态码,如果不等于20,直接进入else执行错误信息写入到responseres.setErrorMessage()if (req.isHeartbeat()判断是否时一个心跳事件,else if (req.isEvent()判断是否时一个事件 排除了这两个之后就是真正的response。 result拿到response相关参数,result .decode()进行解码和反序列化。 调用DecodeableRpcResult的decode()方法如下

代码语言:javascript复制
public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        byte flag = in.readByte();
        switch(flag) {
        case 0:
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get "   obj);
                }

                this.setException((Throwable)obj);
                break;
            } catch (ClassNotFoundException var6) {
                throw new IOException(StringUtils.toString("Read response data failed.", var6));
            }
        case 1:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(this.invocation);
                this.setValue(returnType != null && returnType.length != 0 ? (returnType.length == 1 ? in.readObject((Class)returnType[0]) : in.readObject((Class)returnType[0], returnType[1])) : in.readObject());
            } catch (ClassNotFoundException var7) {
                throw new IOException(StringUtils.toString("Read response data failed.", var7));
            }
        case 2:
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get "   flag);
        }

        return this;
    }

一开始就调用getSerialization()进行反序列化,然后赋给ObjectInput。 判断flag,0为发生异常,并处理异常信息;2为没值,直接退出方法。 当等于1时对response进行解码,调用setValue()将信息读出来。 解码反序列化response完成

4.业务调用

了解是如何编码序列化等操作之后,最后看下服务端接收到请求整个流程是如何调用的。(客户端接收到响应类似)

以dubbo默认的传输服务netty为例,存在一个重要的类: comalibabadubboremotingtransportnettyNettyServer.class (客户端为NettyClient)

其中的doOpen()方法,表示打开服务

代码语言:javascript复制
protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("connectTimeoutMillis", this.getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(NettyClient.this.getCodec(), NettyClient.this.getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

三个pipeline.addLast()操作对应解码、编码以及解码后的操作。编解码上面已经说过,这里主要探究解码后的操作。

解码完成后带着参数发起对AllDispatcher类的调用

代码语言:javascript复制
public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    public AllDispatcher() {
    }

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

可以看到它又调用了ChannelHandler接口来处理,最终是返回调用AllChannelHandler实现类。

其中在received()方法中进行线程派发

代码语言:javascript复制
public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = this.getExecutorService();

        try {
            cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
        } catch (Throwable var8) {
            if (message instanceof Request && var8 instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if (request.isTwoWay()) {
                    String msg = "Server side("   this.url.getIp()   ","   this.url.getPort()   ") threadpool is exhausted ,detail msg:"   var8.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus((byte)100);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }

            throw new ExecutionException(message, channel, this.getClass()   " error when process received event .", var8);
        }
    }

传进来的参数Object message包含request。 ExecutorService cexecutor拿到对应的线程池。 调用cexecutor.execute()执行,执行时调用了ChannelEventRunnable,在ChannelEventRunnable这个类的run()方法就调用了我们自己写的业务方法。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/144671.html原文链接:https://javaforall.cn

0 人点赞