Nio2Endpoint组件:Tomcat如何实现异步I/O?

2021-10-18 15:50:16 浏览数 (1)

NIO是同步非阻塞,NIO已经足够好了,Java为什么还要NIO.2呢?

NIO和NIO.2最大的区别?

一个是同步一个是异步。而异步最大特点是,应用程序无需自己触发数据从内核空间到用户空间的拷贝

为何是应用程序去“触发”数据拷贝,而非直接从内核拷贝数据?

应用程序无法访问内核空间,数据拷贝必须由内核负责,问题是谁来触发?

  • 内核主动将数据拷贝到用户空间并通知应用程序
  • 还是等待应用程序通过Selector来查询,当数据就绪后,应用程序再发起一个read调用,这时内核再把数据从内核空间拷贝到用户空间。

数据从内核空间拷贝 =》 用户空间这段时间,应用程序还是阻塞的。所以异步效率高于同步,因为异步模式下应用程序始终不会被阻塞。

  • ServerSocket:用于在本机(Server端)开一个端口,被动的等待数据(用accept()方法),与 Client 端端建立连接后可以进行数据交换
  • Socket:用于连接远端机器(Server端)上的一个端口,主动发出数据,建立连接后也可以接收数据。

网络数据读取在异步模式下的工作过程

应用程序调用read API,同时告诉内核:

  • 数据准备好了后,拷贝到哪个Buffer
  • 调用哪个回调函数去处理这些数据

之后,内核接到该read指令,等待网卡数据到达。 数据到达后,产生硬件中断,内核在中断程序把数据从网卡拷贝到内核空间, 接着做TCP/IP协议层的数据解包和重组, 再把数据拷贝到应用程序指定的Buffer, 最后调用应用程序指定的回调函数。

异步模式下,应用程序当了“需求甲方”,内核则忙前忙后,但最大限度提高了I/O通信效率。 Linux内核2.6的AIO都提供了异步I/O的支持,但还不完善,详情可以看这里:http://lse.sourceforge.net/io/aio.html。 Java的NIO.2 API是对os异步I/O API的封装,通过epoll实现的。

Java NIO.2

服务端程序

为什么需要创建一个线程池?

异步I/O模型下,应用程序不知道数据何时到达,因此向内核注册回调方法,当数据到达时,内核就会调用该回调方法。 同时为提高处理速度,会提供一个线程池给内核使用,这样不会耽误内核线程工作,内核只需把工作交给线程池就立即返回了。

回调类AcceptHandler

它实现了CompletionHandler接口

两个模板参数V和A,分别表示

  • I/O调用的返回值 比如accept的返回值就是AsynchronousSocketChannel
  • 附件类 附件类由用户自己决定。

在accept的调用中,我们传入一个Nio2Server。因此AcceptHandler带有了两个模板参数:AsynchronousSocketChannel和Nio2Server。

CompletionHandler有两个方法:completed和failed,分别在I/O操作成功和失败时调用。completed方法有两个参数,其实就是前面说的两个模板参数。也就是说,Java的NIO.2在调用回调方法时,会把返回值和附件类当作参数传给NIO.2的使用者。

处理读的回调类ReadHandler

代码语言:javascript复制
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {   
    // 读取到消息后的处理  
    @Override  
    public void completed(Integer result, ByteBuffer attachment) {  
        // attachment就是数据,调用flip操作,其实就是把读的位置移动最前面
        attachment.flip();  
        // 读取数据
        ... 
    }  

    void failed(Throwable exc, A attachment){
        ...
    }
}

read调用的返回值是一个整型,所以回调方法里:

  • 第一个参数是个整型 表示有多少数据被读取到了Buffer中
  • 第二个参数是一个ByteBuffer 因为调用read方法时,把用来存放数据的ByteBuffer当作附件类传进去了,所以在回调方法有ByteBuffer类型参数,直接从该ByteBuffer获取数据

Nio2Endpoint

Nio2Endpoint的组件

总体工作流程类似NioEndpoint。

Nio2Acceptor扩展Acceptor,用异步I/O接收连接,跑在一个单独线程,也是一个线程组。

Nio2Acceptor接收新的连接后,得到一个AsynchronousSocketChannel,Nio2Acceptor把AsynchronousSocketChannel封装成一个Nio2SocketWrapper,并创建一个SocketProcessor任务类交给线程池处理,并且SocketProcessor持有Nio2SocketWrapper对象。

Executor在执行SocketProcessor时,SocketProcessor的run方法会调用Http11Processor来处理请求,Http11Processor会通过Nio2SocketWrapper读取和解析请求数据,请求经过容器处理后,再把响应通过Nio2SocketWrapper写出。

需要你注意Nio2Endpoint跟NioEndpoint的一个明显不同点是,Nio2Endpoint中没有Poller组件,也就是没有Selector。这是为什么呢?因为在异步I/O模式下,Selector的工作交给内核来做了。

Nio2Endpoint各组件设计

Nio2Acceptor

和NioEndpint一样,Nio2Endpoint用LimitLatch控制连接数,但Nio2Acceptor监听连接的过程不是在一个死循环里不断地调accept,而是回调方法。

连接监听方法:

