[Tomcat Source Code Analysis] Understanding HTTP Request Processing from Scratch (Part 2)

2024-09-20 14:28:48

Preface

The previous article covered the Connector's startup logic; see:

[Tomcat Source Code Analysis] Understanding HTTP Request Processing from Scratch (Part 1)

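Having examined how the Connector starts up, we now turn to how an HTTP request is actually executed. Which stages does a request sent by a client pass through before it is finally handled?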

Connector Request Logic

Acceptor

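The Acceptor thread listens on the server socket and hands each connected socket over to a Poller thread. The number of Acceptor threads is controlled by the acceptorThreadCount field of AbstractEndpoint, which defaults to 1.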

AbstractEndpoint.Acceptor is a static abstract nested class of AbstractEndpoint that implements the Runnable interface. Part of its code is shown below:

public abstract static class Acceptor implements Runnable {
    public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }

    protected volatile AcceptorState state = AcceptorState.NEW;
    public final AcceptorState getState() {
        return state;
    }

    private String threadName;
    protected final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }
    protected final String getThreadName() {
        return threadName;
    }
}

NioEndpoint's Acceptor, a member (inner) class, extends AbstractEndpoint.Acceptor:

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            // 1. While running, if the Endpoint is paused, the Acceptor spins (sleeping 50 ms per iteration)
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            // 2. If the Endpoint has stopped running, the Acceptor terminates as well
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                // 3. If the maximum number of connections has been reached, wait until the count drops
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 4. Accept the next incoming socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 5. setSocketOptions() is the key step: it hands the socket to a Poller as an event
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

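From the code above we can see that:

  • countUpOrAwaitConnection checks whether the current connection count has reached maxConnections; if not, it increments the count, otherwise it waits.
  • In socket = serverSock.accept(), serverSock is the ServerSocketChannel opened in NioEndpoint's bind method. To make that field easy to reference, NioEndpoint's Acceptor is a member (inner) class rather than a static nested class.
  • The comment on the setSocketOptions call states that this method hands the connected socket to a Poller thread for processing.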
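The maxConnections check behaves like a counting semaphore: take a permit before accepting, give it back when the socket is closed or accept() fails. The sketch below is only an illustration of that idea. Tomcat itself uses its own LimitLatch class; the ConnectionLimiter class here is hypothetical and built on java.util.concurrent.Semaphore.

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the endpoint's connection limit (Tomcat uses LimitLatch instead).
class ConnectionLimiter {
    private final Semaphore permits;

    ConnectionLimiter(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    // Analogous to countUpOrAwaitConnection(): blocks while maxConnections sockets are open.
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    // Non-blocking variant, convenient for demonstrating the limit.
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    // Analogous to countDownConnection(): called when a socket closes or accept() fails.
    // Note: a plain Semaphore does not stop callers from releasing more permits than they took.
    void countDown() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionLimiter limiter = new ConnectionLimiter(2);
        limiter.countUpOrAwait();
        limiter.countUpOrAwait();
        System.out.println("third connection admitted? " + limiter.tryCountUp()); // false
        limiter.countDown();
        System.out.println("after one close? " + limiter.tryCountUp()); // true
    }
}
```

Blocking in acquire() is what keeps the Acceptor from accepting more sockets than the limit allows; the kernel backlog absorbs new connections in the meantime.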

setSocketOptions then processes the connected socket:

protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Register the channel with a Poller; note the two key methods getPoller0() and Poller.register()
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

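setSocketOptions pops a reusable NioChannel from the nioChannels stack if one is available; otherwise it creates a new NioChannel.

Next, getPoller0 selects a Poller thread in round-robin fashion, and Poller.register registers the NioChannel with the selected Poller thread.

If the socket is successfully handed over to a Poller thread, the method returns true; otherwise it returns false. When it returns false, the Acceptor calls closeSocket, which closes the channel and the underlying Socket and decrements the current connection count.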
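The pop-or-create, reset, push-back cycle used for NioChannel objects is a classic object cache. A single-threaded sketch of the pattern, assuming hypothetical ChannelPool and PooledBuffer classes backed by an ArrayDeque; Tomcat's own cache is a thread-safe SynchronizedStack, which this sketch does not try to reproduce.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical object cache illustrating the pop()-or-create / reset / push() cycle.
class ChannelPool {
    // Hypothetical pooled object: owns a buffer that is relatively expensive to allocate.
    static final class PooledBuffer {
        final ByteBuffer buffer;
        PooledBuffer(int size) { this.buffer = ByteBuffer.allocate(size); }
        void reset() { buffer.clear(); } // clear per-connection state, keep the memory
    }

    private final Deque<PooledBuffer> cache = new ArrayDeque<>();
    private final int bufferSize;
    int created = 0; // counts real allocations, for demonstration only

    ChannelPool(int bufferSize) { this.bufferSize = bufferSize; }

    PooledBuffer acquire() {
        PooledBuffer b = cache.pollFirst(); // null when the cache is empty
        if (b == null) {
            created++;
            return new PooledBuffer(bufferSize);
        }
        b.reset(); // reused object: reset before handing it out
        return b;
    }

    void release(PooledBuffer b) { cache.push(b); } // push() adds at the front, like a stack

    public static void main(String[] args) {
        ChannelPool pool = new ChannelPool(8192);
        PooledBuffer first = pool.acquire();
        pool.release(first);
        PooledBuffer second = pool.acquire();
        System.out.println((first == second) + " " + pool.created); // true 1
    }
}
```

Reusing channels and their buffers avoids allocating fresh (possibly direct) buffers for every connection, which is the point of the cache.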

Poller

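The Poller thread polls connected sockets using few resources, keeping the connections alive, and hands work to worker threads once data becomes available.

The number of Poller threads is controlled by NioEndpoint's pollerThreadCount field; the default is the smaller of 2 and the number of available processors.

Poller implements the Runnable interface, and its constructor opens a new Selector for each Poller.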

public class Poller implements Runnable {
    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();
    // (some code omitted)
    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public Selector getSelector() { return selector;}
    // (some code omitted)
}

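When registering a channel with a Poller, two methods matter: getPoller0() and Poller.register().

First, getPoller0(). It picks a Poller in round-robin fashion by taking an incrementing counter modulo the number of Pollers, which spreads connections evenly across them.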

/**
 * The socket poller.
 */
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);
/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
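The round-robin selection can be reproduced in isolation. This is a sketch only: RoundRobin is a hypothetical class. It uses Math.floorMod rather than Math.abs(...) % n because Math.abs(Integer.MIN_VALUE) is still negative once the counter wraps around, whereas floorMod always yields an index in [0, n).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector modelled on getPoller0().
class RoundRobin<T> {
    private final T[] items;
    private final AtomicInteger rotater;

    RoundRobin(T[] items, int start) {
        this.items = items;
        this.rotater = new AtomicInteger(start);
    }

    T next() {
        // floorMod keeps the index non-negative even after incrementAndGet() overflows.
        int idx = Math.floorMod(rotater.incrementAndGet(), items.length);
        return items[idx];
    }

    public static void main(String[] args) {
        RoundRobin<String> rr =
                new RoundRobin<>(new String[] {"poller-0", "poller-1", "poller-2"}, 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next()); // poller-1, poller-2, poller-0, poller-1
        }
        // Math.abs(Integer.MIN_VALUE) is Integer.MIN_VALUE, so this prints -2.
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3);
    }
}
```

The AtomicInteger makes the counter safe to share between several Acceptor threads without locking.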

Next, Poller.register(). Each Poller maintains a synchronized events queue, so the channel accepted by the Acceptor is wrapped in a PollerEvent and placed on that queue; the relevant line is events.offer(event).

public class Poller implements Runnable {

    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    /**
     * Registers a newly created socket with the poller.
     *
     * @param socket    The newly created socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        addEvent(r);
    }

    private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }
}
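The hand-off between the Acceptor and a Poller is a producer/consumer queue plus a Selector wake-up. A minimal sketch under stated assumptions: MiniPoller is hypothetical, uses ConcurrentLinkedQueue in place of Tomcat's SynchronizedQueue, and mirrors the wakeupCounter convention visible in addEvent() and Poller.run(): the poller sets the counter to -1 before a blocking select, so the producer whose increment brings it back to 0 is the one that calls wakeup().

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified poller event queue (not Tomcat's class).
class MiniPoller {
    private final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    private final AtomicLong wakeupCounter = new AtomicLong(0);
    final Selector selector;

    MiniPoller() throws IOException {
        this.selector = Selector.open();
    }

    // Producer side (Acceptor thread): enqueue, then wake the selector only if it is blocked.
    void addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
        }
    }

    // Consumer side (Poller thread): run at most the events queued when draining started.
    boolean drainEvents() {
        boolean ran = false;
        for (int i = 0, size = events.size(); i < size; i++) {
            Runnable r = events.poll();
            if (r == null) break;
            r.run();
            ran = true;
        }
        return ran;
    }

    // One iteration of the loop: run queued events, then select.
    int pollOnce(long timeoutMillis) throws IOException {
        drainEvents();
        int keyCount;
        if (wakeupCounter.getAndSet(-1) > 0) {
            keyCount = selector.selectNow();           // work arrived: do not block
        } else {
            keyCount = selector.select(timeoutMillis); // counter stays -1 while blocked here
        }
        wakeupCounter.set(0);
        return keyCount;
    }

    public static void main(String[] args) throws IOException {
        MiniPoller poller = new MiniPoller();
        poller.addEvent(() -> System.out.println("registered channel"));
        poller.pollOnce(1000); // counter is 1, so selectNow() returns immediately
        poller.selector.close();
    }
}
```

Calling wakeup() only on the -1 to 0 transition avoids a costly wakeup for every event while the poller is busy anyway.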
PollerEvent

Now let's look at PollerEvent. It implements Runnable and represents a single polling event:

public static class PollerEvent implements Runnable {
    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

}

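In the run method:

  • If the interest set is the custom OP_REGISTER, the connected socket channel represented by this event has not yet been handled by the Poller thread. The channel is then registered with the Poller thread's Selector, with OP_READ as its interest set and a NioSocketWrapper object as its attachment. This is the event that Poller's register method added.
  • Otherwise, it looks up the SelectionKey under which the channel is registered with the Poller's Selector and adds the new operations to that key's interest set. If no key exists, the connection count is decremented and the wrapper is marked as closed.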
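The two branches of PollerEvent.run() map onto two plain NIO calls: SelectableChannel.register(selector, ops, attachment) for the first registration, and SelectionKey.interestOps(int) on the existing key afterwards. The sketch below is standalone and hypothetical: a loopback connection stands in for a client socket, a String stands in for NioSocketWrapper, and OP_REGISTER is a locally defined marker value outside the standard SelectionKey bits, playing the role of the custom constant described above.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class RegisterDemo {
    // Hypothetical marker value, chosen outside the standard SelectionKey bits (1, 4, 8, 16).
    static final int OP_REGISTER = 0x100;

    // Mirrors the two branches of PollerEvent.run(); returns the resulting interest set.
    static int apply(SocketChannel ch, Selector sel, int ops, Object attachment) throws IOException {
        if (ops == OP_REGISTER) {
            // First time: register for reads and attach the per-connection state.
            return ch.register(sel, SelectionKey.OP_READ, attachment).interestOps();
        }
        SelectionKey key = ch.keyFor(sel);
        if (key == null) {
            return -1; // not registered: the real code counts the connection down here
        }
        int merged = key.interestOps() | ops; // add the new interest, keep the old ones
        key.interestOps(merged);
        return merged;
    }

    public static void main(String[] args) throws IOException {
        try (Selector sel = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false); // register() requires non-blocking mode
                System.out.println(apply(accepted, sel, OP_REGISTER, "wrapper-1"));    // 1
                System.out.println(apply(accepted, sel, SelectionKey.OP_WRITE, null)); // 5
                System.out.println(accepted.keyFor(sel).attachment());                 // wrapper-1
            }
        }
    }
}
```

The attachment is set once at registration and then travels with the key, which is why the Poller can later recover the wrapper from any ready key.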

Poller Revisited

As mentioned above, Poller implements Runnable. Its overridden run method, together with the events() helper it calls, is shown below:

public boolean events() {
    boolean result = false;
    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
        result = true;
        try {
            // Call run() directly on the Poller thread
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }
    return result;
}

@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;

        try {
            if (!close) {
                // Run the queued PollerEvents
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        // Get the selector's selected keys, i.e. the registered channels whose events are ready
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        // Handle each ready key
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // This is where the key is actually processed
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    getStopLatch().countDown();
}

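In the Poller's run method:

  • If the queue holds any events, they are all executed first; PollerEvent's run method registers the channel with the Poller's Selector.
  • Next, the SelectionKeys returned by select are processed. Because a NioSocketWrapper was attached when the channel was registered in PollerEvent, it can be retrieved here with SelectionKey.attachment(), and processKey is then called to handle the connected socket channel.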
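The selected-key loop in Poller.run() follows the standard NIO idiom: call select(), iterate selectedKeys(), remove each key from the set, and read the attachment supplied at registration time. A self-contained sketch, assuming a java.nio.channels.Pipe in place of a client socket and a String attachment in place of NioSocketWrapper; SelectLoopDemo is hypothetical and the processKey step is reduced to recording the attachment.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SelectLoopDemo {
    // One pass of a Poller-style loop; returns the attachments of the keys that were ready.
    static List<Object> pollOnce(Selector selector, long timeoutMillis) throws IOException {
        List<Object> handled = new ArrayList<>();
        int keyCount = selector.select(timeoutMillis);
        Iterator<SelectionKey> it = keyCount > 0 ? selector.selectedKeys().iterator() : null;
        while (it != null && it.hasNext()) {
            SelectionKey sk = it.next();
            it.remove(); // the selector never removes keys from the selected set on its own
            Object attachment = sk.attachment();
            if (attachment != null && sk.isValid() && sk.isReadable()) {
                handled.add(attachment); // stand-in for processKey(sk, attachment)
            }
        }
        return handled;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ, "conn-42");
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1})); // make the source readable
            System.out.println(pollOnce(selector, 1000)); // [conn-42]
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }
}
```

Forgetting iterator.remove() is a common NIO bug: the stale key would be reported again on the next pass even if nothing new happened.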

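Next, processKey(). Depending on which operations the key is ready for, it handles read events and write events separately:

  • Read events: for example, building the Request object.
  • Write events: for example, writing the generated Response back to the client through the socket.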
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 1. Handle the read event, e.g. build the Request object
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // 2. Handle the write event, e.g. write the Response back to the client through the socket
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}
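Before dispatching, processKey() calls unreg(sk, attachment, sk.readyOps()). In the Tomcat NioEndpoint source this clears the ready operations from the key's interest set (interestOps & ~readyOps), so the Poller does not report the same readiness again while a worker thread is still handling the socket; interest is registered again later when more data is needed. The bit arithmetic can be checked on its own; UnregDemo and remainingInterest are hypothetical names.

```java
import java.nio.channels.SelectionKey;

class UnregDemo {
    // Hypothetical helper: the interest set left after removing the operations that are ready.
    static int remainingInterest(int interestOps, int readyOps) {
        return interestOps & ~readyOps;
    }

    public static void main(String[] args) {
        int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // 1 | 4 = 5
        int ready = SelectionKey.OP_READ;                              // 1
        int left = remainingInterest(interest, ready);
        System.out.println(left == SelectionKey.OP_WRITE); // true: only write interest remains
    }
}
```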

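Next, processSocket().

It pops a processor object from processorCache to handle the socket, creating one if the cache is empty. For NIO this is a SocketProcessor, which should not be confused with the HTTP-level Processor discussed later.

Finally, the SocketProcessor is submitted to the worker thread pool for execution.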

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // 1. Pop a processor from processorCache to handle the socket; its implementation is SocketProcessor
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // 2. Submit the processor to the worker thread pool
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

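The dispatch parameter indicates whether processing should happen on a different thread. Every call from processKey above passes true.

When dispatch is true and a worker thread pool exists, executor.execute(sc) is called and a worker thread then handles the connected socket.

Otherwise, the Poller thread handles the connected socket itself by calling sc.run() directly.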
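The dispatch decision in processSocket() reduces to a few lines. A hedged sketch: Dispatcher is hypothetical, plain Runnables stand in for SocketProcessor instances, and only the RejectedExecutionException path of the original method is modelled (returning false tells the caller to close the socket).

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class Dispatcher {
    // Returns false when the task could not be handed off, like processSocket().
    static boolean process(Runnable task, Executor executor, boolean dispatch) {
        try {
            if (dispatch && executor != null) {
                executor.execute(task); // a worker thread runs the task
            } else {
                task.run();             // the calling (Poller) thread runs it inline
            }
        } catch (RejectedExecutionException ree) {
            return false;               // pool saturated or shut down
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        process(() -> System.out.println("worker: " + Thread.currentThread().getName()), workers, true);
        process(() -> System.out.println("inline: " + Thread.currentThread().getName()), workers, false);
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.SECONDS);
        // A ThreadPoolExecutor rejects new tasks after shutdown().
        System.out.println(process(() -> {}, workers, true)); // false
    }
}
```

Running inline keeps the Poller busy for the whole request, which is why the real code dispatches to the pool whenever one is configured.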

createSocketProcessor is an abstract method in AbstractEndpoint, implemented by NioEndpoint:

@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}

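Next, SocketProcessor.doRun() (SocketProcessor.run() ultimately calls this method).

It hands processing over to the Handler; when event is null, it is treated as an OPEN_READ event.

The class comment explains that SocketProcessor is the equivalent of the Worker, but runs in an external Executor thread pool.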

/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                // Hand the work to the Handler; a null event is treated as OPEN_READ
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}
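The branch on the handshake result at the end of doRun() is effectively a small lookup: 0 means the connection is ready for the Handler, -1 means close, and SelectionKey.OP_READ / OP_WRITE mean the TLS handshake needs the socket to become readable or writable again. A sketch of that mapping; the Action enum and the next() method are hypothetical names used only for illustration.

```java
import java.nio.channels.SelectionKey;

class HandshakeDemo {
    // Hypothetical labels for what doRun() does next.
    enum Action { PROCESS, CLOSE, WAIT_READ, WAIT_WRITE, NONE }

    static Action next(int handshake) {
        if (handshake == 0) return Action.PROCESS;                        // getHandler().process(...)
        if (handshake == -1) return Action.CLOSE;                          // close(socket, key)
        if (handshake == SelectionKey.OP_READ) return Action.WAIT_READ;   // registerReadInterest()
        if (handshake == SelectionKey.OP_WRITE) return Action.WAIT_WRITE; // registerWriteInterest()
        return Action.NONE;                                                // no branch matches
    }

    public static void main(String[] args) {
        for (int h : new int[] {0, -1, SelectionKey.OP_READ, SelectionKey.OP_WRITE}) {
            System.out.println(h + " -> " + next(h));
        }
    }
}
```

For plain HTTP connections isHandshakeComplete() is already true, so the result is always 0 and the request goes straight to the Handler.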

The key method of the Handler is process(). Although it has many conditional branches, its logic is quite clear: it mainly calls Processor.process().

@Override
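public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    // (some code omitted)
    S socket = wrapper.getSocket();
    // Reuse the processor already associated with this connection, if any
    Processor processor = connections.get(socket);
    try {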

        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            // The key call, found at last
            state = processor.process(wrapper, status);

        } while ( state == SocketState.UPGRADING);
        return state;
    }
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}
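The essential shape of Handler.process() is: look up (or create) the processor for this socket, record the association in connections, call processor.process() again for as long as it reports UPGRADING, and drop the association once the connection is closed. A self-contained sketch with hypothetical types (State, FakeProcessor) in place of Tomcat's SocketState and Processor; removing the entry on CLOSED is a simplification of the cleanup the real handler performs.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class HandlerDemo {
    enum State { OPEN, CLOSED, UPGRADING } // hypothetical subset of SocketState

    // Hypothetical processor that replays a scripted sequence of states.
    static final class FakeProcessor {
        final Deque<State> script;
        int calls = 0;
        FakeProcessor(State... states) { script = new ArrayDeque<>(Arrays.asList(states)); }
        State process() {
            calls++;
            return script.isEmpty() ? State.CLOSED : script.poll();
        }
    }

    final Map<String, FakeProcessor> connections = new HashMap<>();

    State process(String socket, Supplier<FakeProcessor> factory) {
        FakeProcessor processor = connections.get(socket);
        if (processor == null) {
            processor = factory.get(); // like getProtocol().createProcessor()
        }
        connections.put(socket, processor); // associate the processor with the connection
        State state;
        do {
            state = processor.process();
        } while (state == State.UPGRADING); // keep going while the protocol is being upgraded
        if (state == State.CLOSED) {
            connections.remove(socket); // forget the connection once it is closed
        }
        return state;
    }

    public static void main(String[] args) {
        HandlerDemo handler = new HandlerDemo();
        FakeProcessor p = new FakeProcessor(State.UPGRADING, State.OPEN);
        System.out.println(handler.process("socket-1", () -> p) + " after " + p.calls + " calls"); // OPEN after 2 calls
        System.out.println(handler.process("socket-1", () -> p)); // CLOSED (script exhausted)
        System.out.println(handler.connections.isEmpty()); // true
    }
}
```

Keeping the processor in a map keyed by socket is what lets a keep-alive connection reuse the same processor state across several events.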
Processor

createProcessor

protected Http11Processor createProcessor() {
    // Build the Http11Processor (this snippet comes from an older Tomcat release that still had the BIO JIoEndpoint)
    Http11Processor processor = new Http11Processor(
            proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint, // 1. maximum HTTP header size
            proto.getMaxTrailerSize(),proto.getMaxExtensionSize());
    processor.setAdapter(proto.getAdapter());
    // 2. with keep-alive, the maximum number of requests served over one socket
    processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
    // 3. keep-alive timeout
    processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
    // 4. default timeout used during file uploads (300 * 1000 ms)
    processor.setConnectionUploadTimeout(
            proto.getConnectionUploadTimeout());
    processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
    // 5. response bodies larger than this size are gzip-compressed
    processor.setCompressionMinSize(proto.getCompressionMinSize());
    // 6. whether compression is enabled
    processor.setCompression(proto.getCompression());
    processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
    // 7. only responses with these MIME types (e.g. "text/html,text/xml,text/plain") are compressed
    processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
    processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
    // 8. socket buffer size, default 9000
    processor.setSocketBuffer(proto.getSocketBuffer());
    // 9. maximum POST size buffered during authentication, default 4 * 1024 bytes
    processor.setMaxSavePostSize(proto.getMaxSavePostSize());
    processor.setServer(proto.getServer());
    processor.setDisableKeepAlivePercentage(
            proto.getDisableKeepAlivePercentage());
    register(processor);
    return processor;
}
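Comments 5 to 7 above describe when output compression kicks in. A rough sketch of that decision, assuming only the three settings named there (compression on/off, compressionMinSize, compressableMimeTypes). CompressionDemo and shouldCompress are hypothetical; Tomcat's real check applies further conditions, such as the client's Accept-Encoding header and the noCompressionUserAgents setting.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

class CompressionDemo {
    // Hypothetical decision helper; real Tomcat applies more conditions than this.
    static boolean shouldCompress(boolean compressionOn, int compressionMinSize,
                                  String compressableMimeTypes, String contentType,
                                  long contentLength) {
        if (!compressionOn || contentType == null) return false;
        // Strip parameters such as "; charset=UTF-8" before matching the MIME type.
        String mime = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        Set<String> allowed = Arrays.stream(compressableMimeTypes.split(","))
                .map(String::trim)
                .collect(Collectors.toSet());
        if (!allowed.contains(mime)) return false;
        // An unknown length (-1) is treated as "possibly large" in this sketch.
        return contentLength < 0 || contentLength > compressionMinSize;
    }

    public static void main(String[] args) {
        String types = "text/html,text/xml,text/plain";
        System.out.println(shouldCompress(true, 2048, types, "text/html; charset=UTF-8", 10_000)); // true
        System.out.println(shouldCompress(true, 2048, types, "image/png", 10_000));                // false
        System.out.println(shouldCompress(true, 2048, types, "text/plain", 100));                  // false
    }
}
```

Skipping small bodies makes sense because gzip headers and CPU time can outweigh the savings on tiny responses.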

Here we focus on how the Processor handles a read event. It comes down to a single line: a call to service().

public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                // Call service()
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}

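Processor.service() is important, but it is long (more than 200 lines), so its code is not reproduced here.

Its main logic is:

  • Parse the request line and headers, populating the (Coyote) Request and Response objects.
  • Call Adapter.service(), passing in those Request and Response objects.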

Adapter

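The Adapter connects the Connector to the Container, bridging the two layers. The Processor calls Adapter.service(), which mainly does the following:

  • From the Coyote request and response objects, obtains the Connector's Request and Response objects (creating them on first use); these implement HttpServletRequest and HttpServletResponse.
  • Adds extra headers (X-Powered-By, if enabled).
  • Parses the request; this step may handle proxy settings, set required headers, and so on.
  • Enters the container proper by invoking the first valve of the Engine's pipeline.
  • Completes the request with request.finishRequest() and response.finishResponse(), the latter flushing the OutputBuffer's data to the browser.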
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    // 1. From the Coyote request/response, get the Connector's Request/Response objects (implementations of HttpServletRequest/HttpServletResponse), creating them on first use
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    // 2. Add extra headers
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        // 3. Parse the request; this may involve proxy settings, setting required headers, etc.
        // It also performs request mapping (resolving host, context, wrapper, URI parameters and the session ID)
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            // 4. Where the request actually enters the container: invoke the first valve of the Engine's pipeline
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            // 5. Complete the request with request.finishRequest() and response.finishResponse() (which flushes the OutputBuffer's data to the browser)
            request.finishRequest();
            // Flush the OutputBuffer of org.apache.catalina.connector.Response into the InternalOutputBuffer of org.apache.coyote.Response, then write it out through the socket's output stream (this assembles the HTTP response header and body and sends them to the remote end)
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS,  null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            if (context != null) {
                context.logAccess(request, response,
                        System.currentTimeMillis() - req.getStartTime(), false);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            request.recycle();
            response.recycle();
        }
    }
}
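The call connector.getService().getContainer().getPipeline().getFirst().invoke(request, response) enters a chain of responsibility: each valve does its own work and then calls the next one, and the last (basic) valve hands the request on to the next container level. A minimal sketch of that pattern, assuming hypothetical Valve and Pipeline types (not Tomcat's org.apache.catalina interfaces) and a List<String> standing in for the request.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineDemo {
    // Hypothetical valve: does its own work, then passes control to the next valve.
    static abstract class Valve {
        Valve next;
        abstract void invoke(List<String> trace);
        void invokeNext(List<String> trace) {
            if (next != null) next.invoke(trace);
        }
    }

    // Hypothetical pipeline: ordinary valves first, the basic valve always last.
    static final class Pipeline {
        private final List<Valve> valves = new ArrayList<>();
        private final Valve basic;
        Pipeline(Valve basic) { this.basic = basic; }
        void addValve(Valve v) { valves.add(v); }
        Valve getFirst() {
            Valve first = basic;
            for (int i = valves.size() - 1; i >= 0; i--) { // link from back to front
                valves.get(i).next = first;
                first = valves.get(i);
            }
            return first;
        }
    }

    // Builds a valve that records its name and then calls the next valve.
    static Valve named(String name) {
        return new Valve() {
            void invoke(List<String> trace) {
                trace.add(name);
                invokeNext(trace);
            }
        };
    }

    public static void main(String[] args) {
        Pipeline engine = new Pipeline(named("basic"));
        engine.addValve(named("valve-A"));
        engine.addValve(named("valve-B"));
        List<String> trace = new ArrayList<>();
        engine.getFirst().invoke(trace);
        System.out.println(trace); // [valve-A, valve-B, basic]
    }
}
```

In Tomcat the Engine's basic valve forwards to the Host's pipeline, the Host's to the Context's, and so on down to the Wrapper, so the same pattern repeats at every container level.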
Request Preprocessing

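postParseRequest preprocesses the request, for example:

  • Stripping path parameters (introduced by semicolons) from the path.
  • URI decoding (%xx escapes).
  • Normalization (resolving "." and ".." segments).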
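The three steps can be sketched in the same order postParseRequest applies them: path parameters, then decoding, then normalization. This is a simplified illustration, not Tomcat's code: UriPreprocessDemo is hypothetical, assumes an ASCII raw URI whose escapes decode to UTF-8, ignores special rules such as the handling of an encoded "/" (%2F), and drops any trailing slash. A null result corresponds to the 400 "Invalid URI" branch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

class UriPreprocessDemo {
    // 1. Drop ";name=value" path parameters from every segment.
    static String stripPathParameters(String uri) {
        String[] segments = uri.split("/", -1); // -1 keeps empty trailing segments
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < segments.length; i++) {
            if (i > 0) sb.append('/');
            String seg = segments[i];
            int semi = seg.indexOf(';');
            sb.append(semi >= 0 ? seg.substring(0, semi) : seg);
        }
        return sb.toString();
    }

    // 2. %xx decoding; assumes an ASCII input and UTF-8 encoded escapes.
    static String percentDecode(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '%') {
                if (i + 2 >= s.length()) throw new IllegalArgumentException("truncated escape");
                out.write(Integer.parseInt(s.substring(i + 1, i + 3), 16)); // bad hex throws
                i += 2;
            } else {
                out.write((byte) c);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // 3. Resolve "." and ".." segments; null means the path would climb above the root.
    static String normalize(String path) {
        if (!path.startsWith("/")) return null;
        Deque<String> stack = new ArrayDeque<>();
        for (String seg : path.split("/")) {
            if (seg.isEmpty() || seg.equals(".")) continue;
            if (seg.equals("..")) {
                if (stack.isEmpty()) return null;
                stack.removeLast();
            } else {
                stack.addLast(seg);
            }
        }
        return "/" + String.join("/", stack);
    }

    static String preprocess(String rawUri) {
        return normalize(percentDecode(stripPathParameters(rawUri)));
    }

    public static void main(String[] args) {
        System.out.println(preprocess("/app/./docs/%7Euser/../index.html;jsessionid=ABC")); // /app/docs/index.html
        System.out.println(preprocess("/../etc/passwd")); // null: rejected
    }
}
```

Rejecting paths that climb above the root is what prevents a request from escaping the web application's directory.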
protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
        org.apache.coyote.Response res, Response response) throws IOException, ServletException {
    // (some code omitted)
    MessageBytes undecodedURI = req.requestURI();
    MessageBytes decodedURI = req.decodedURI();

    if (undecodedURI.getType() == MessageBytes.T_BYTES) {
        // Copy the raw URI to the decodedURI
        decodedURI.duplicate(undecodedURI);

        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);

        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* The URI is chars or String, and has been sent using an in-memory
         * protocol handler. The following assumptions are made:
         * - req.requestURI() has been set to the 'original' non-decoded,
         *   non-normalized URI
         * - req.decodedURI() has been set to the decoded, normalized form
         *   of req.requestURI()
         */
        decodedURI.toChars();
        // Remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the URL
        CharChunk uriCC = decodedURI.getCharChunk();
        int semicolon = uriCC.indexOf(';');
        if (semicolon > 0) {
            decodedURI.setChars
                (uriCC.getBuffer(), uriCC.getStart(), semicolon);
        }
    }

    // Request mapping.
    MessageBytes serverName;
    if (connector.getUseIPVHosts()) {
        serverName = req.localName();
        if (serverName.isNull()) {
            // well, they did ask for it
            res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
        }
    } else {
        serverName = req.serverName();
    }

    // Version for the second mapping loop and
    // Context that we expect to get for that version
    String version = null;
    Context versionContext = null;
    boolean mapRequired = true;

    while (mapRequired) {
        // This will map the latest version by default
        connector.getService().getMapper().map(serverName, decodedURI,
                version, request.getMappingData());
        // Some code omitted
    }
    // Some code omitted
}

Taking the case where the MessageBytes type is T_BYTES as an example:

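  • The parsePathParameters method strips the semicolon-delimited path parameters from the URI.
  • req.getURLDecoder() returns a UDecoder instance whose convert method decodes the URI. This decoding only handles percent-encoding: each three-character %xx sequence is replaced by the single value given by the two hexadecimal digits after the percent sign.
  • The normalize method normalizes the URI, resolving "." and ".." in the path.
  • The convertURI method uses the Connector's uriEncoding attribute to convert the URI bytes into characters.
  • Note the line connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()). When the Service started, MapperListener registered every Host and Context in that Service. When choosing a Context for a URI, the Mapper's map method compares the URI decoded by convertURI against the path of each Context.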
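The comparison the Mapper makes can be pictured as longest-prefix matching: a Context matches when the decoded URI equals its path or continues with "/" right after it, the longest matching path wins, and the ROOT context ("") is the fallback. The sketch below is a hypothetical illustration (MapperSketch, addHost and mapContext are made-up names, and unknown host names simply fall back to a default host). The real Mapper keeps hosts and contexts in sorted arrays searched with binary search, and also handles host aliases, context versions and wrapper (servlet) mapping, all of which are omitted here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for Tomcat's Mapper (not its real API).
public class MapperSketch {
    private final Map<String, List<String>> contextsByHost = new HashMap<>();
    private final String defaultHost;

    MapperSketch(String defaultHost) {
        this.defaultHost = defaultHost.toLowerCase();
    }

    // Plays the role of MapperListener registering a Host and its Contexts at Service start.
    void addHost(String hostName, List<String> contextPaths) {
        contextsByHost.put(hostName.toLowerCase(), contextPaths);
    }

    // Returns the matched context path ("" is the ROOT context), or null if nothing matches.
    String mapContext(String serverName, String decodedUri) {
        List<String> contexts = contextsByHost.get(serverName.toLowerCase());
        if (contexts == null) {
            contexts = contextsByHost.get(defaultHost);       // unknown host: use the default host
        }
        if (contexts == null) {
            return null;
        }
        String best = null;
        for (String path : contexts) {
            // A context matches if the URI equals its path or continues with '/' right after it,
            // so "/shop" matches "/shop/cart" but not "/shopping".
            boolean matches = path.isEmpty()
                    || decodedUri.equals(path)
                    || (decodedUri.startsWith(path) && decodedUri.charAt(path.length()) == '/');
            if (matches && (best == null || path.length() > best.length())) {
                best = path;                                  // keep the longest match
            }
        }
        return best;
    }

    public static void main(String[] args) {
        MapperSketch mapper = new MapperSketch("localhost");
        mapper.addHost("localhost", Arrays.asList("", "/shop", "/shop/admin"));
        System.out.println(mapper.mapContext("localhost", "/shop/admin/users"));      // /shop/admin
        System.out.println(mapper.mapContext("example.org", "/shop/cart"));           // /shop (default host)
        System.out.println("[" + mapper.mapContext("localhost", "/shopping") + "]");  // [] = ROOT
    }
}
```

Because matching is done on the decoded, normalized URI, a request for /shop/%61dmin/users reaches the same Context as /shop/admin/users.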

Container processing

If the request can be passed to the container's Pipeline, that is, when postParseRequest returns true, the container takes over processing. The service method contains the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):

  • The Connector's getService returns the StandardService.
  • The StandardService's getContainer returns the StandardEngine.
  • The StandardEngine's getPipeline returns its associated StandardPipeline.
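getFirst() returns the first Valve in the Engine's pipeline, and invoke starts a chain in which each Valve passes the request to the next one until the pipeline's basic valve, which forwards it to the pipeline of a child container. The following hypothetical sketch models that chain (PipelineSketch, NamedValve and the trace list are illustrative only, not Tomcat's API): added valves are linked in front of the basic valve, and getFirst() falls back to the basic valve when nothing else has been added. As a simplification, each basic valve here is wired to one fixed child pipeline, whereas Tomcat's StandardEngineValve looks up the Host from the request's mapping data; the valve names in the trace are just labels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the Pipeline/Valve chain; names echo Tomcat but this is not its API.
public class PipelineSketch {

    interface Valve {
        Valve getNext();
        void setNext(Valve next);
        void invoke(String requestUri, List<String> trace);
    }

    // One pipeline per container (Engine, Host, ...); the basic valve always stays last.
    static class Pipeline {
        private Valve first;            // first added valve, or null if none were added
        private final Valve basic;

        Pipeline(Valve basic) {
            this.basic = basic;
        }

        // Newly added valves are linked in just before the basic valve.
        void addValve(Valve valve) {
            valve.setNext(basic);
            if (first == null) {
                first = valve;
                return;
            }
            Valve current = first;
            while (current.getNext() != basic) {
                current = current.getNext();
            }
            current.setNext(valve);
        }

        // Falls back to the basic valve when no other valve has been added.
        Valve getFirst() {
            return first != null ? first : basic;
        }
    }

    // Records its name; a basic valve (next == null) hands the request to the child pipeline.
    static class NamedValve implements Valve {
        private final String name;
        private final Pipeline child;
        private Valve next;

        NamedValve(String name, Pipeline child) {
            this.name = name;
            this.child = child;
        }

        public Valve getNext() { return next; }
        public void setNext(Valve next) { this.next = next; }

        public void invoke(String requestUri, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.invoke(requestUri, trace);               // continue along this container's chain
            } else if (child != null) {
                child.getFirst().invoke(requestUri, trace);   // e.g. Engine -> Host
            }
        }
    }

    public static void main(String[] args) {
        Pipeline hostPipeline = new Pipeline(new NamedValve("StandardHostValve (basic)", null));
        Pipeline enginePipeline = new Pipeline(new NamedValve("StandardEngineValve (basic)", hostPipeline));
        enginePipeline.addValve(new NamedValve("custom valve", null));

        List<String> trace = new ArrayList<>();
        enginePipeline.getFirst().invoke("/shop/cart", trace);
        System.out.println(trace);
        // [custom valve, StandardEngineValve (basic), StandardHostValve (basic)]
    }
}
```

Keeping the basic valve permanently at the tail means configured valves (access logging, for instance) always run before the container's own dispatch logic.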

Conclusion:

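Due to length constraints, this chapter ends here. In the next article I will cover the rest of the processing flow in detail, so stay tuned.

I hope this helps, and happy learning.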
