Preface
This article takes a look at the KeepAliveStrategy of AsyncHttpClient.
KeepAliveStrategy
org/asynchttpclient/channel/KeepAliveStrategy.java
public interface KeepAliveStrategy {

  /**
   * Determines whether the connection should be kept alive after this HTTP message exchange.
   *
   * @param ahcRequest    the Request, as built by AHC
   * @param nettyRequest  the HTTP request sent to Netty
   * @param nettyResponse the HTTP response received from Netty
   * @return true if the connection should be kept alive, false if it should be closed.
   */
  boolean keepAlive(Request ahcRequest, HttpRequest nettyRequest, HttpResponse nettyResponse);
}
The KeepAliveStrategy interface defines a single keepAlive method, which decides whether the connection should be kept alive after an HTTP exchange.
DefaultKeepAliveStrategy
org/asynchttpclient/channel/DefaultKeepAliveStrategy.java
/**
 * Connection strategy implementing standard HTTP 1.0/1.1 behavior.
 */
public class DefaultKeepAliveStrategy implements KeepAliveStrategy {

  /**
   * Implemented in accordance with RFC 7230 section 6.1 https://tools.ietf.org/html/rfc7230#section-6.1
   */
  @Override
  public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) {
    return HttpUtil.isKeepAlive(response)
            && HttpUtil.isKeepAlive(request)
            // support non standard Proxy-Connection
            && !response.headers().contains("Proxy-Connection", CLOSE, true);
  }
}
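DefaultKeepAliveStrategy implements the KeepAliveStrategy interface. Its keepAlive method follows standard HTTP 1.0/1.1 behavior: it returns true only if both the request and the response are keep-alive and the response headers do not contain Proxy-Connection: close.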
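The three conditions above can be sketched without any Netty dependency. This is only an illustrative model, not the library's code: the class KeepAliveDecisionSketch and its methods are hypothetical, the two boolean parameters stand in for the results of HttpUtil.isKeepAlive(request) and HttpUtil.isKeepAlive(response), and a plain Map stands in for Netty's HttpHeaders.

```java
import java.util.Map;

// Hypothetical stand-in for DefaultKeepAliveStrategy; the real class works on Netty message objects.
final class KeepAliveDecisionSketch {

    /**
     * Mirrors the three conditions: response keep-alive, request keep-alive,
     * and no "Proxy-Connection: close" header on the response.
     */
    static boolean keepAlive(boolean requestKeepAlive,
                             boolean responseKeepAlive,
                             Map<String, String> responseHeaders) {
        return responseKeepAlive
                && requestKeepAlive
                && !hasProxyConnectionClose(responseHeaders);
    }

    /** Case-insensitive match on both the header name and the value "close". */
    static boolean hasProxyConnectionClose(Map<String, String> headers) {
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            if (entry.getKey().equalsIgnoreCase("Proxy-Connection")
                    && entry.getValue().equalsIgnoreCase("close")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Both sides keep-alive, no proxy header: the connection may be reused.
        System.out.println(keepAlive(true, true, Map.of()));
        // A proxy asking to close overrides an otherwise reusable connection.
        System.out.println(keepAlive(true, true, Map.of("Proxy-Connection", "close")));
    }
}
```

In this model, any single failing condition is enough to close the connection, which matches the short-circuit && chain in the original method.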
HttpUtil
io/netty/handler/codec/http/HttpUtil.java
    /**
     * Returns {@code true} if and only if the connection can remain open and
     * thus 'kept alive'.  This methods respects the value of the.
     *
     * {@code "Connection"} header first and then the return value of
     * {@link HttpVersion#isKeepAliveDefault()}.
     */
    public static boolean isKeepAlive(HttpMessage message) {
        return !message.headers().containsValue(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true) &&
               (message.protocolVersion().isKeepAliveDefault() ||
                message.headers().containsValue(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true));
    }
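isKeepAlive returns true only when the HttpMessage has no connection: close header and, in addition, either the HTTP protocol version keeps connections alive by default or the headers contain connection: keep-alive.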
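To make the rule concrete, here is a small dependency-free sketch of the same decision. It is a simplified model rather than Netty's implementation: the class IsKeepAliveSketch and the helper hasToken are hypothetical, the boolean keepAliveByDefault stands in for HttpVersion#isKeepAliveDefault() (true for HTTP/1.1, false for HTTP/1.0), and the Connection header is passed as a single comma-separated string.

```java
// Hypothetical model of the isKeepAlive rule; not Netty's actual code.
final class IsKeepAliveSketch {

    /**
     * @param keepAliveByDefault stands in for HttpVersion#isKeepAliveDefault()
     * @param connectionHeader   raw Connection header value, or null when absent
     */
    static boolean isKeepAlive(boolean keepAliveByDefault, String connectionHeader) {
        boolean close = hasToken(connectionHeader, "close");
        boolean keepAlive = hasToken(connectionHeader, "keep-alive");
        // "close" always wins; otherwise fall back to the protocol default or an explicit keep-alive.
        return !close && (keepAliveByDefault || keepAlive);
    }

    /** Case-insensitive search for a token in a comma-separated header value. */
    static boolean hasToken(String headerValue, String token) {
        if (headerValue == null) {
            return false;
        }
        for (String part : headerValue.split(",")) {
            if (part.trim().equalsIgnoreCase(token)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isKeepAlive(true, null));          // HTTP/1.1, no header   -> true
        System.out.println(isKeepAlive(true, "close"));       // HTTP/1.1, close       -> false
        System.out.println(isKeepAlive(false, null));         // HTTP/1.0, no header   -> false
        System.out.println(isKeepAlive(false, "keep-alive")); // HTTP/1.0, keep-alive  -> true
    }
}
```

The four calls cover the cases that differ between HTTP/1.0 and HTTP/1.1: only HTTP/1.0 needs an explicit keep-alive header, while an explicit close disables reuse for both versions.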
handleHttpResponse
org/asynchttpclient/netty/handler/HttpHandler.java
  private void handleHttpResponse(final HttpResponse response, final Channel channel, final NettyResponseFuture<?> future, AsyncHandler<?> handler) throws Exception {

    HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
    logger.debug("\n\nRequest {}\n\nResponse {}\n", httpRequest, response);

    future.setKeepAlive(config.getKeepAliveStrategy().keepAlive(future.getTargetRequest(), httpRequest, response));

    NettyResponseStatus status = new NettyResponseStatus(future.getUri(), response, channel);
    HttpHeaders responseHeaders = response.headers();

    if (!interceptors.exitAfterIntercept(channel, future, handler, response, status, responseHeaders)) {
      boolean abort = abortAfterHandlingStatus(handler, status) || //
              abortAfterHandlingHeaders(handler, responseHeaders) || //
              abortAfterHandlingReactiveStreams(channel, future, handler);

      if (abort) {
        finishUpdate(future, channel, true);
      }
    }
  }
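The handleHttpResponse method of HttpHandler calls keepAlive on the configured KeepAliveStrategy to decide whether the connection should be kept alive, then stores the result in the NettyResponseFuture via setKeepAlive.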
exitAfterHandlingConnect
org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java
  public boolean exitAfterHandlingConnect(Channel channel,
                                          NettyResponseFuture<?> future,
                                          Request request,
                                          ProxyServer proxyServer) {

    if (future.isKeepAlive())
      future.attachChannel(channel, true);

    Uri requestUri = request.getUri();
    LOGGER.debug("Connecting to proxy {} for scheme {}", proxyServer, requestUri.getScheme());

    channelManager.updatePipelineForHttpTunneling(channel.pipeline(), requestUri);
    future.setReuseChannel(true);
    future.setConnectAllowed(false);
    requestSender.drainChannelAndExecuteNextRequest(channel, future, new RequestBuilder(future.getTargetRequest()).build());

    return true;
  }
When the keep-alive flag of the NettyResponseFuture is true, exitAfterHandlingConnect calls future.attachChannel(channel, true).
attachChannel
org/asynchttpclient/netty/NettyResponseFuture.java
  public void attachChannel(Channel channel, boolean reuseChannel) {

    // future could have been cancelled first
    if (isDone()) {
      Channels.silentlyCloseChannel(channel);
    }

    this.channel = channel;
    this.reuseChannel = reuseChannel;
  }

  public boolean isReuseChannel() {
    return reuseChannel;
  }
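attachChannel stores the channel on the future and records the reuseChannel flag, which isReuseChannel later exposes. If the future is already done (for example, cancelled), the channel is closed silently first.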
getOpenChannel
org/asynchttpclient/netty/request/NettyRequestSender.java
  private Channel getOpenChannel(NettyResponseFuture<?> future, Request request, ProxyServer proxyServer,
                                 AsyncHandler<?> asyncHandler) {
    if (future != null && future.isReuseChannel() && Channels.isChannelActive(future.channel())) {
      return future.channel();
    } else {
      return pollPooledChannel(request, proxyServer, asyncHandler);
    }
  }

  private Channel pollPooledChannel(Request request, ProxyServer proxy, AsyncHandler<?> asyncHandler) {
    try {
      asyncHandler.onConnectionPoolAttempt();
    } catch (Exception e) {
      LOGGER.error("onConnectionPoolAttempt crashed", e);
    }

    Uri uri = request.getUri();
    String virtualHost = request.getVirtualHost();
    final Channel channel = channelManager.poll(uri, virtualHost, proxy, request.getChannelPoolPartitioning());

    if (channel != null) {
      LOGGER.debug("Using pooled Channel '{}' for '{}' to '{}'", channel, request.getMethod(), uri);
    }
    return channel;
  }
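getOpenChannel first checks whether the NettyResponseFuture is marked for channel reuse and whether its channel is still active. If so, it returns future.channel() directly; otherwise it polls a channel from the connection pool through pollPooledChannel.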
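The reuse-or-poll decision can be illustrated with a minimal, dependency-free sketch. Everything here is a hypothetical stand-in: FakeChannel models a Netty Channel with only an active flag, FakeFuture models the channel bookkeeping of NettyResponseFuture, and a plain Deque replaces the real channel pool (the real pool is partitioned and its polling order is not modeled here).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the getOpenChannel decision; not AsyncHttpClient's actual classes.
final class ChannelReuseSketch {

    /** Stand-in for a Netty Channel. */
    static final class FakeChannel {
        final String name;
        boolean active = true;

        FakeChannel(String name) {
            this.name = name;
        }
    }

    /** Stand-in for the channel and reuseChannel fields of NettyResponseFuture. */
    static final class FakeFuture {
        FakeChannel channel;
        boolean reuseChannel;

        void attachChannel(FakeChannel channel, boolean reuseChannel) {
            this.channel = channel;
            this.reuseChannel = reuseChannel;
        }
    }

    /** Prefer the future's own active channel; otherwise take one from the pool (null if empty). */
    static FakeChannel getOpenChannel(FakeFuture future, Deque<FakeChannel> pool) {
        if (future != null && future.reuseChannel
                && future.channel != null && future.channel.active) {
            return future.channel;
        }
        return pool.pollFirst();
    }

    public static void main(String[] args) {
        Deque<FakeChannel> pool = new ArrayDeque<>();
        pool.add(new FakeChannel("pooled"));

        FakeFuture future = new FakeFuture();
        future.attachChannel(new FakeChannel("attached"), true);

        System.out.println(getOpenChannel(future, pool).name); // attached channel is reused
        future.channel.active = false;
        System.out.println(getOpenChannel(future, pool).name); // falls back to the pool
    }
}
```

When neither source yields a channel, the sketch returns null, mirroring how a null result from the pool signals that no open channel is available.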
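Summary
AsyncHttpClient's KeepAliveStrategy defines the keepAlive method, which decides whether a connection should be kept alive. The handleHttpResponse method of HttpHandler calls keepAlive on the configured strategy and stores the result in the NettyResponseFuture. getOpenChannel first checks whether the NettyResponseFuture is marked for channel reuse and whether its channel is still active; if so, it returns future.channel() directly, otherwise it polls a channel from the connection pool through pollPooledChannel.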