Netty之bossGroup接收请求转给workGroup

2022-09-23 11:18:18 浏览数 (1)

    bossGroup和workGroup是分开的,bossGroup负责accept请求,而workGroup负责read/write事件,bossGroup accept之后转交给workGroup具体是怎么实现的呢。

BossGroup

    NioEventLoop#processSelectedKey()方法中如下List-1,当bossGroup收到accept事件后,调用unsafe.read()

List-1

代码语言:javascript复制
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

    AbstractNioMessageChannel.NioMessageUnsafe.read(),如下List-2,1处获取JDK channel,2处pipeline.fireChannelRead(readBuf.get(i))会调用ServerBootstrapAcceptor.channelRead()。

List-2

代码语言:javascript复制
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    //1
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i   ) {
                readPending = false;
                //2
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

    NioServerSocketChannel.doReadMessages()如下获取JDK channel,之后封装后NioSocketChannel,放入到buf中

List-3 

代码语言:javascript复制
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

    bossGroup的pipeline,是head->ServerBootstrapAcceptor->tail,ServerBootstrapAcceptor.channelRead()中,childGroup.register(child),这里的child是netty封装后的JDK channel,转交给childGroup,如下List-5

List-4

代码语言:javascript复制
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: "   e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: "   child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

WorkGroup

    MultithreadEventLoopGroup,如下所示的方法中,next()从众多的child group中选出一个来处理这个新的channel,默认是轮询。

List-5

代码语言:javascript复制
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

    List-5中register()只是在这个channel上注册read事件监听,待这个channel上有read事件,即有数据可读时就会进行read。

    List-6中bootstrap设置了childHandler,initializer里对pipeline添加了channelHandler,workGroup从channel中读取数据后,会顺着channel handdler链表进行处理,也就是我们设置的这些channel handler会收到对应的数据。

List-6

代码语言:javascript复制
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
            ...
            .childHandler(new ServerInitializer(...));

...

public class TrpcServerInitializer extends ChannelInitializer<SocketChannel> {
    private Map<String, Object> handlerMap;
    private List<TrpcFilter> filters;

    public TrpcServerInitializer(Map<String, Object> handlerMap, List<TrpcFilter> filters) {
        this.handlerMap = handlerMap;
        this.filters = filters;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(30, 30, PingPongRequest.BEAT_INTERVAL, TimeUnit.SECONDS));
        pipeline.addLast(new TrpcDecoder(TrpcRequest.class));
        pipeline.addLast(new TrpcEncoder(TrpcResponse.class));
        pipeline.addLast(new TrpcServerHandler(handlerMap, filters));
    }

    workGroup从bossGroup获取channel,之后注册read事件,之后如List-1,如果有read事件,则会调用List-2中的方法,1处从JDK channel中获取byte数据写入ByteBBuf,2处pipeline.fireChannelRead(),将读取的Byte数据转交给我们自定义的channelHandler,比如ByteToMessageDecoder中就可以进行反序列化byte,之后的业务ChannelHandler才能处理业务请求

List-7

代码语言:javascript复制
protected class NioByteUnsafe extends AbstractNioUnsafe {
...

    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                //1
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                //2
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

0 人点赞