深入浅出Tomcat网络通信的高并发处理机制
随着互联网应用的快速发展,Web服务器面临的访问压力日益增大,如何高效处理高并发的网络请求成为关键
Tomcat作为Java世界中最受欢迎的Web容器之一,可以灵活选择不同的IO模型来处理网络通信,确保面对高并发的网络请求时能够快速处理
上篇文章21张图解析Tomcat运行原理与架构全貌,我们说到Tomcat中通过Connector来处理网络通信,其中Connector的职责主要由组件EndPoint、Processor、Adapter来完成
EndPoint负责网络通信、Processor负责解析、Adapter负责将请求转换为Servlet的请求并交给容器处理
本篇文章就来重点聊聊AbstractEndpoint的多种实现类是如何处理网络通信的
AbstractEndpoint有三种实现类:NioEndPoint、Nio2EndPoint、AprEndPoint
其中默认使用NioEndPoint(多路复用模型),Nio2EndPoint使用异步IO模型,而AprEndPoint为早期提供高性能(Tomcat 10时被弃用)
NioEndPoint
NioEndPoint将处理网络通信分离为三个步骤,分别使用三个组件进行执行:接收连接、检测IO事件、处理请求
我们先大致对这些组件的作用进行描述,后续再通过源码分析~
Acceptor用于接收连接(循环执行):使用LimitLatch限制最大连接数量,等待客户端完成TCP三次握手连接后,将连接交给Poller
Poller用于检测IO事件是否就绪(循环执行):将连接注册到Selector上,使用Selector监听IO事件,当事件发生(读就绪)时交给Executor进行处理
Executor池化管理线程,使用线程执行后续流程(解析请求、封装适配、交给容器处理...)
Acceptor
Acceptor使用LimitLatch限制连接数量,收到连接后将其逐步封装为PollerEvent并放到Poller的队列中
代码语言:java复制
NioEndpoint.initServerSocket
在启动组件时初始化socket
//服务端Channel
ServerSocketChannel serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//服务端channel绑定端口,getAcceptCount为连接上限
serverSock.bind(addr, getAcceptCount());
ServerSocketChannel作为服务端Channel只监听一个端口(连接器中设置的端口)
无论Acceptor线程数量为多少,都共享该服务端channel
getAcceptCount()
为建立连接后最大积压的数量,acceptCount默认为100
TCP三次握手完成后会将客户端连接放到accept队列中等待服务端拿走,acceptCount就是accept队列最大存放的连接数量
Acceptor.run
Acceptor接收连接
为了简化流程,只保留了较重要的流程:
- 使用LimitLatch限制连接数量,如果达到最大值则等待
- 等待获取客户端连接socket channel(TCP三次握手完成)
- 把socket channel交给Poller处理
public void run() {
//...
while (!stopCalled) {
//1.使用limit latch限制连接数量,如果达到最大值则等待
endpoint.countUpOrAwaitConnection();
U socket = null;
//2.等待获取客户端socket channel
socket = endpoint.serverSocketAccept();
//3.交给Poller处理
endpoint.setSocketOptions(socket);
}
}
对于实现限制连接数量,当达到最大值时则进行等待,实现的关键为计数、等待
熟悉并发包的同学会立马想到信号量Semaphore,但Tomcat没有直接使用信号量组件,而是基于AQS自己实现同步组件LimitLatch
LimitLatch直接使用原子类进行计数,利用AQS来实现等待
(LimitLatch基于AQS实现比较简单,这里就不进行分析,不熟悉AQS的同学可以查看这篇文章:10分钟从源码级别搞懂AQS)
如果没有超过限制则会获取下一个完成连接(TCP三次握手)的客户端连接SocketChannel result = serverSock.accept();
并将Channel包装成SocketWrapper,再包装为PollerEvent,加入Poller的队列中
代码语言:java复制public void register(final NioSocketWrapper socketWrapper) {
//监听读事件(读就绪时poller能够继续处理)
socketWrapper.interestOps(SelectionKey.OP_READ);
//包装为PollerEvent 指定关心注册事件OP_REGISTER(后续poller将该通道注册到select上)
PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
//放入poller的队列
addEvent(pollerEvent);
}
Poller的队列SynchronizedQueue也是Tomcat自己实现的
Acceptor与Poller之间通过队列通信,SynchronizedQueue使用synchronized保证并发操作下的原子性
Poller
Poller循环处理队列中的PollerEvent事件,当Selector上监听的连接发生IO事件时迭代处理,将事件封装为SocketProcessor交给线程池处理
Poller.run
Poller主要循环检测是否有IO事件发生,主要流程为:
- 轮询处理队列中的事件PollerEvent,比如将通道注册到Selector上
- Selector阻塞到事件发生或超时
- 迭代遍历处理事件,交给线程池处理
public void run() {
while (true) {
//...
//1.轮询处理队列中的事件
events();
//2.select 阻塞直到事件发生
selector.select(selectorTimeout);
//3.迭代遍历处理事件
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
//从附件中拿到连接的包装
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper != null) {
//交给线程池处理
processKey(sk, socketWrapper);
}
}
}
}
交给线程池处理前会将其封装为SocketProcessor
代码语言:java复制AbstractEndpoint.processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
//封装SocketProcessor
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
//交给线程池执行
executor.execute(sc);
} else {
//当前线程执行(使用AIO时走这里)
sc.run();
}
}
该方法用于封装SocketProcessor和调用后续执行,放于父类中,所有实现类通用该方法
Executor
线程池中的线程执行SocketProcessor时会去交给ProtocolHandler处理 getHandler().process(socketWrapper, event)
SocketProcessor中存在连接等包装组件,通过它能够处理请求(读取数据)和响应(写回数据)
需要注意的是Tomcat中的线程池是自己实现的,而不是并发包下的ThreadPoolExecutor
(对于Tomcat的线程池,我们后续文章再进行分析)
NioEndPoint大致的运行流程如下图:
连接的包装类(NioSocketWrapper)为核心贯穿全文,封装流程如下:
Acceptor接收连接:NioChannel(拿到客户端连接) -> NioSocketWrapper -> PollerEvent(放入Poller的SynchronizedQueue)
Poller处理PollerEvent注册到Selector,并阻塞监听IO事件:
PollerEvent(从SynchronizedQueue中取出) -> NioSocketWrapper(注册到Selector) -> SocketProcessor(事件发生时交给Executor)
Nio2EndPoint
Nio2EndPoint使用异步IO模型(AIO)来处理网络通信
AIO的特点就是异步,使用回调函数,当数据就绪时使用异步线程调用回调函数
无需再像NIO中使用Selector阻塞,让应用线程来触发读取数据,阻塞到数据拷贝到应用缓冲区
Nio2实际上指的就是AIO,NIO2表明这是对原有NIO的一个升级版本
Nio2EndPoint处理网络通信时不再需要检测IO事件,把这件事交给内核去做,当事件发生(数据就绪)时使用异步线程调用回调函数即可
相比于NioEndPoint,Nio2EndPoint在处理网络通信时,不需要再用Poller检测IO事件
Nio2Acceptor
Nio2EndPoint中使用Nio2Acceptor接收连接,Nio2Acceptor继承Acceptor并实现回调接口CompletionHandler
代码语言:java复制class Nio2Acceptor
extends Acceptor<AsynchronousSocketChannel>
implements CompletionHandler<AsynchronousSocketChannel,Void>
回调接口第一个参数为IO操作的结果,第二个泛型为操作使用的附件,其中有两个方法分别代表着成功/失败后执行的回调
代码语言:java复制public interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
}
Nio2Acceptor.run
由于使用AIO,Nio2Acceptor在执行任务时不再需要循环,只需要携带回调函数,当客户端连接完成时触发回调
在执行时主要做两件事:
- 使用LimitLatch限制连接数
- 接收连接
public void run() {
if (!isPaused()) {
try {
//1.使用LimitLatch限制连接数
countUpOrAwaitConnection();
} catch (InterruptedException e) {
}
if (!isPaused()) {
//2.接收连接
serverSock.accept(null, this);
} else {
state = AcceptorState.PAUSED;
}
} else {
state = AcceptorState.PAUSED;
}
}
serverSock.accept(null, this);
在接收连接时,使用的服务端channel为AsynchronousServerSocketChannel,并把当前对象作为回调传入
这样在下次收到连接后的回调又可以调用该方法,以此来达到不需要循环调用
Nio2Acceptor.completed
回调成功的方法中主要做几件事:
- 是否限制连接数量
- 调用accept,方便接收下次连接
- 调用后续处理
public void completed(AsynchronousSocketChannel socket,Void attachment) {
errorDelay = 0;
if (isRunning() && !isPaused()) {
//1.是否限制连接数量
if (getMaxConnections() == -1) {
//不限制连接数量,方便接收下一次连接
serverSock.accept(null, this);
} else if (getConnectionCount() < getMaxConnections()) {
try {
//当前连接数小于最大限制连接数,不阻塞,主要是去自增计数
countUpOrAwaitConnection();
} catch (InterruptedException e) {
// Ignore
}
//方便接收下次连接
serverSock.accept(null, this);
} else {
//当前连接数大于等于最大限制连接数,再调用limitlatch会阻塞,为了避免阻塞使用线程池去执行(排队)
getExecutor().execute(this);
}
//setSocketOptions后续处理
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
if (isRunning()) {
state = AcceptorState.PAUSED;
}
destroySocket(socket);
}
}
在Acceptor中也会调用setSocketOptions方法,那时会将连接包装NioSocketWrapper,然后封装为PollerEvent放入poller队列
在AIO中由于不再存在poller,该方法会将连接包装为Nio2SocketWrapper,然后调用父类AbstractEndpoint.processSocket方法去执行
AbstractEndpoint.processSocket
在此方法中会先将包装类封装为SocketProcessor再去执行
代码语言:java复制 public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
//...
//封装为SocketProcessor
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
//当前线程执行
sc.run();
}
return true;
}
需要注意的是,当前线程本来就是异步回调线程,参数dispatch会为false
也就是这里不会使用线程池去执行,而是由当前异步回调的线程去执行SocketProcessor
Nio2SocketWrapper
当前线程是连接完成执行异步回调的线程,去执行SocketProcessor也就是会使用Processor解析数据,但此时数据可能还未准备好
为了不让Processor阻塞等待,这里会失败,直到数据就绪时触发的异步回调来执行时才能够读到数据
Nio2SocketWrapper中包含一些读写事件的回调
比如读回调中:当数据就绪时,会去执行processSocket,也就是封装SocketProcessor进行后续调用(此时会第二次使用Processor进行读数据,这样确保数据已就绪)
代码语言:java复制this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer nBytes, ByteBuffer attachment) {
//...
getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
}
}
Http11Processor
在Processor处理HTTP协议的实现类Http11Processor中,执行service解析请求时,会先解析请求头parseRequestLine
代码语言:java复制public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
//...
inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),protocol.getKeepAliveTimeout())
//...
}
当parseRequestLine返回false时,说明数据未就绪,不会执行后续操作,因此第一次读数据时由于数据未就绪不会再往后执行
NioEndPoint大致的运行流程如下图:
在Nio2EndPoint中,使用异步回调的方式,避免poller中的操作,能够提升效率,但是大量异步线程的引入又会带来线程上下文切换的开销
连接的包装类(Nio2SocketWrapper)为核心贯穿全文,封装流程如下:
Nio2Channel(回调拿到客户端连接) -> Nio2SocketWrapper -> SocketProcessor(事件发生时)
连接完成的回调:Nio2Channel -> Nio2SocketWrapper -> SocketProcessor -> Http11Processor(解析失败,数据未就绪)
读事件就绪的回调:Nio2SocketWrapper -> SocketProcessor -> Http11Processor
AprEndPoint
APR(Apache Portable Runtime)是Apache提供的可移植运行库,是为了早期的Tomcat的提供高性能的
早期NIO还不成熟,使用APR通过JNI调用本地C语言实现的库,能够使用操作系统的epoll来实现多路复用模型,旨在提高高性能
AprEndPoint 流程与NioEndPoint相同,只是其中调用的方法不同,NioEndPoint调用JDK NIO的API,而AprEndPoint调用APR库
AprEndPoint在通道上使用的缓冲区是基于直接内存的(DirectByteBuffer),而NioEndPoint与Nio2EndPoint都是使用堆内存的(HeapByteBuffer)
使用直接内存的好处是能够减少数据拷贝带来的开销,但无法使用JVM来进行管理内存
并且AprEndPoint还能使用零拷贝sendfile,将数据从磁盘读到网卡发送时减少各种拷贝开销
但在后来NIO、AIO逐渐成熟,AprEndPoint带来的好处逐渐被追平,在Tomcat 10时被遗弃
总结
NioEndPoint将处理网络通信分为接收连接、监听事件、处理请求三个步骤
其中Acceptor负责接收连接,使用LimitLatch限制连接数量(若超过上限则等待),获取客户端连接NioChannel,包装为NioSocketWrapper,再封装为PollerEvent放入Poller的队列中
Poller会轮询处理PollerEvent,通过PollerEvent拿到NioSocketWrapper将连接注册到Selector上,使用Selector监听事件,有事件触发时,从附件中获取连接的包装NioSocketWrapper,将其封装为SocketProcessor交给线程池处理
线程池的线程处理SocketProcessor时,则会使用Processor解析协议,后续再封装请求/响应调用容器处理
Nio2EndPoint 使用AIO,由内核监听事件(数据就绪)后使用异步线程执行回调
其中Nio2Acceptor继承Acceptor,接收连接不再循环处理,而是使用异步回调:当连接完成后再使用LimitLatch判断是否限制连接,调用非阻塞accept便于接收下次连接(回调),然后将客户端连接Nio2Channel封装为Nio2SocketWrapper再封装为SocketProcessor处理(后续调用processor无法解析,因为当前是连接完成的回调线程,数据还未就绪)
当数据就绪时,通过Nio2SocketWrapper的回调继续封装为SocketProcessor向后处理(后续调用processor可以解析,因为当前为读数据就绪的回调线程,第二次读)
早期的APR通过本地库、直接内存、零拷贝等多种方式进行性能优化