上文:tomcat-整启动流程-源码解析
tomcat线程模型是什么?
tomcat8以上默认是NIO,tomcat支持四种接收请求的处理方式:BIO,NIO,APR、AIO,用于处理tomcat处理客户端连接进来的后的各种请求的处理。其中处理连接的线程为单线程,而处理如果是读写事件则交给专门的线程池处理。
BIO、NIO、NIO2、ARP作用与区别
对比 | BIO | NIO | NIO2 | ARP | 备注 |
---|---|---|---|---|---|
同步方式 | 阻塞I/O模型 | 同步非阻塞I/O 模型 | 异步 I/O模型 | 同步非阻塞I/O 模型 | APR与NIO底层一致! |
执行方式 | 串行 | 并行 | 并行 | 并行 | |
代码位置 | org.apache.coyote.http11.Http11Protocol | org.apache.coyote.http11.Http11NioProtocol | org.apache.coyote.http11.Http11Nio2Protocol | org.apache.coyote.http11.Http11AprProtocol |
tomcat通过AQS(AbstractQueuedsynchronizer)并发框架实现。通过LimitLatch来控制流量,tomcat默认连接数为200,可以通过server.xml中的中的maxConnections 进行配置。若线程是bio模式,则最大连接数与最大线程数为1:1。
源码学习
tomcat默认的线程模型是怎么选择及配置的?
首先确定一下tomcat的线程模型启动的时候是如何配置的。
通过上文得知tomcat启动都是通过bootstrap的main方法进行启动,其中到了。org.apache.catalina.startup.Catalina#load() 中的
代码位置:org.apache.tomcat.util.digester.Digester#parse(org.xml.sax.InputSource)
代码语言:javascript复制digester.parse(inputSource);
就是用于解析xml并且选择系统初始化配置的线程模型,tomcat8及以下都是bio而8以上都是nio,我这里用的是8.5所以是Nio,看如下。
代码位置:org.apache.catalina.connector.Connector#Connector(java.lang.String)
默认为1.1为bio
代码语言:javascript复制
public Connector(String protocol) {
//设置协议模型 tomcat8默认为bio而8以上都是NIO
setProtocol(protocol);
// Instantiate protocol handler
ProtocolHandler p = null;
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}
//设置语言编码默认为utf-8
if (Globals.STRICT_SERVLET_COMPLIANCE) {
uriCharset = StandardCharsets.ISO_8859_1;
} else {
uriCharset = StandardCharsets.UTF_8;
}
// Default for Connector depends on this (deprecated) system property
if (Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "false"))) {
encodedSolidusHandling = EncodedSolidusHandling.DECODE;
}
}
由于我在server.xml里面配置了
org.apache.catalina.connector.Connector#setProtocol
代码语言:javascript复制/**
* Set the Coyote protocol which will be used by the connector.
*
* @param protocol The Coyote protocol name
*
* @deprecated Will be removed in Tomcat 9. Protocol must be configured via
* the constructor
* 设置nio线程模型
*/
@Deprecated
public void setProtocol(String protocol) {
boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
} else {
//8.0以上默认都走nio
setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
}
} else {
setProtocolHandlerClassName(protocol);
}
}
tomcat线程接收及请求的生命周期
Acceptor线程负责接收请求,并将请求放到Poller线程的事件队列中,注意Acceptor默认连接数为10000。
Poller线程:是一个线程池,默认线程核心数为2,会根据计算机的核心数再进行计算。Poller一直循环的,根据判断是否有值,有就取出,是一个典型的生产者-消费者模式。
SocketProcessor,就是最终的工作线程,用于处理socket的读写事件,通过监听selectionKey的事件来决定处理是什么事件。
验证,启动tomcat输入 http://localhost:8080/
然后在run下面的while (!stopCalled) 打断点,可以看到如下。
通过run方法不断的轮训进行监听新的请求,有的话进行处理。
org.apache.tomcat.util.net.Acceptor#run
代码语言:javascript复制public void run() {
int errorDelay = 0;
long pauseStart = 0;
try {
// Loop until we receive a shutdown command
while (!stopCalled) {
// Loop if endpoint is paused.
// There are two likely scenarios here.
// The first scenario is that Tomcat is shutting down. In this
// case - and particularly for the unit tests - we want to exit
// this loop as quickly as possible. The second scenario is a
// genuine pause of the connector. In this case we want to avoid
// excessive CPU usage.
// Therefore, we start with a tight loop but if there isn't a
// rapid transition to stop then sleeps are introduced.
// < 1ms - tight loop
// 1ms to 10ms - 1ms sleep
// > 10ms - 10ms sleep
while (endpoint.isPaused() && !stopCalled) {
if (state != AcceptorState.PAUSED) {
pauseStart = System.nanoTime();
// Entered pause state
state = AcceptorState.PAUSED;
}
if ((System.nanoTime() - pauseStart) > 1_000_000) {
// Paused for more than 1ms
try {
if ((System.nanoTime() - pauseStart) > 10_000_000) {
Thread.sleep(10);
} else {
Thread.sleep(1);
}
} catch (InterruptedException e) {
// Ignore
}
}
}
//停止标志
if (stopCalled) {
break;
}
//将状变设置为运行状态。
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait 超过最大数会等待,默认最大是8192
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// Accept the next incoming connection from the server
// 接收服务器的请求
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (!stopCalled && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
} finally {
stopLatch.countDown();
}
state = AcceptorState.ENDED;
}
org.apache.tomcat.util.net.NioEndpoint#setSocketOptions
将socket放到列中后返回true
代码语言:javascript复制@Override
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
if (nioChannels != null) {
//从队列中获取通道
channel = nioChannels.pop();
}
//如果不存在
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
//如果是https创建带鉴权的通道否则创建普通的nio
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, this);
} else {
channel = new NioChannel(bufhandler);
}
}
//创建新的 wrapper容器
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
//通道重置
channel.reset(socket, newWrapper);
//将socket放到connections列表中
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
//配置为不阻塞
socket.configureBlocking(false);
// 设置socket 参数值 (从server.xml的onnector节点上获取参数值)
// 比如 socket发送、接收的缓存大小、心跳检测等
socketProperties.setProperties(socket.socket());
// 从NioChannel的缓存队列中取出一个NioChannel
socketWrapper.setReadTimeout(getConnectionTimeout());
//设置定入超时时间
socketWrapper.setWriteTimeout(getConnectionTimeout());
//设置保持存活数
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
// 将新接收到的socketChannel注册到poler
poller.register(socketWrapper);
return true;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
if (socketWrapper == null) {
destroySocket(socket);
}
}
// Tell to close the socket if needed
return false;
}
org.apache.tomcat.util.net.AbstractEndpoint#countUpOrAwaitConnection
代码语言:javascript复制protected void countUpOrAwaitConnection() throws InterruptedException {
//默认最在值为 8192
if (maxConnections==-1) {
return;
}
//LimitLatch 是一个先进先出队列,通过共享锁实现
LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
//通过AQS实现的锁,如果超过队列数据则进行等待
latch.countUpOrAwait();
}
}
那么上面处理完后到poller了,poller 流程如下。
代码语言:javascript复制// 将新接收到的socketChannel注册到poler
poller.register(socketWrapper);
org.apache.tomcat.util.net.NioEndpoint.Poller#register
代码语言:javascript复制public void register(final NioSocketWrapper socketWrapper) {
// 设置socket的poller引用,便于后续处理
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent event = null;
if (eventCache != null) {
event = eventCache.pop();
}
if (event == null) {
event = new PollerEvent(socketWrapper, OP_REGISTER);
} else {
//注操作;
event.reset(socketWrapper, OP_REGISTER);
}
//添加事件
addEvent(event);
}
private void addEvent(PollerEvent event) {
//加入到队列中
events.offer(event);
//如果队列中没有待处理的事件,则唤醒阻塞状态selector
if (wakeupCounter.incrementAndGet() == 0) {
selector.wakeup();
}
}
poller也是一个轮训一直在监听上游的acceptor是否有对象进来处理。
org.apache.tomcat.util.net.NioEndpoint.Poller#run
这个地方不好调试,因为一直在轮训所以载图展示有值的效果。
代码语言:javascript复制/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
* 轮训事件
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
// 执行事件队列中的事件线程
hasEvents = events();
// wakeupCounter设成-1,这是与addEvent里的代码响应,
if (wakeupCounter.getAndSet(-1) > 0) {
// If we are here, means we have other stuff to do
// Do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
// Either we timed out or we woke up, process events first
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
//todo 根据向selector中注册key遍历channel中已经就绪的keys,并处理这些key
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
//而ekyAttachment
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper != null) {
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
下面这个方法用于处理读写事件
代码语言:javascript复制protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
if (close) {
cancelledKey(sk, socketWrapper);
} else if (sk.isValid()) {
// 处理通道发生读写事件
if (sk.isReadable() || sk.isWritable()) {
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
// 注消已经发生事件的关注 防止通过不一事件不断的select的问题
unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
//判断是否为读事件 如果是进行处理
if (sk.isReadable()) {
// 具体的通道处理逻辑;
if (socketWrapper.readOperation != null) {
if (!socketWrapper.readOperation.process()) {
closeSocket = true;
}
} else if (socketWrapper.readBlocking) {
synchronized (socketWrapper.readLock) {
socketWrapper.readBlocking = false;
socketWrapper.readLock.notify();
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
//判断是否为写事件,如果是进行写事件逻辑
if (!closeSocket && sk.isWritable()) {
if (socketWrapper.writeOperation != null) {
if (!socketWrapper.writeOperation.process()) {
closeSocket = true;
}
} else if (socketWrapper.writeBlocking) {
synchronized (socketWrapper.writeLock) {
socketWrapper.writeBlocking = false;
socketWrapper.writeLock.notify();
}
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
} else {
// Invalid key
cancelledKey(sk, socketWrapper);
}
} catch (CancelledKeyException ckx) {
cancelledKey(sk, socketWrapper);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}
org.apache.tomcat.util.net.AbstractEndpoint#processSocket
下面这个方法就是poller将请求转成事件,提交给线程池,由事件驱动
代码语言:javascript复制public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
//如果为空则直接返回
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
//不是空从缓存中取,先进先出
if (processorCache != null) {
sc = processorCache.pop();
}
//如果为空则创建 否则重置
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
//获取线程池,然后通过工作的线程处理用户发送的信息
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
//同步执行
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
那么最后就是到这个doRun 位置是:org.apache.tomcat.util.net.SocketProcessorBase#doRun
这里就是最终 SocketProcessor 的执行。
org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun
代码语言:javascript复制@Override
protected void doRun() {
/*
* Do not cache and re-use the value of socketWrapper.getSocket() in
* this method. If the socket closes the value will be updated to
* CLOSED_NIO_CHANNEL and the previous value potentially re-used for
* a new connection. That can result in a stale cached value which
* in turn can result in unintentionally closing currently active
* connections.
*/
//获取poller事件
Poller poller = NioEndpoint.this.poller;
//如果为空则直接关闭这个事件并返回
if (poller == null) {
socketWrapper.close();
return;
}
try {
int handshake = -1;
try {
if (socketWrapper.getSocket().isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) {
log.debug("Error during SSL handshake",x);
}
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
// 如果为空,获取connectionhandler进行请求处理
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
//获取connectionhandler 请求处理
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && processorCache != null) {
processorCache.push(this);
}
}
}
org.apache.coyote.AbstractProtocol.ConnectionHandler#process
这个方法用于处理流程获取socket状态
代码语言:javascript复制@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.process",
wrapper.getSocket(), status));
}
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
// We take complete ownership of the Processor inside of this method to ensure
// no other thread can release it while we're using it. Whatever processor is
// held by this variable will be associated with the SocketWrapper before this
// method returns.
//用于socket关联processor
Processor processor = (Processor) wrapper.takeCurrentProcessor();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
}
// Timeouts are calculated on a dedicated thread and then
// dispatched. Because of delays in the dispatch process, the
// timeout may no longer be required. Check here and avoid
// unnecessary processing.
if (SocketEvent.TIMEOUT == status &&
(processor == null ||
!processor.isAsync() && !processor.isUpgrade() ||
processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
// This is effectively a NO-OP
return SocketState.OPEN;
}
//
if (processor != null) {
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
try {
if (processor == null) {
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
// OpenSSL typically returns null whereas JSSE typically
// returns "" when no protocol is negotiated
if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
}
} else if (negotiatedProtocol.equals("http/1.1")) {
// Explicitly negotiated the default protocol.
// Obtain a processor below.
} else {
// TODO:
// OpenSSL 1.0.2's ALPN callback doesn't support
// failing the handshake with an error if no
// protocol can be negotiated. Therefore, we need to
// fail the connection here. Once this is fixed,
// replace the code below with the commented out
// block.
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
}
return SocketState.CLOSED;
/*
* To replace the code above once OpenSSL 1.1.0 is
* used.
// Failed to create processor. This is a bug.
throw new IllegalStateException(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
*/
}
}
}
if (processor == null) {
processor = recycledProcessors.pop();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
}
}
if (processor == null) {
processor = getProtocol().createProcessor();
//注册流程
register(processor);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
}
}
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
SocketState state = SocketState.CLOSED;
do {
//获取状态
state = processor.process(wrapper, status);
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
UpgradeToken upgradeToken = processor.getUpgradeToken();
// Restore leftover input to the wrapper so the upgrade
// processor can process it.
ByteBuffer leftOverInput = processor.getLeftoverInput();
wrapper.unRead(leftOverInput);
if (upgradeToken == null) {
// Assume direct HTTP/2 connection
UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
if (upgradeProtocol != null) {
// Release the Http11 processor to be re-used
release(processor);
// Create the upgrade processor
processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
"h2c"));
}
// Exit loop and trigger appropriate clean-up
state = SocketState.CLOSED;
}
} else {
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(processor);
// Create the upgrade processor
processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
processor, wrapper));
}
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
if (upgradeToken.getInstanceManager() == null) {
httpUpgradeHandler.init((WebConnection) processor);
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.init((WebConnection) processor);
} finally {
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
}
}
} while ( state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
release(processor);
processor = null;
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
} else if (state == SocketState.UPGRADED) {
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketEvent.OPEN_WRITE) {
longPoll(wrapper, processor);
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.SUSPENDED) {
// Don't add sockets back to the poller.
// The resumeProcessing() method will add this socket
// to the poller.
} else {
// Connection closed. OK to recycle the processor.
// Processors handling upgrades require additional clean-up
// before release.
if (processor != null && processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
if (instanceManager == null) {
httpUpgradeHandler.destroy();
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.destroy();
} finally {
try {
instanceManager.destroyInstance(httpUpgradeHandler);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
}
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
}
release(processor);
processor = null;
}
if (processor != null) {
wrapper.setCurrentProcessor(processor);
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.ioexception.debug"), e);
} catch (ProtocolException e) {
// Protocol exceptions normally mean the client sent invalid or
// incomplete data.
getLog().debug(sm.getString(
"abstractConnectionHandler.protocolexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (OutOfMemoryError oome) {
// Try and handle this here to give Tomcat a chance to close the
// connection and prevent clients waiting until they time out.
// Worst case, it isn't recoverable and the attempt at logging
// will trigger another OOME.
getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
}
// Make sure socket/processor is removed from the list of current
// connections
release(processor);
return SocketState.CLOSED;
}
org.apache.tomcat.util.threads.ThreadPoolExecutor#runWorker
这个方法用于作业,非常复杂~
代码语言:javascript复制final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts 允许中断
boolean completedAbruptly = true;
try {
//不为空 并且线先也不为空
while (task != null || (task = getTask()) != null) {
//aqs锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//判断线程执行数进行的一些中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
beforeExecute(wt, task);
try {
//执行
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
//最终将任务置为空,并且解锁,作业 1
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//退出的操作
processWorkerExit(w, completedAbruptly);
}
}
那么以上就是整个用户请求到最终处理的逻辑,没有全部列,因为代码量实在过过。需要详情请下载源码研读。
最后
tomat底层是通过netty实现io相关的操作,但是又区别于netty,因为有些处理由tomcat再封装因为本文主要用于学习了解tomcat的线程模型初始化的配置以及种类,还有针对一个用户请求的时候经过哪些组件,当然这里仅对组件描述部分核心逻辑,详细逻辑可能需要你花时间研读,主要了解tomcat底层的实现逻辑各个组件如何配合,以及各种设计模式是如何交互。有兴趣的同学可以根据提供的源码继续深入研究或者参考下面本文参考的文章进行深入。
参考文章:
https://www.cnblogs.com/qmillet/p/12553328.html
https://zhuanlan.zhihu.com/p/393390855
https://blog.csdn.net/qq_16681169/article/details/75003640
https://juejin.cn/post/6844903966422073352
https://juejin.cn/post/6844904018955730951