内容
使用netty替代原先的socket网络传输,使效率更高
优点
netty是nio,socket是bio。
其实就是nio比bio好在哪。
出现的问题
基础
需要实现commonEncoder,CommonDecoder,NettyClientHandler。
这里用的是责任链模式,要层层处理后才可以给下一层,这里需要实现解码器,编码器,数据处理器。
发送rpc请求
这里我们是使用channel进行发送的,因为这是非阻塞的,所以结果会直接返回,导致接受不到结果。
这里我们需要用到attributeKey,netty常用解决粘包的代码。
代码语言:javascript复制AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();
通过这种方式获得全局可见的返回结果,在获得返回结果 RpcResponse 后,将这个对象以 key 为 rpcResponse 放入 ChannelHandlerContext 中,这里就可以立刻获得结果并返回,我们会在 NettyClientHandler
中看到放入的过程。
自定义协议和解编码器
自定义协议
首先是 4 字节魔数,表识一个协议包。接着是 Package Type,标明这是一个调用请求还是调用响应,Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准;Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。
解编码器
1,编码器 commonEncoder
代码语言:javascript复制public class CommonEncoder extends MessageToByteEncoder {
private static final int MAGIC_NUMBER = 0xCAFEBABE;
private final CommonSerializer serializer;
public CommonEncoder(CommonSerializer serializer) {
this.serializer = serializer;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if(msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
MessageToByteEncoder:就是将数据(要发送的对象)转化成字节。
encode:按照协议,将字段一个个传入管道里。
这里要注意的:实现编码器,必须要传入一个选定的序列化容器。
2,解码器 commonDecoder
代码语言:javascript复制public class CommonDecoder extends ReplayingDecoder {
private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if(magic != MAGIC_NUMBER) {
logger.error("不识别的协议包: {}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if(packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不识别的数据包: {}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if(serializer == null) {
logger.error("不识别的反序列化器: {}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}
ReplayingDecoder :需要实现这个,也就是将序列化的字节码转换成对象。
这里逻辑很简单:从管道里取出每个字段,然后判断是否合理或存在(比如序列化器没有对应的反序列化器的话,会抛出异常)。
细节就是:先判断协议头是否合理,全部合理则把序列化数据反序列化完事。
序列化器
kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。
json序列化器:
这里使用Jackson作为json序列化工具。
json反序列化因为是object,容易出现错误,所以需要写个新函数来一一对照。
代码语言:javascript复制/*
这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
需要重新判断处理
*/
private Object handleRequest(Object obj) throws IOException {
RpcRequest rpcRequest = (RpcRequest) obj;
for(int i = 0; i < rpcRequest.getParamTypes().length; i ) {
Class<?> clazz = rpcRequest.getParamTypes()[i];
if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
}
}
return rpcRequest;
}
kryo序列化器:
kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。
kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。
kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。
实现方法就是正常注册kryo就行了。
NettyServerHandler 和 NettyClientHandler
这里是netty的最顶部和最尾部,不需要和序列化接触,直接处理rpcrequest和rpcresponse就可以了。
NettyClientHandler把结果传入编码器就行。
NettyServerHandler把结果传出去就行。