代码语言:javascript复制
serverSock.accept(null, this);

第二个参数this,表明Nio2Acceptor自己就是处理连接的回调类,因此Nio2Acceptor实现了CompletionHandler接口。

代码语言:javascript复制
@Override
public void completed(AsynchronousSocketChannel socket,
        Void attachment) {
        
    if (isRunning() && !isPaused()) {
        if (getMaxConnections() == -1) {
            // 若无连接限制,则继续接收新连接
            serverSock.accept(null, this);
        } else {
            // 若有连接限制,就在线程池里执行run,run会检查连接数
            getExecutor().execute(this);
        }
        // 处理请求
        if (!setSocketOptions(socket)) {
            closeSocket(socket);
        }
    } 

为什么要执行run方法?

因为在run方法里会检查连接数,当连接达到最大数时,线程可能会被LimitLatch阻塞。

为什么要放在线程池里跑?

若放在当前线程里执行,completed方法可能被阻塞,导致该回调方法一直无法返回。 接着completed方法会调用setSocketOptions方法,在这个方法里,会创建Nio2SocketWrapper和SocketProcessor,并交给线程池处理。

Nio2SocketWrapper

封装Channel,并提供接口给Http11Processor读写数据。

Http11Processor无法阻塞等待数据的,按异步I/O模式,Http11Processor在调用Nio2SocketWrapper#read时需注册回调类,调用read后会立即返回。

可若立即返回后Http11Processor还没有读到数据,怎么办?该请求的处理不就失败了? 为解决这个问题,Http11Processor通过2次read调用完成数据读取操作:

  • 第一次read调用 连接刚刚建立好后,Acceptor创建SocketProcessor任务类交给线程池去处理,Http11Processor在处理请求的过程中,会调用Nio2SocketWrapper#read发出第一次读请求,同时注册回调类readCompletionHandler,因为数据没读到,Http11Processor把当前的Nio2SocketWrapper标记为数据不完整。

接着SocketProcessor线程被回收,Http11Processor并未阻塞等待数据。 Http11Processor维护了一个Nio2SocketWrapper列表,也就是维护了连接的状态。

  • 第二次read调用 当数据到达后,内核已经把数据拷贝到Http11Processor指定的Buffer里,同时回调类readCompletionHandler被调用,在这个回调处理方法里会重新创建一个新的SocketProcessor任务来继续处理这个连接,而这个新的SocketProcessor任务类持有原来那个Nio2SocketWrapper,这一次Http11Processor可以通过Nio2SocketWrapper读取数据了,因为数据已经到了应用层的Buffer。

Nio2SocketWrapper#read会被调用两次,但不是串行调两次,而是Poller会先后创建两个SocketProcessor任务类,在两个线程中执行,执行过程中每次Http11Processor都会调Nio2SocketWrapper#read。

代码语言:javascript复制
public int read(boolean block, ByteBuffer to){

//第二次调用时直接通过这个方法取数据
int nRead = populateReadBuffer(to);

...

//第一次时数据没取到,会调用下面这个方法去真正执行I/O操作并注册回调函数:
nRead = fillReadBuffer(block);

...
}

两次read可以简单理解为,连接被保留着,数据没就绪处理的线程资源先释放了。收到异步数据就绪通知后,根据原有连接重建处理线程,继续处理。阻塞期间线程可复用。

回调类readCompletionHandler

Nio2SocketWrapper是作为附件类传递的,这样在回调函数里能拿到所有上下文。

代码语言:javascript复制
this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
    public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
        ...
        // 通过附件类SocketWrapper拿到所有的上下文
        Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
    }

    public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
        ...
    }
}

总结

在异步I/O模型里,内核做了很多事情,它把数据准备好,并拷贝到用户空间,再通知应用程序去处理,也就是调用应用程序注册的回调函数。Java在操作系统 异步IO API的基础上进行了封装,提供了Java NIO.2 API,而Tomcat的异步I/O模型就是基于Java NIO.2 实现的。

由于NIO和NIO.2的API接口和使用方法完全不同,可以想象一个系统中如果已经支持同步I/O,要再支持异步I/O,改动是比较大的,很有可能不得不重新设计组件之间的接口。但是Tomcat通过充分的抽象,比如SocketWrapper对Channel的封装,再加上Http11Processor的两次read调用,巧妙地解决了这个问题,使得协议处理器Http11Processor和I/O通信处理器Endpoint之间的接口保持不变。

FAQ

  • Tomcat里NIO为什么不参考netty,通过使用堆外内存来避免零拷贝问题? 主要还是堆外内存管理起来没有JVM堆那么方便,为了稳定性的考虑吧,另外APR就是堆外内存的方案,也就是已经提供了这个选项。

tomcat 在哪里配置 使用nioendpoint 还是nio2endpoint呢? server.xml中:

代码语言:javascript复制
<Connector port="8443" protocol="org.apache.coyote.http11.Http11NioProtocol"
maxThreads="150" SSLEnabled="true">

</Connector>

参考

  • https://docs.spring.io/spring-boot/docs/current/reference/html/howto-embedded-web-servers.html
  • http://lse.sourceforge.net/io/aio.html

0 人点赞