RPC项目记录二期 - Netty替换socket,实现网络传输,解编码器,序列化器

2022-02-17 15:43:42 浏览数 (1)

内容

使用netty替代原先的socket网络传输,使效率更高

优点

netty是nio,socket是bio。

其实就是nio比bio好在哪。

出现的问题

基础

需要实现commonEncoder,CommonDecoder,NettyClientHandler。

这里用的是责任链模式,要层层处理后才可以给下一层,这里需要实现解码器,编码器,数据处理器。

发送rpc请求

这里我们是使用channel进行发送的,因为这是非阻塞的,所以结果会直接返回,导致接受不到结果。

这里我们需要用到attributeKey,netty常用解决粘包的代码。

代码语言:javascript复制
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();

通过这种方式获得全局可见的返回结果,在获得返回结果 RpcResponse 后,将这个对象以 key 为 rpcResponse 放入 ChannelHandlerContext 中,这里就可以立刻获得结果并返回,我们会在 NettyClientHandler 中看到放入的过程。

自定义协议和解编码器

自定义协议

首先是 4 字节魔数,表识一个协议包。接着是 Package Type,标明这是一个调用请求还是调用响应,Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准;Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。

解编码器

1,编码器 commonEncoder

代码语言:javascript复制
public class CommonEncoder extends MessageToByteEncoder {

    private static final int MAGIC_NUMBER = 0xCAFEBABE;

    private final CommonSerializer serializer;

    public CommonEncoder(CommonSerializer serializer) {
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        out.writeInt(MAGIC_NUMBER);
        if(msg instanceof RpcRequest) {
            out.writeInt(PackageType.REQUEST_PACK.getCode());
        } else {
            out.writeInt(PackageType.RESPONSE_PACK.getCode());
        }
        out.writeInt(serializer.getCode());
        byte[] bytes = serializer.serialize(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

MessageToByteEncoder:就是将数据(要发送的对象)转化成字节。

encode:按照协议,将字段一个个传入管道里。

这里要注意的:实现编码器,必须要传入一个选定的序列化容器。

2,解码器 commonDecoder

代码语言:javascript复制
public class CommonDecoder extends ReplayingDecoder {

    private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
    private static final int MAGIC_NUMBER = 0xCAFEBABE;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magic = in.readInt();
        if(magic != MAGIC_NUMBER) {
            logger.error("不识别的协议包: {}", magic);
            throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
        }
        int packageCode = in.readInt();
        Class<?> packageClass;
        if(packageCode == PackageType.REQUEST_PACK.getCode()) {
            packageClass = RpcRequest.class;
        } else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
            packageClass = RpcResponse.class;
        } else {
            logger.error("不识别的数据包: {}", packageCode);
            throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
        }
        int serializerCode = in.readInt();
        CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
        if(serializer == null) {
            logger.error("不识别的反序列化器: {}", serializerCode);
            throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
        }
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Object obj = serializer.deserialize(bytes, packageClass);
        out.add(obj);
    }

ReplayingDecoder :需要实现这个,也就是将序列化的字节码转换成对象。

这里逻辑很简单:从管道里取出每个字段,然后判断是否合理或存在(比如序列化器没有对应的反序列化器的话,会抛出异常)。

细节就是:先判断协议头是否合理,全部合理则把序列化数据反序列化完事。

序列化器

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

json序列化器:

这里使用Jackson作为json序列化工具。

json反序列化因为是object,容易出现错误,所以需要写个新函数来一一对照。

代码语言:javascript复制
/*
        这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
        需要重新判断处理
     */
    private Object handleRequest(Object obj) throws IOException {
        RpcRequest rpcRequest = (RpcRequest) obj;
        for(int i = 0; i < rpcRequest.getParamTypes().length; i   ) {
            Class<?> clazz = rpcRequest.getParamTypes()[i];
            if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
                byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
                rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
            }
        }
        return rpcRequest;
    }

kryo序列化器:

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

实现方法就是正常注册kryo就行了。

NettyServerHandler 和 NettyClientHandler

这里是netty的最顶部和最尾部,不需要和序列化接触,直接处理rpcrequest和rpcresponse就可以了。

NettyClientHandler把结果传入编码器就行。

NettyServerHandler把结果传出去就行。

0 人点赞