Netty服务端的新连接接入源码解析

2021-08-06 14:28:54 浏览数 (1)

经过上一章节的学习,我们基本了解了Netty是如何对IO事件以及异步任务的处理了,今天我们就一起来学习一下,Netty是如何处理新连接接入与数据读取的!

一、源码寻找

我们上一章节学到了,当存在IO事件的时候,Netty的反应堆线程会监听这些事件,然后进行处理,忘记的,可以回顾一下上一章节,,我们这里直接进入到:

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

这里的代码,我们昨天只是做了一个大概的分析,并没有深入的讲解,这一章节具体分析一下新连接的接入和Channel数据的读取。

代码语言:javascript复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    .........忽略有效性验证................

    try {
        int readyOps = k.readyOps();
        ...................忽略其他 事件的处理逻辑...............
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

我们重点关注当事件存在读事件或者新连接接入事件的时候,才会进入到这一判断逻辑,那么由此可见,我们兵丁是要关注unsafe.read() 这一行代码了!

二、新连接接入源码分析

首选,我们声明一下,我们现在一直是按照服务端启动逻辑进行分析的,那么服务端逻辑分析,对照通道就是NioServerSocketChannel, 我们在创建NioServerSocketChannel的时候初始化过一个Unsafe对象,他是NioMessageUnsafe类型的,如果有疑问的同学可以回顾一下NioServerSocketChannel的初始化过程!

所以,必然,我们这里的unsafe.read(); 就必然进入的是NioMessageUnsafe的read方法:

image-20210504115544746

代码语言:javascript复制
@Override
public void read() {
    ..................忽略不必要代码............
    try {
        try {
            do {
                //读取数据  可能是数据  也可能是新连接
                int localRead = doReadMessages(readBuf);
                //如果没数据就跳出
                if (localRead == 0) {
                    break;
                }
                //-1 就是连接被关闭 
                if (localRead < 0) {
                    closed = true;
                    break;
                }
				//读取的连接数增加
                allocHandle.incMessagesRead(localRead);
                //每次默认读取最大16个连接  剩余的后续去读
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
		//获取连接数量或者读取的数据的数量
        int size = readBuf.size();
        for (int i = 0; i < size; i   ) {
            readPending = false;
            //开始传播channelRead属性
            pipeline.fireChannelRead(readBuf.get(i));
        }
        //清空缓冲区
        readBuf.clear();
        allocHandle.readComplete();
        //传播读取完成事件
        pipeline.fireChannelReadComplete();
        ...................忽略不必要代码.......................
    } finally {
        ...................忽略不必要代码.......................
    }
}

1. 读取新连接

代码语言:javascript复制
int localRead = doReadMessages(readBuf);

这行代码是读取新连接的主要逻辑:

image-20210504181959983

代码语言:javascript复制
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    //调用JDK ServerSocketChannel获取新连接  JDK SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            //将客户端连接直接包装为 Netty的管道包装对象 NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        ................忽略异常处理.............
    }
    return 0;
}

可以看到这里的逻辑比较简单,首先,Netty会使用先前保存的JDK 的原生的SocketChannel调用accept方法进行获取JDK新连接的管道!

注意此时获取的管道是JDK NIO的原生的管道对象,和Netty还没有关系,然后再将JDK NIO原生的Channel包装为Netty的NioSocketChannel放到缓冲区里面,注意此时放到缓冲区里面的对象就是Netty的包装对象了!包装完成之后直接返回 ,此时我们的缓冲区就存在数据量,这个数据是NioSocketChannel对象!

我们回到主线 read方法,当调用完doMessage方法之后开始就要处理这个NioSocketChannel了呀!

2. 处理新连接的管道

代码语言:javascript复制
pipeline.fireChannelRead(readBuf.get(i));

从代码上看,可以看到,他是把刚刚我们读到的NioSocketChannel出来往下传播,这个代码是在通道内传播,我们前几节课讲过,此时Pipeline的结构是如图所示的数据结构:

image-20210504194847065

我们看如下代码:

代码语言:javascript复制
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

他是从头节点开始传播的,channelRead的传播是自上而下的,所以就势必会传播到 ServerBootstrapAcceptor的逻辑中,所以我们进入到ServerBootstrapAcceptor#channelRead方法:

代码语言:javascript复制
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //能进入都这一段逻辑的就必定是通道对象,因为只有服务端管道会存在该处理器
    final Channel child = (Channel) msg;
	//向服务端管道追加childHandler   在构建ServerBootStrap的时候传入的
    child.pipeline().addLast(childHandler);
	//在构建ServerBootStrap的时候传入的
    setChannelOptions(child, childOptions, logger);
	//在构建ServerBootStrap的时候传入的
    setAttributes(child, childAttrs);
    try {
        //开始进行注册,注册逻辑同NioServerSocketChannel相同
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

我们可以看到这个,先是向通道内注册一些客户端的参数,然后开始进行注册Channel, 注册的时候同NioServerSocketChannel的注册逻辑一样,只不过NioSocketChannel的关注事件是OP_READ事件,这里留一个作业,同学们可以自己分析一下NioSocketChannel的创建,分析一下它的注册逻辑与反应堆逻辑!

三、客户端数据读取源码解析

我们还是直接回到

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

代码语言:javascript复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    .........忽略有效性验证................

    try {
        int readyOps = k.readyOps();
        ...................忽略其他 事件的处理逻辑...............
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

上面分析过,负责客户端新连接的通道是NioSocketChannel,大家自行分析一下内部逻辑,与NioServerSocketChannel的相似度90%!

1. 读取通道数据

NioSocketChannel的Unsafe是 NioByteUnsafe, 所以我们直接进入到:

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

查看他是如何进行数据读取的:

代码语言:javascript复制
@Override
public final void read() {
    ........................忽略........................
    //获取客户端通道管道
    final ChannelPipeline pipeline = pipeline();
    //获取一个内存分配器
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            //分配一个ByteBuf缓冲区
            byteBuf = allocHandle.allocate(allocator);
            //开始向缓冲区内写入通道的数据
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // 如果没有读取到缓冲区,就释放缓冲区.
                byteBuf.release();
                byteBuf = null;
                //设置关闭标志
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            //传播一次readChnnel事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        //传播一次readChnnelComplete事件 
        pipeline.fireChannelReadComplete();
		//关闭通道
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        ....................忽略不必要代码.....................
    }
}

我们整体将逻辑分为以下几个步骤:

  1. 获取一个内存分配器,Netty中存在一个专门用于分配ByteBuf的内存分配器。这里是将它获取出来!
  2. 使用上一步获取的内存分配器分配一块缓冲区,用域后续的使用!
  3. 开始读取通道内的数据写入预先分配好的缓冲区!
  4. 读取数据完毕后,将带有数据的缓冲区调用pieline的传播方法进行数据的传播 readChannel方法!
  5. 当通道内的数据被处理完后,传播一次 channelReadComplete方法

四、总结

  1. 在Netty中NioServerSocketChannel与NioSocketChannel的处理中,对于数据的读取拥有不同的处理方法,NioServerSockerChannel主要用于处理新连接的,在初始化的时候就会在通道内加入一个新连接接入器ServerBootstrapAcceptor! NioServerSocketChannel对象在读取到数据后将之包装为NioSocketChannel对象,然后使用ServerBootstrapAcceptor进行NioSocketChannel的注册与启动反应堆线程!
  2. 当通道内存在数据的时候,被NioSockerChannel探测到后,就会先分配一块缓冲区,将数据读取进预先分配好的缓冲区,然后进行数据的向下通道流转(事件触发)!

0 人点赞