1 Consumer编码
由之前的文章可知,Consumer最终在生成DubboInvoker时,会生成对应的客户端连接,如下
代码语言:javascript复制public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    checkDestroyed();
    optimizeSerialization(url);
    // Build the RPC invoker on top of the client connections obtained for this URL,
    // then register it in the protocol's invoker set before handing it back.
    final DubboInvoker<T> dubboInvoker = new DubboInvoker<>(serviceType, url, getClients(url), invokers);
    invokers.add(dubboInvoker);
    return dubboInvoker;
}
可以看到是通过getClients(url)生成客户端,最终通过Transporters的connect方法获取连接,如下
代码语言:javascript复制public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter(url).connect(url, handler);
}
// Looks up the adaptive Transporter extension through the framework model attached to the URL
// (or the default model when the URL carries none).
public static Transporter getTransporter(URL url) {
return url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
可以看出同样首先是获取适配类,通过适配类的connect方法获取连接,生成的适配类源码如下
代码语言:javascript复制public class Transporter$Adaptive implements org.apache.dubbo.remoting.Transporter {
public org.apache.dubbo.remoting.Client connect(org.apache.dubbo.common.URL arg0, org.apache.dubbo.remoting.ChannelHandler arg1) throws org.apache.dubbo.remoting.RemotingException {
if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.remoting.Transporter) name from url (" url.toString() ") use keys([client, transporter])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.remoting.Transporter.class);
org.apache.dubbo.remoting.Transporter extension = (org.apache.dubbo.remoting.Transporter) scopeModel.getExtensionLoader(org.apache.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.connect(arg0, arg1);
}
public org.apache.dubbo.remoting.RemotingServer bind(org.apache.dubbo.common.URL arg0, org.apache.dubbo.remoting.ChannelHandler arg1) throws org.apache.dubbo.remoting.RemotingException {
if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.remoting.Transporter) name from url (" url.toString() ") use keys([server, transporter])");
ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.remoting.Transporter.class);
org.apache.dubbo.remoting.Transporter extension = (org.apache.dubbo.remoting.Transporter) scopeModel.getExtensionLoader(org.apache.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}
}
如上,在其connect方法中获取对应的Extension,由于设置的是netty,所以这里获取到的是NettyTransporter,最后在NettyTransporter中通过connect方法新建了一个NettyClient
代码语言:javascript复制public Client connect(URL url, ChannelHandler handler) throws RemotingException {
// Every call creates a fresh NettyClient for the given URL and handler.
return new NettyClient(url, handler);
}
在NettyClient的构造器中什么也没干,直接调用父类AbstractClient的构造器方法,如下,核心是依次调用了父类AbstractEndpoint的构造器方法、doOpen以及connect(最终会走到doConnect),下面依次来看看
代码语言:javascript复制public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
// Step 1: let the parent constructor run first.
super(url, handler);
......
try {
// Step 2: open the client (see doOpen).
doOpen();
} catch (Throwable t) {
......
}
try {
// connect.
// Step 3: establish the connection; the elided handlers deal with RemotingException
// separately from any other Throwable.
connect();
......
} catch (RemotingException t) {
......
} catch (Throwable t) {
......
}
}
super(url, handler),主要是通过父类AbstractEndpoint的getChannelCodec方法获取对应的编解码器,如下
代码语言:javascript复制public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
// Resolve the codec for this endpoint from the URL.
this.codec = getChannelCodec(url);
// Read the connect timeout from the URL; DEFAULT_CONNECT_TIMEOUT is passed as the fallback value.
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
/**
 * Resolves the codec for a URL. The name comes from the codec parameter, or from the protocol
 * name when that parameter is empty. Lookup order: a Codec2 extension with that name, then a
 * legacy Codec extension wrapped in a CodecAdapter, then the Codec2 extension named "default".
 */
