概述
由于rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,本文就先从一个点说起。通过源码说明下dubbo通过netty框架做传输层,从接到数据字节流到把字节转换为dubbo上层可读的Request消息对象的过程。dubbo还支持mina,grizzly做底层传输层。这里包括两部,反序列化和解码。这篇主要是说解码的过程。 本文是说明下图dubbo架构图中红框中的部分。
netty
既然是netty做传输层,netty的基础得提一点。 netty框架是通过管道(ChannelPipeline)模型处理网络数据流的,每个管道中有多个处理接点(ChannelHandler), 节点分为,进站(client请求进服务端口)和出站(请求响应出服务端口)两种。比如一个进站消息总是,顺序的(顺序是程序中编码指定的)通过进站处理节点。 同理出站消息,总是顺序的通过出站节点到达网络接口。
dubbo2.5.6版本,传输层dubbo提供有netty3和netty4两种实现,初始化netty通道都在NettyServer类里,两个类同名,包名不同。
具体,netty3在NettyServer类里doOpen()方法:
代码语言:javascript复制protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
//编解码器的初始化
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());//设置解码hander
pipeline.addLast("encoder", adapter.getEncoder());//设置编码hander
pipeline.addLast("handler", nettyHandler);//自定义NettyHandler 扩展自netty双向handler基类,可以接受进站和出站数据流
return pipeline;
//进站的请求,先经过adapter.getDecoder()handler处理,再由nettyHandler处理
//出站的请求,先经过nettyHandler处理 再由adapter.getEncoder()handler处理
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
netty4版本NettyServer类里doOpen()方法:
代码语言:javascript复制 protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ServerBootstrap();
//acceptor 事件循环线程
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
//client channel事件循环线程
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//编解码器的初始化
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())//设置解码hander
.addLast("encoder", adapter.getEncoder())//设置编码hander
.addLast("handler", nettyServerHandler);//自定义NettyServerHandler 扩展netty双向handler基类,可以接受进站和出站数据流
//进站的请求,先经过adapter.getDecoder()handler处理,再由nettyServerHandler处理
//出站的请求,先经过nettyServerHandler处理 再由adapter.getEncoder()handler处理
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
本次说的是进站流程,请求数据解析成Request对象过程,通过上面的代码和netty特性可知, 进站数据先通过解码hander,经解码成消息Request对象后,再到自定义handler,然后由自定义hanlder通过装饰模式,调用实际服务。
先看下解码handler的实现: 由adapter.getDecoder()这句跟踪到NettyCodecAdapter类的getDecoder()方法 public ChannelHandler getDecoder() { return decoder; } 可以看到,这个方法获取的解码handler,decoder,是NettyCodecAdapter类的私有属性
private final ChannelHandler decoder = new InternalDecoder();
看下InternalDecoder类定义,netty3版本:
代码语言:javascript复制 * 这里需要些netty的知识,继承SimpleChannelUpstreamHandler,表明它是进站handler
* 所以进站的数据流,都会经过本handler对象,具体就是messageReceived方法。
*/
private class InternalDecoder extends SimpleChannelUpstreamHandler {
private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;//这是dubbo根据nio自己实现的buffer
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
Object o = event.getMessage();
if (!(o instanceof ChannelBuffer)) {//这里ChnnelBuffer是netty基于jdk nio ByteBuffer 实现
ctx.sendUpstream(event);
return;
}
ChannelBuffer input = (ChannelBuffer) o;
int readable = input.readableBytes();//到这就是从netty event对象取数据的过程。
if (readable <= 0) {
return;
}
com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
if (buffer.readable()) {
if (buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;
} else {
int size = buffer.readableBytes() input.readableBytes();
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
size > bufferSize ? size : bufferSize);//bufferSize 不指定是8k,这里表示最小8K
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());//把netty 读到的字节流写入message
}
} else {//直接通过构造器,构造message
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
input.toByteBuffer());
}
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
Object msg;
int saveReaderIndex;
//从netty框架,读取到的数据,放入message后,下面就是针对message的反序列化和解码过程。
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
//解码,这里面包括的反序列化
msg = codec.decode(channel, message);//重要!!!通过具体编解码实例codec完成解码
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//如果解码结果是,Codec2.DecodeResult.NEED_MORE_INPUT,表示,需要更多数据
message.readerIndex(saveReaderIndex);//很重要,设置readerIndex为,解码读取前的位置,为了下次再从头读取。
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {//解码完成,这里的msg已经是Request对象。!!
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
} finally {
if (message.readable()) {
message.discardReadBytes();
buffer = message;
} else {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
}
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
ctx.sendUpstream(e);//向下一个传递。一般在最后一个handler处理异常
}
}
netty4版本:
代码语言:javascript复制/***
* netty4 扩展了ByteToMessageDecoder
* 重写decode 方法,解码完成后,不用像netty3手动Channels.fireMessageReceived 发送事件,
* netty4自动把对象,传递到下一个handler
*/
private class InternalDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
//保存初始,读取位置
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message);//重要!!!通过具体编解码实例codec完成解码
} catch (IOException e) {
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
//解码失败后,重新设置readerIndex为读取前位置
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
//解码成功后,加入到out list中,传递到下一个处理handler
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
可以看到,无论netty3,还是netty4都是通过,NettyCodecAdapter的codec属性完成解码的, 这里有个概念,编解码handler是通过编解码实例完成编解码的,这里的编解码实例就是codec 而codec实例是由它构造函数从上层方法传递的。如下
代码语言:javascript复制public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
再回到NettySever类中NettyCodecAdapter的构造语句 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); 跟进getCodec()方法,这个方就是获取实际编解码方案的。这个方法的实现在NettyServer的祖先类AbstractEndpoint中:
代码语言:javascript复制 protected Codec2 getCodec() {
return codec;
}
//可以看到codec在构造方法里创建的
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url);//根据url配置,构造编码解码器(通过spi得到DubboCountCodec类实例)
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
// 再跟到getChannelCodec方法
protected static Codec2 getChannelCodec(URL url) {
//通过spi机制,从url里获取编解码方案,这里是dubbo。取不到就是telnet
//dubbo编解码方案,实现类是DubboCountCodec
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
这里看下,dubbo中的所有编解码类结构:
Codec2接口
可以看到,所有编解码器实现,都扩展了Codec2接口。同时Codec2也是个spi扩展点。 接口Codec2,如下:
代码语言:javascript复制@SPI
public interface Codec2 {
/**
* spi 获取编码器
* @param channel
* @param buffer
* @param message
* @throws IOException
*/
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
/***
* spi 获取解码器
* @param channel
* @param buffer
* @return
* @throws IOException
*/
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
enum DecodeResult {
NEED_MORE_INPUT, SKIP_SOME_INPUT
}
}
具体实现通过spi获取,dubbo编解码方案实例就是DubboCountCodec 那么看下DubboCountCodec类,以及decode方法:
代码语言:javascript复制public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();//具体dubbo协议编解码方案实现
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
codec.encode(channel, buffer, msg);
}
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();//记录读取初始位置
MultiMessage result = MultiMessage.create();//解码后对象容器。list,可放多个消息
do {
Object obj = codec.decode(channel, buffer);//解码过程在DubboCodec类中的decode方法里
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//需要接受更多信息
buffer.readerIndex(save);//恢复读取位置
break;
} else {//解码完成,加到对象容器
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);//记录日志,可忽略
save = buffer.readerIndex();//更新读取位置
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);//返回解码后对象
}
return result;
}
private void logMessageLength(Object result, int bytes) {
if (bytes <= 0) {
return;
}
if (result instanceof Request) {
try {
((RpcInvocation) ((Request) result).getData()).setAttachment(
Constants.INPUT_KEY, String.valueOf(bytes));
} catch (Throwable e) {
/* ignore */
}
} else if (result instanceof Response) {
try {
((RpcResult) ((Response) result).getResult()).setAttachment(
Constants.OUTPUT_KEY, String.valueOf(bytes));
} catch (Throwable e) {
/* ignore */
}
}
}
}
dubbo协议消息格式
分析DubboCodec类之前,先说下dubbo协议消息格式,它包括消息头和消息体:
前2个字节: 为协议魔数,固定值oxdabb
第三字节: 第1比特(0/1)表示是请求消息,还是响应消息 第2比特(0/1)表示是是否必须双向通信,即有请求,必有响应 第3比特(0/1)表示是是否是,心跳消息 第低5位比特,表示一个表示消息序列化的方式(1,是dubbo ,2,是hessian...)
第四字节: 只在响应消息中用到,表示响应消息的状态,是成功,失败等
第5-12字节: 8个字节,表示一个long型数字,是reqeustId
第13—16字节: 4个字节,表示消息体的长度(字节数)
消息体,不固定长度 是请求消息时,表示请求数据 是响应消息时,表示方法调用返回结果。
编码和解码主要是对消息头的设置和解析。序列化和反序列化主要是对消息体的操作。
先看DubboCodec的关系图:
DubboCodec类decode方法的实现在其父类ExchangeCodec中:
代码语言:javascript复制 //先看下类中定义的常量:
// header length.消息头长度
protected static final int HEADER_LENGTH = 16;
// magic header.
protected static final short MAGIC = (short) 0xdabb;//1101 1010 1011 1011魔数
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];//高字节
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];//低字节
// message flag.
protected static final byte FLAG_REQUEST = (byte) 0x80;//1000,0000//表示消息类型
protected static final byte FLAG_TWOWAY = (byte) 0x40;//0100,0000//表示是否双向通信
protected static final byte FLAG_EVENT = (byte) 0x20;//0010,0000//表示是否是心跳事件
protected static final int SERIALIZATION_MASK = 0x1f;//0001,1111/表示是序列化实现类型
/***
* 解码入口方法
* @param channel
* @param buffer
* @return
* @throws IOException
*/
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
//取得可读的字节数
int readable = buffer.readableBytes();
//header 最大16字节
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
//从buffer中读16字节到header数组中
buffer.readBytes(header);
//调用本身decode方法
return decode(channel, buffer, readable, header);
}
/***
* 具体协议解析方法,本方法主要是读取验证消息头的过程
* @param channel
* @param buffer
* @param readable
* @param header
* @return
* @throws IOException
*/
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
//前两字节,不是dubbo魔数
int length = header.length;
//如果可读的字节数目,大于16字节
if (header.length < readable) {
//给header扩容到readable大小
header = Bytes.copyOf(header, readable);
//把buffer剩下的字节读到header中,这里多于16字节
buffer.readBytes(header, length, readable - length);
}
//上层方法,第一字节已经验证过,这个从第二字节开始验证。
for (int i = 1; i < header.length - 1; i ) {
//如果发现,后续字节有dubbo协议的开头
if (header[i] == MAGIC_HIGH && header[i 1] == MAGIC_LOW) {
//重置readerIndex 位置,到魔数开始的地方。
buffer.readerIndex(buffer.readerIndex() - header.length i);
//把魔数开始位置,以前的数据,覆盖赋给header(下面调用super.decode解析)
header = Bytes.copyOf(header, i);
break;
}
}
//上层方法是dubbo telnet 编解码实现
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {//小于16字节,返回 需要更多对象
return DecodeResult.NEED_MORE_INPUT;
}
//get data length.读取header[]最后四字节,构造一个int的数据,
//根据dubbo协议,这个是消息体的长度
int len = Bytes.bytes2int(header, 12);
//检查数据大小
checkPayload(channel, len);//默认为8M
int tt = len HEADER_LENGTH;//总的消息大小,消息头加消息实体
if (readable < tt) {//如果可读取的,不够消息总大小,就返回 需要更多数据
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.这个时候,buffer的readerIndex位置已是,读完header后的位置,接下来的len长度的数据,全是消息体的数据。
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//解码反序列化成Reqeust或者Response对象,decodeBody方法被子类DubboCodec重写了
//这里要看DubboCodec的decodeBody的方法
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
DubboCodec类的decodeBody方法:
代码语言:javascript复制/***
* 解码,是从输出流 is 取字节数据,经反序列化,构造Request 和Response对象的过程。
* @param channel
* @param is
* @param header
* @return
* @throws IOException
*/
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
//消息头第3字节和SERIALIZATION_MASK&操作后,就可以得到,序列化/反序列化方案
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
//根据序列化方案id,或者url指定,通过spi机制去获取序列化实现。dubbo协议默认用hession2序列化方案
//是放在消息头flag 里的。这里proto 值是2
//获取具体用序列化/反序列化实现
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.header字节从第5到12 8个字节,是请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {//根据flag&FLAG_REQUEST后,判断需要解码的消息类型
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status. 获取响应 状态 成功,失败等
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {//返回结果状态 ok成功
try {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {//事件消息,反序列化
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {//业务调用结果消息 解码构造 DecodeableRpcResult 对象的过程
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否在io 线程内解码
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
//!!!Response消息反序列化 就是把调用结果返回值 从is里反序列化出来,放在 DecodeableRpcResult类的result 字段的过程。
result.decode();
} else {
//不在io线程解码,要先通过readMessageData方法把调用结果数组取出后,
//放在UnsafeByteArrayInputStream对象,存在DecodeableRpcResult对象里,后续通过上层方法解码。
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
//同时把DecodeableRpcResult对象放入Response result字段。
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " t.getMessage(), t);
}
//异常处理,设置status和异常信息
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
} else {
//异常处理,设置异常信息
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {//反解码Requset 消息类型
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {//业务调用请求消息 解码构造 DecodeableRpcInvocation 对象的过程
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {//是否 io 线程内解码
inv = new DecodeableRpcInvocation(channel, req, is, proto);
//!!!Requset 类型反序列化方法
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
//同时把DecodeableRpcInvocation对象放入Request data字段。
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " t.getMessage(), t);
}
// bad request 异常请求对象设置
req.setBroken(true);
req.setData(t);
}
return req;
}
}
/***
* 获取反序列化方案
* @param serialization
* @param url
* @param is
* @return
* @throws IOException
*/
private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
throws IOException {
return serialization.deserialize(url, is);
}
/***
* 读取is 里的可用数据
* @param is
* @return
* @throws IOException
*/
private byte[] readMessageData(InputStream is) throws IOException {
if (is.available() > 0) {
byte[] result = new byte[is.available()];
is.read(result);
return result;
}
return new byte[]{};
}
RPC调用请求:DecodeableRpcInvocation 类反序列化方法:
代码语言:javascript复制public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {//具体在decode重载方法里
decode(channel, inputStream);
} catch (Throwable e) {//异常请求设置
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
/**
* 反序列化,解码 通过反序列化还原
* RpcInvocation 类的
* private String methodName;
private Class<?>[] parameterTypes;
private Object[] arguments;
private Map<String, String> attachments; 是个属性值,就像在客户端请求时设置的一样。
* @param channel channel.
* @param input input stream.
* @return
* @throws IOException
*/
public Object decode(Channel channel, InputStream input) throws IOException {
//获取反序列化方案
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
//反序列化出,dubbo版本,路径,服务版本信息,设置到attachments里,这读取顺序和序列化时的写入顺序也一致
//当然,序列化方案也一致。这里默认都是hissen2
//调用ObjectInput的readUTF()反序列化方法,依次获取调用信息
setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());
//读取方法名
setMethodName(in.readUTF());
//反序列化,方法请求参数类型,
try {
Object[] args;
Class<?>[] pts;
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i ) {
try {
//更具类型读取请求参数值
//调用ObjectInput的readObject()反序列化方法,反序列化出参数值
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " e.getMessage(), e);
}
}
}
}
//设置保存请求参数类型
setParameterTypes(pts);
//反序列化,attachment map
//调用ObjectInput的readObject()反序列化方法,反序列化出attachemnet值
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
attachment.putAll(map);
setAttachments(attachment);
}
//decode argument ,may be callback 回调参数设置,这个再说。
for (int i = 0; i < args.length; i ) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
//保存请求参数值
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
}
return this;
}
RPC调用结果:DecodeableRpcResult 反序列化 方法decode()
代码语言:javascript复制public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {//具体实现在decode(channel, inputStream)重载方法里
decode(channel, inputStream);
} catch (Throwable e) {//设置异常返回response
if (log.isWarnEnabled()) {
log.warn("Decode rpc result failed: " e.getMessage(), e);
}
response.setStatus(Response.CLIENT_ERROR);
response.setErrorMessage(StringUtils.toString(e));
} finally {
hasDecoded = true;
}
}
}
/***
* 反序列化,解码过程,读取input的调用结果字节数据,经反序列化成方法返回类型对象
* 并发放回结果设置到RpcResult的result字段里
* 以及异常返回字段的设置。
* @param channel channel.
* @param input input stream.
* @return
* @throws IOException
*/
public Object decode(Channel channel, InputStream input) throws IOException {
//获取反序列化方案
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
//反序列化出 结果标识 调用ObjectInput的readByte方法,反序列化出一个byte值
byte flag = in.readByte();//
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE://null 值
break;
case DubboCodec.RESPONSE_VALUE: //非空值
try {
//根据invocation获取调用方法的放回类型
Type[] returnType = RpcUtils.getReturnTypes(invocation);
//根据返回类型,反序列出结果并这是到RpcResult 的result字段里。
//void 类型 结果值 是null ;int 等基本类型,自动装箱 Integer;
//具体调用ObjectInput的readObject重载的两个方法,反序列出结果对象
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION://异常信息反序列化,设置到exception字段
try {
//具体调用ObjectInput的readObject重载的两个方法,反序列出异常对象
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " obj);
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " flag);
}
return this;
}
下一篇再说说反序列化。