ElasticSearch 5.6源码解析HTTP/TCP请求

2018-05-23 17:18:20 浏览数 (1)

http请求解析

NettyHttpServerTransport 监听http请求。在其他版本中这个类在源码内,可直接找到。但在5.6中这个类被封装在netty插件中。因此从监听到http请求到请求转发到restController这部分我没看到。以下是从网络上找到的。http://blog.csdn.net/xgjianstart/article/details/70143365

首先,NettyHttpServerTransport 会负责进行监听Http请求。通过配置http.netty.http.blocking_server 你可以选择是Nio还是传统的阻塞式服务。默认是NIO。该类在配置pipeline的时候,最后添加了HttpRequestHandler,所以具体的接受到请求后的处理逻辑就由该类来完成了。

代码语言:javascript复制
pipeline.addLast("handler", requestHandler);

HttpRequestHandler 实现了标准的 messageReceived(ChannelHandlerContext ctx, MessageEvent e) 方法,在该方法中,HttpRequestHandler 会回调NettyHttpServerTransport.dispatchRequest方法,而该方法会调用HttpServerAdapter.dispatchRequest,接着又会调用HttpServer.internalDispatchRequest方法(额,好吧,我承认嵌套挺深有点深):

代码语言:javascript复制
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
        String rawPath = request.rawPath();
        if (rawPath.startsWith("/_plugin/")) {
            RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
            filterChain.continueProcessing(request, channel);
            return;
        } else if (rawPath.equals("/favicon.ico")) {
            handleFavicon(request, channel);
            return;
        }
        restController.dispatchRequest(request, channel);
    }

这个方法里我们看到了plugin等被有限处理。最后请求又被转发给 RestController。

从上面可以看出,请求被分发到RestController,然后调用dispatchRequest方法。在dispatchRequest方法中首先获取到根据请求的类型获取handler:

代码语言:javascript复制
final RestHandler handler = getHandler(request);

这个handler是在创建具体的请求实例时写入restController中的。

代码语言:javascript复制
public RestSearchAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(GET, "/_search", this);
    controller.registerHandler(POST, "/_search", this);
    controller.registerHandler(GET, "/{index}/_search", this);
    controller.registerHandler(POST, "/{index}/_search", this);
    controller.registerHandler(GET, "/{index}/{type}/_search", this);
    controller.registerHandler(POST, "/{index}/{type}/_search", this);
}

以search请求为例,此处的handler是RestSearchAction类型的。 最后调用的是handler的handlerequest方法。

代码语言:javascript复制
  wrappedHandler.handleRequest(request, channel, client);

RestSearchAction继承了 BaseRestHandler。因此这个地方调用的是BaseRestHandler的handleRequest,在handleRequest中准备请求:

代码语言:javascript复制
 final RestChannelConsumer action = prepareRequest(request, client);

这个prepareRequest方法是RestSearchAction的prepareRequest,根据RestRequest生成一个SearchRequest,交给NodeClient执行。

代码语言:javascript复制
channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel))

NodeClient执行doExecute方法,将之前的SearchAction.INSTANCE(GenericAction)转化为对应的TransportSearchAction:

代码语言:javascript复制
transportAction(action).execute(request, listener);

上述TransportSearchAction的execute方法最终执行的是自身的doExecute方法

代码语言:javascript复制
this.action.doExecute(task, request, listener);

在doExecute中执行真正的查找流程。

Tcp请求解析

Tcp请求是由Netty4Transport来接收解析的。和上述一样,Netty4Transport被封装在netty插件中,因此没看到tcp接收的过程。 后面请求被转发至TCPtransport的messageReceived方法。而在该方法中调用handleRequest处理请求。

代码语言:javascript复制
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);

在handleRequest中读取出action,跟不同的action生成具体的TransportRequest子类和一个RequestHandler,并执行该RequestHandler。

代码语言:javascript复制
final String action = stream.readString();
.....
final TransportRequest request = reg.newRequest();
.....
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));

RequestHandler是一个可运行的AbstractRunnable,最后执行的dorun方法:

代码语言:javascript复制
 protected void doRun() throws Exception {
    reg.processMessageReceived(request, transportChannel);
 }

processMessageReceived中则是通过handler调用其messageReceived方法

代码语言:javascript复制
handler.messageReceived(request, channel);

以search请求为例,这个handler不是上述生成的RequestHandler而是RequestHandlerRegistry中的handler,是一个TransportRequestHandler。而这个TransportRequestHandler是handledTransportAction中的一个变量,而handledTransportAction继承于transportAction,且是所有transport*action的父类,比如说transportBulkAction,transportSearchAction等等。 因此上述的handler.messageReceived(request, channel)调用的是handledTransportAction的子类transportSearchAction的messageReceived。至此总算找到了执行该请求的最终方法,即ransportSearchAction的messageReceived方法。但是还有个问题,reg(RequestHandlerRegistry )是从哪来的? 这个RequestHandlerRegistry 则在TCPtransport的messageReceived方法中通过transportServiceAdapter获取的:

代码语言:javascript复制
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);

而这个RequestHandlerRegistry是transportSearchAction构建的时候调用父类HandledTransportAction的构造方法注册的:

代码语言:javascript复制
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,TransportService transportService, ActionFilters actionFilters,IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
     super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
     transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
            new TransportHandler());
    }

找到了RequestHandlerRegistry(TransportService中),获取reg的transportServiceAdapter(TCPtransport),二者是怎么联系起来的?

TransportService中有一个adapter,实现了TransportServiceAdapter方法。在dostart方法中设置了transport的adpter:

代码语言:javascript复制
transport.transportServiceAdapter(adapter);

即设置了TCPtransport的adapter。那么TCPtransport调用的RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action),就是调用transportService中的adapter,从而获取到注册的RequestHandlerRegistry。 此处我们还是没回答TransportService和TCPtransport二者的关系。

networkModule中的提供了注册transport的方法,但是一直没找到TcpTransport的注册方法,后来发现tcptransport是通过netty插件注册进去的。 在Node类构造TransportService时传入:

代码语言:javascript复制
Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());

终于找到源头了。。真的好乱。其类依赖关系如下:

image.png

es2

0 人点赞