protected static Codec2 getChannelCodec(URL url) {
    String name = url.getParameter(Constants.CODEC_KEY);
    if (StringUtils.isEmpty(name)) {
        // codec extension name must stay the same with protocol name
        name = url.getProtocol();
    }
    final FrameworkModel model = getFrameworkModel(url.getScopeModel());
    if (model.getExtensionLoader(Codec2.class).hasExtension(name)) {
        return model.getExtensionLoader(Codec2.class).getExtension(name);
    }
    if (model.getExtensionLoader(Codec.class).hasExtension(name)) {
        return new CodecAdapter(model.getExtensionLoader(Codec.class).getExtension(name));
    }
    return model.getExtensionLoader(Codec2.class).getExtension("default");
}
由于默认是dubbo协议,所以这里获取到的是DubboCountCodec,如下
代码语言:javascript复制public final class DubboCountCodec implements Codec2 {
// Thin wrapper around DubboCodec: the actual encoding and decoding is delegated to it,
// while this class handles batches (MultiMessage) in both directions.
private final DubboCodec codec;
public DubboCountCodec(FrameworkModel frameworkModel) {
codec = new DubboCodec(frameworkModel);
}
// Encodes a single message, or every element of a MultiMessage in order, into the same buffer.
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof MultiMessage) {
MultiMessage multiMessage = (MultiMessage) msg;
for (Object singleMessage : multiMessage) {
codec.encode(channel, buffer, singleMessage);
}
} else {
codec.encode(channel, buffer, msg);
}
}
// Decodes as many complete messages as the buffer currently holds.
// Returns NEED_MORE_INPUT when none is complete, the message itself when exactly one was
// decoded, and a MultiMessage otherwise.
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// Reader index at the start of the message currently being decoded.
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
// Incomplete message: rewind to its start so it is retried once more bytes arrive.
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
// The difference between the two reader indexes is the size of the decoded message.
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
}
这里获取的DubboCountCodec也不是最终的编解码器,从上面的代码中可以看到最终是通过DubboCodec来处理编解码
doOpen(),主要是在获取到编解码器之后再初始化客户端的Bootstrap
代码语言:javascript复制protected void doOpen() throws Throwable {
// Create the business handler and a fresh Bootstrap, then configure the Bootstrap with it.
// No connection is made here.
final NettyClientHandler nettyClientHandler = createNettyClientHandler();
bootstrap = new Bootstrap();
initBootstrap(nettyClientHandler);
}
// Configures the client Bootstrap: event loop group, socket options, connect timeout and the
// per-channel pipeline (optional TLS, decoder, encoder, idle detection, business handler,
// optional SOCKS5 proxy).
protected void initBootstrap(NettyClientHandler nettyClientHandler) {
bootstrap.group(EVENT_LOOP_GROUP.get())
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
// The effective connect timeout is never lower than DEFAULT_CONNECT_TIMEOUT.
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
// Built once here and captured by the initializer below; null is treated as "no TLS".
SslContext sslContext = SslContexts.buildClientSslContext(getUrl());
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (sslContext != null) {
ch.pipeline().addLast("negotiation", new SslClientTlsHandler(sslContext));
}
// Wraps the Dubbo codec into Netty encoder/decoder handlers.
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
// When a SOCKS proxy host is configured and the target host is not filtered,
// the proxy handler is placed at the very front of the pipeline.
String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);
if (socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {
int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
可以看到在channel的pipeline中,如果没有ssl的话,那第一步就是添加编解码器,这里添加的时候又通过NettyCodecAdapter封装成继承了Netty编解码类的codec,分别是InternalEncoder和InternalDecoder
代码语言:javascript复制final public class NettyCodecAdapter {
// Bridges a Dubbo Codec2 into Netty's pipeline by exposing one encoder and one decoder handler
// that both delegate to the wrapped codec.
private final ChannelHandler encoder = new InternalEncoder();
private final ChannelHandler decoder = new InternalDecoder();
private final Codec2 codec;
private final URL url;
private final org.apache.dubbo.remoting.ChannelHandler handler;
public NettyCodecAdapter(Codec2 codec, URL url, org.apache.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
}
public ChannelHandler getEncoder() {
return encoder;
}
public ChannelHandler getDecoder() {
return decoder;
}
private class InternalEncoder extends MessageToByteEncoder {
// Writes the outgoing message into out. ByteBuf payloads (single or inside a MultiMessage)
// are copied as is; anything else is encoded with the Dubbo codec.
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
boolean encoded = false;
if (msg instanceof ByteBuf) {
out.writeBytes(((ByteBuf) msg));
encoded = true;
} else if (msg instanceof MultiMessage) {
for (Object singleMessage : ((MultiMessage) msg)) {
if (singleMessage instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) singleMessage;
out.writeBytes(buf);
encoded = true;
// Each element buffer is released here once its bytes have been copied.
buf.release();
}
}
}
if (!encoded) {
ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
codec.encode(channel, buffer, msg);
}
}
}
private class InternalDecoder extends ByteToMessageDecoder {
// Decodes messages from input for as long as it stays readable and the codec can produce
// a complete message.
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// decode object.
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
// Not enough bytes yet: rewind and wait for the next read.
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
// A result that consumed no bytes is rejected, otherwise this loop could spin forever.
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
}
}
}
InternalEncoder和InternalDecoder分别继承了Netty的MessageToByteEncoder和ByteToMessageDecoder
最后再来看看connect(),最终是通过调用NettyClient的doConnect来新建连接,详细步骤下面代码中有注释
代码语言:javascript复制private void doConnect(InetSocketAddress serverAddress) throws RemotingException {
    // Connects to serverAddress and waits up to the connect timeout. On success the new channel
    // replaces any previous one; on failure or timeout a RemotingException is thrown.
    long start = System.currentTimeMillis();
    ChannelFuture future = bootstrap.connect(serverAddress);
    try {
        // Wait for the attempt to finish, bounded by the configured connect timeout.
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
        if (ret && future.isSuccess()) {
            // The freshly established channel.
            Channel newChannel = future.channel();
            try {
                // If an old channel exists, close it and drop it from NettyChannel's channel map.
                Channel oldChannel = NettyClient.this.channel;
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // If the client was closed in the meantime, the new channel must be closed as well
                // and removed from NettyChannel's channel map; otherwise it becomes the current channel.
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.cause() != null) {
            Throwable cause = future.cause();
            // 6-1 Failed to connect to provider server by other reason.
            RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                + serverAddress + ", error message is:" + cause.getMessage(), cause);
            logger.error(TRANSPORT_FAILED_CONNECT_PROVIDER, "network disconnected", "",
                "Failed to connect to provider server by other reason.", cause);
            throw remotingException;
        } else {
            // 6-2 Client-side timeout
            RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                + serverAddress + " client-side timeout "
                + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            logger.error(TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "",
                "Client-side timeout.", remotingException);
            throw remotingException;
        }
    } finally {
        // The connection state is checked here, but nothing is done with the result.
        if (!isConnected()) {
            //future.cancel(true);
        }
    }
}
再来看看上面代码最后一步isConnected方法干了啥
代码语言:javascript复制public boolean isConnected() {
// 获取channel,返回isConnected,这里获取的channel是一个NettyChannel实例,对原生的channel做了
// 一层封装
Channel channel = getChannel();
if (channel == null) {
return false;
}
return channel.isConnected();
}
再看看getChannel()方法
代码语言:javascript复制protected org.apache.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null) {
return null;
}
// 这里其实就是新建了一个NettyChannel,并且将原生的channel作为Key,
// NettyChannel为value放在了全局的ChannelMap中
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
新建NettyChannel的代码如下,在新建时设置了当前协议的编解码器和writequeue
代码语言:javascript复制private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
// Each NettyChannel gets its own write queue, created for the wrapped channel.
this.writeQueue = Netty4BatchWriteQueue.createWriteQueue(channel);
// The wrapper carries its own codec, resolved from the URL.
this.codec = getChannelCodec(url);
// Whether encoding is left to the IO thread; read from the URL with a default.
this.encodeInIOThread = getUrl().getParameter(ENCODE_IN_IO_THREAD_KEY, DEFAULT_ENCODE_IN_IO_THREAD);
}
这里先提出一个问题,在上文中channel的pipeline初始化时已经添加了编解码器,为啥封装的NettyChannel还要设置编解码器?下面就来看看Consumer端是如何将请求发出去的,这里紧接着Dubbo系列三这篇文章中Consumer发送请求结尾处发送请求走到HeaderExchangeChannel.request方法,在这个方法中调用了NettyClient的send方法,如下
代码语言:javascript复制public void send(Object message, boolean sent) throws RemotingException {
    // Reconnect first when reconnection is enabled and the client is currently disconnected.
    if (needReconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    // Hand the message to the channel wrapper, which performs the actual send.
    channel.send(message, sent);
}
这里先通过getChannel()方法获取到Channel,这里的Channel就是刚才所封装的NettyChannel,再通过其send方法将数据发送出去,如下
代码语言:javascript复制public void send(Object message, boolean sent) throws RemotingException {
    // Sends message over this channel. When sent is true, blocks until the write completes or
    // the configured timeout elapses. Any failure is rethrown as a RemotingException.
    // NOTE(review): the article says the superclass send only logs; that is not visible here.
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        Object outputMessage = message;
        // Unless encoding is left to the IO thread, encode right here on the calling thread.
        // This is why NettyChannel carries its own codec: the IO thread then receives a
        // ready-made ByteBuf and does not need to encode again.
        if (!encodeInIOThread) {
            ByteBuf buf = channel.alloc().buffer();
            ChannelBuffer buffer = new NettyBackedChannelBuffer(buf);
            codec.encode(this, buffer, message);
            outputMessage = buf;
        }
        // The message is not written directly; it is enqueued on this channel's write queue so
        // that writes can be batched. The listener runs when the write completes: for Request
        // messages it reports success through handler.sent, and turns a failure into an error
        // Response delivered through handler.received.
        ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!(message instanceof Request)) {
                    return;
                }
                ChannelHandler handler = getChannelHandler();
                if (future.isSuccess()) {
                    handler.sent(NettyChannel.this, message);
                } else {
                    Throwable t = future.cause();
                    if (t == null) {
                        return;
                    }
                    Response response = buildErrorResponse((Request) message, t);
                    handler.received(NettyChannel.this, response);
                }
            }
        });
        if (sent) {
            // wait timeout ms
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        removeChannelIfDisconnected(channel);
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
            + "in timeout(" + timeout + "ms) limit");
    }
}
再来看看writeQueue.enqueue(outputMessage)方法,直接通过channel.newPromise()返回了一个Promise,用于监听
代码语言:javascript复制public ChannelFuture enqueue(Object message) {
// Convenience overload: creates a new promise on the channel and returns it so the caller
// can observe the outcome of the write.
return enqueue(message, channel.newPromise());
}
// Pairs the message with its promise, queues the pair for a flush on the event loop,
// and returns the promise.
public ChannelFuture enqueue(Object message, ChannelPromise channelPromise) {
    final MessageTuple tuple = new MessageTuple(message, channelPromise);
    super.enqueue(tuple, eventLoop);
    return tuple.channelPromise;
}
以上代码中又调用父类的super.enqueue(messageTuple, eventLoop)方法,如下
代码语言:javascript复制public void enqueue(T message, Executor executor) {
// Add to the pending queue first, then make sure a flush task is scheduled on the executor.
queue.add(message);
scheduleFlush(executor);
}
可以看到是先将待发送的数据加入到一个队列当中,然后再调用scheduleFlush(executor)处理,这里的入参executor是一个netty的eventloop,eventloop不仅可以作为netty的IO线程用,也可以作为普通的线程使用来提交异步任务,和普通线程池使用没啥区别,看下scheduleFlush(executor)方法的逻辑
代码语言:javascript复制// Submits an asynchronous task to the executor (the event loop); the task runs run(executor).
// The compareAndSet guard submits a new task only when the scheduled flag was still false.
protected void scheduleFlush(Executor executor) {
if (scheduled.compareAndSet(false, true)) {
executor.execute(() -> this.run(executor));
}
}
// 一个item其实就是一个MessageTuple,MessageTuple包含了要发送的数据以及对应的Promise
// 以下代码的逻辑就是依次从queue中获取待发送的数据,如果数据只有一条直接调用flush将数据发送出去
// 如果数据不止一条就添加到MultiMessage中,同时将数据的promise也添加到MultiMessage中
// 如果添加的数据条数达到限制(默认是128条),就将数据批量发送出去
private void run(Executor executor) {
try {
Queue<T> snapshot = new LinkedList<>();
T item;
while ((item = queue.poll()) != null) {
snapshot.add(item);
}
int i = 0;
boolean flushedOnce = false;
while ((item = snapshot.poll()) != null) {
if (snapshot.size() == 0) {
flushedOnce = false;
break;
}
if (i == chunkSize) {
i = 0;
flush(item);
flushedOnce = true;
} else {
prepare(item);
i ;
}
}
if (!flushedOnce && item != null) {
flush(item);
}
} finally {
scheduled.set(false);
if (!queue.isEmpty()) {
scheduleFlush(executor);
}
}
}
再来看看flush(item)的逻辑
代码语言:javascript复制protected void flush(MessageTuple item) {
// Add the final item to the current batch before sending.
prepare(item);
Object finalMessage = multiMessage;
// A batch of exactly one message is sent as that message rather than as a MultiMessage.
if (multiMessage.size() == 1) {
finalMessage = multiMessage.get(0);
}
// This is where the data is actually written through the Netty channel's writeAndFlush.
// The listener propagates the outcome of the write to every queued promise.
channel.writeAndFlush(finalMessage).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ChannelPromise cp;
while ((cp = promises.poll()) != null) {
if (future.isSuccess()){
cp.setSuccess();
} else {
cp.setFailure(future.cause());
}
}
}
});
// Reset the batch for the next round.
this.multiMessage.removeMessages();
}
数据发送出去以后就走到InternalEncoder的encode方法,由于我设置的是在IO线程中编解码,所以这里会对数据进行编码
代码语言:javascript复制protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
boolean encoded = false;
// A single ByteBuf is already encoded: copy it straight into out.
if (msg instanceof ByteBuf) {
// No manual release here: per the article, the write method of the parent class
// MessageToByteEncoder releases the message.
out.writeBytes(((ByteBuf) msg));
encoded = true;
} else if (msg instanceof MultiMessage) {
// For a MultiMessage, copy each element into out.
for (Object singleMessage : ((MultiMessage) msg)) {
if (singleMessage instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) singleMessage;
out.writeBytes(buf);
encoded = true;
// A ByteBuf inside a MultiMessage was encoded earlier on the business thread into a
// buffer allocated by hand via channel.alloc().buffer(), so it is released by hand here.
buf.release();
}
}
}
// Nothing was pre-encoded: encode now, writing directly into out.
if (!encoded) {
ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
codec.encode(channel, buffer, msg);
}
}
接着看下codec.encode(channel, buffer, msg)方法,由于本地只发了一次请求,所以最终会走到codec的encodeRequest方法,如下
代码语言:javascript复制protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    // Encodes a request as a HEADER_LENGTH-byte header followed by the serialized body.
    // Header layout written here: bytes 0-1 magic, byte 2 flags + serialization id,
    // bytes 4-11 request id, bytes 12-15 body length. Byte 3 is left untouched for requests.
    Serialization serialization = getSerialization(channel, req);
    // header.
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number (2 bytes).
    Bytes.short2bytes(MAGIC, header);
    // set request and serialization flag (1 byte).
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }
    // set request id (8 bytes, starting at offset 4).
    Bytes.long2bytes(req.getId(), header, 4);
    // Move the writer index past the space reserved for the header, so the body is written
    // first and the header can be filled in afterwards once the body length is known.
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    // Serialize the body into the buffer through a ChannelBufferOutputStream.
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    if (req.isHeartbeat()) {
        // heartbeat request data is always null
        bos.write(CodecSupport.getNullBytesOf(serialization));
    } else {
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
    }
    bos.flush();
    bos.close();
    // Write the body length into the last 4 header bytes, after checking it against the payload limit.
    int len = bos.writtenBytes();
    checkPayload(channel, req.getPayload(), len);
    Bytes.int2bytes(len, header, 12);
    // Rewind to the reserved header slot, write the header, then move the writer index
    // to the end of the body (savedWriteIndex + HEADER_LENGTH + len).
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
从上面可以看出dubbo协议的设计还是比较简单的,header的前4个字节存储魔数(2个字节)、request和序列化标记(1个字节)等,请求中只用了前三个字节,第四个字节在响应中用来保存status(见下文decodeBody中的header[3]);中间8个字节保存RequestId或者ResponseId,最后4个字节保存数据包的长度,然后再跟上真实的数据字节
2 Provider解码
provider接收到请求后第一步就是解码,看看InternalDecoder
代码语言:javascript复制private class InternalDecoder extends ByteToMessageDecoder {
// Netty inbound decoder: repeatedly asks the Dubbo codec for a message while input is readable.
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// decode object.
do {
// Remember where this attempt started so it can be undone.
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
// Incomplete packet: rewind and stop until more bytes arrive.
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
// A result that consumed no bytes is treated as an error.
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
}
}
decode的逻辑比较简单,就是当ByteBuf可读时循环调用codec.decode(channel, message)解析ByteBuf,如果数据不够一个完整的数据包就退出循环,下面看下codec.decode(channel, message)方法
代码语言:javascript复制public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// Decodes every complete message currently in the buffer.
// Returns NEED_MORE_INPUT when there is none, the message itself when there is exactly one,
// and a MultiMessage when there are several.
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
// Rewind to the start of the incomplete message and stop.
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);
// The next message starts where this one ended.
save = buffer.readerIndex();
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
同样也是循环解析,如果有多条消息,则封装成一个MultiMessage返回,再往下看看真正decode的代码
代码语言:javascript复制protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // Decodes one frame from the buffer given the header bytes already read.
    // Returns NEED_MORE_INPUT when the header or the body is not fully available yet.
    //
    // If the data does not start with the magic number, it is handed to the parent decoder
    // (per the article, this path handles special command-line style input).
    if (readable > 0 && header[0] != MAGIC_HIGH
        || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            // Pull the remaining readable bytes into the header array.
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        // Look for a magic number further in; if found, rewind the reader index to it and
        // pass only the bytes before it to the parent decoder.
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // Fewer bytes than a full header: ask for more data.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    // get data length.
    int len = Bytes.bytes2int(header, 12);
    // Per the article: for an over-sized response this builds an error Response right away.
    // A non-null result is returned as is.
    Object obj = finishRespWhenOverPayload(channel, len, header);
    if (null != obj) {
        return obj;
    }
    // The full frame (header + body) is not available yet: ask for more data.
    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    try {
        // Parse the body.
        return decodeBody(channel, is, header);
    } finally {
        // Skip whatever the body decoder left unread so the next frame starts at the right place.
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e);
            }
        }
    }
}
// decodeBody parses the body according to the message type carried in the header flags:
// without FLAG_REQUEST it builds a Response, otherwise a Request; events are handled the same
// way inside each branch.
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// Byte 2 of the header carries the flags; masking it yields the serialization id.
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// get status.
// Byte 3 of the header holds the response status.
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
data = null;
} else {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
}
} else {
data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
}
res.setResult(data);
} else {
// Non-OK status: the body is read as a UTF error message.
res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
}
} catch (Throwable t) {
// A decoding failure is reported through the response itself instead of being thrown.
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// decode request.
Request req;
try {
Object data;
if ((flag & FLAG_EVENT) != 0) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
req = new HeartBeatRequest(id);
((HeartBeatRequest) req).setProto(proto);
data = null;
} else {
req = new Request(id);
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);
}
req.setEvent(true);
} else {
req = new Request(id);
data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
}
req.setData(data);
} catch (Throwable t) {
// bad request
// The failure is carried as the data of a request marked broken.
req = new Request(id);
req.setBroken(true);
req.setData(t);
}
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
return req;
}
}
这样解析到数据之后就交给业务handler处理,处理完成之后将结果encode一遍返回给消费端,消费端又decode获取结果