ByteToMessageDecoder分析

2019-12-16 17:59:42 浏览数 (1)

ByteToMessageDecoder

该方法提供了将 ByteBuf 转化为对象的解码器处理流程,具体的解码规则交由子类去实现。

我们以读操作 channelRead 为例来研究一下:

代码语言:javascript复制
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // out 是一个链表,存放解码成功的对象
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                // cumulation 中存放的是上次未处理完的半包消息
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    // 本次处理,需要把上次遗留的半包和本次数据拼接后,一起处理
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 调用解码器解码消息
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (   numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    // 如果有解码成功的数据,需要向后传递,让其他 ChannelHandler 继续处理
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    // 如果当前 ChannelHandler 所属 ctx 被剔除 pipeline 上下文,就不需要继续处理了
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                // 解码
                decodeRemovalReentryProtection(ctx, in, out);

                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass())  
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        // 设置解码器状态为正在解码,避免解码过程中另一个线程调用了 handlerRemoved 把数据销毁,造成混乱
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            // STATE_HANDLER_REMOVED_PENDING 表示在解码过程中,ctx 被移除,需要由当前线程来调用 handlerRemoved 
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

    // 具体的消息解码算法,交给子类实现
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;    

0 人点赞