ByteToMessageDecoder
该方法提供了将 ByteBuf 转化为对象的解码器处理流程,具体的解码规则交由子类去实现。
我们以读操作 channelRead 为例来研究一下:
代码语言:javascript复制public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// out 是一个链表,存放解码成功的对象
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// cumulation 中存放的是上次未处理完的半包消息
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 本次处理,需要把上次遗留的半包和本次数据拼接后,一起处理
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 调用解码器解码消息
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if ( numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
// 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
fireChannelRead(ctx, out, outSize);
out.clear();
// 如果当前 ChannelHandler 所属 ctx 被剔除 pipeline 上下文,就不需要继续处理了
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 解码
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass())
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// 设置解码器状态为正在解码,避免解码过程中另一个线程调用了 handlerRemoved 把数据销毁,造成混乱
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
// STATE_HANDLER_REMOVED_PENDING 表示在解码过程中,ctx 被移除,需要由当前线程来调用 handlerRemoved
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
// 具体的消息解码算法,交给子类实现
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;