Elasticsearch底层系列之查询解析

2018-12-07 22:46:47 浏览数 (1)

1. 背景

       Elasticsearch(ES)可用于全文检索、日志分析、指标分析、APM等众多场景,而且搭建部署容易,后期弹性扩容、故障处理简单。ES在一定程度上实现了一套系统支持多个场景的希望,大幅度降低使用多套专用系统的运维成本(当然ES不是万能的,不能满足事务等场景)。正是因为其通用性和易用性,ES自2010年发布首个版本以来得到爆发式的发展,广泛应用于各类互联网公司的不同业务场景。

       ES的查询接口具有分布式的数据检索、聚合分析能力,数据检索能力用于支持全文检索、日志分析等场景,如Github平台上的代码搜索、基于ES的各类日志分析服务等;聚合分析能力用于支持指标分析、APM等场景,如监控场景、应用的日活/留存分析等。本文基于ES 5.6.4,主要分析ES的分布式执行框架及查询主体流程,探究ES如何实现分布式查询、数据检索、聚合分析等能力。

2. 分布式查询框架及类型

       ES使用开源的Lucene作为存储引擎,它赋予ES高性能的数据检索能力,但Lucene仅仅是一个单机索引库。ES基于Lucene进行分布式封装,以支持集群管理、分布式查询、聚合分析等功能。从使用的直观感受看,ES按照下图方式实现了分布式查询:

图1  查询基本流程图1 查询基本流程
  • 查询可发送到任意节点,接收到某查询的节点会作为该查询的协调节点(Coordinating Node)。
  • 协调节点解析查询,向对应数据分片分发查询子任务。
  • 各数据分片检索本地数据并返回协调节点,经汇聚处理后返回用户。

       而从实现角度看,协调节点的调度逻辑实际远比上述流程复杂,不同查询对应的协调节点的处理逻辑有一定差别。下面我们先简单介绍ES中常见的3类查询:

2.1 QUERY_THEN_FETCH

       这是最常用的查询类型,可以完成大多数的分布式查询和聚合分析功能。在这类查询中,协调节点实际需要向其他节点分发两轮任务,也就说前面流程图描述的任务分发阶段(2&3)会有两轮,具体如下:

  • Query Phase:进行分片粒度的数据检索和聚合,注意此轮调度仅返回文档id集合,并不返回实际数据。
    • 协调节点:解析查询后,向目标数据分片发送查询命令。
    • 数据节点:在每个分片内,按照过滤、排序等条件进行分片粒度的文档id检索和数据聚合,返回结果。
  • Fetch Phase:生成最终的检索、聚合结果。
    • 协调节点:归并Query Phase的结果,得到最终的文档id集合和聚合结果,并向目标数据分片发送数据抓取命令。
    • 数据节点:按需抓取实际需要的数据内容。
2.2 QUERY_AND_FETCH

       对于查询仅涉及单个分片的场景,ES会自动对查询流程做优化,在数据节点进行Query Phase的最后,直接执行Fetch操作。此类查询为QUERY_AND_FETCH。通过去除一轮任务调度优化查询性能,优化过程由ES自动完成,用户不感知。

2.3 DFS_QUERY_THEN_FETCH

       这类查询用于解决ES在多分片、少数据量的场景下计算相关度不准确的问题:以TF/IDF算法为例,ES在计算相关度时仅考虑单个分片内的IDF,可能导致查询结果中,类似的文档因为在不同分片而相关度大为不同的问题。此时可以使用此类查询,在QUERY_THEN_FETCH之前再增加一轮任务调度,用于计算分布式的IDF。但通常情况下,局部和全局IDF的差异会随着索引里文档数的增多渐渐消失,在真实世界的数据量下,这个问题几乎没有影响,没有必要使用此类查询增加一轮任务调度的开销。

       关于这类问题的具体描述,可以参考如下文档:

  • 被破坏的相关度
  • How Shards Affect Relevance Scoring in Elasticsearch

3. 查询执行流程

       本节我们深入到代码层面,以QUERY_THEN_FETCH类型查询为例,捋着代码主线,从实际执行角度分析ES的查询流程。查询流程的代码逻辑可以整体划分为两个部分:

  • 查询入口:ES接收到用户请求后,根据请求分发框架,进入对应接口的处理逻辑。这部分处理对任何ES请求都是类似的。
  • 查询调度:根据查询请求条件,进行查询的Query Phase、Fetch Phase等执行流程,返回查询结果。

       在分析具体的查询处理逻辑之前,我们先介绍查询入口部分,看看用户请求在ES中是如何被分发的。

3.1 查询入口

       ES提供用户Transport和Rest两种接口:用户可以通过ES官方提供的Transport Client访问ES集群,这种接口使用的协议与ES集群内部节点间的通讯协议一致;也可以使用简单易用的Rest接口,直接发送Http请求访问ES集群,由ES完成Rest请求到Transport请求的转换。考虑Rest接口的易用性,以及Rest层极低的额外开销,建议用户直接使用Rest接口。

       以Rest接口为例,查询入口部分的基本流程如下:

图2  查询分发流程图2 查询分发流程
  • Rest分发

       Rest分发由RestController模块完成。在ES节点启动时,会加载所有内置请求的Rest Action,并把对应请求的Http路径和Rest Action作为<Path, RestXXXAction>二元组注册到RestController中。这样对于任意的Rest请求,RestController模块只需根据Http路径,即可轻松找到对应的Rest Action进行请求分发。RestSearchAction的注册样例如下:

代码语言:txt复制
public RestSearchAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(GET, "/_search", this);
    controller.registerHandler(POST, "/_search", this);
    controller.registerHandler(GET, "/{index}/_search", this);
    controller.registerHandler(POST, "/{index}/_search", this);
    controller.registerHandler(GET, "/{index}/{type}/_search", this);
    controller.registerHandler(POST, "/{index}/{type}/_search", this);
}
  • RestSearchAction【Rest层】

       Rest层用于解析Http请求参数,转化为ES内部使用的Transport请求,然后转发给Transport层。其核心逻辑在prepareRequest(...)函数中:

代码语言:txt复制
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest();
    request.withContentOrSourceParamParserOrNull(parser ->
        parseSearchRequest(searchRequest, request, parser));

    return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
}
  • Transport分发

       Transport分发由NodeClient完成。在ES节点启动进行ActionModule.setupActions(...)时,会把对应请求的Transport路径和Transport Action作为<Action, TransportXXXAction>二元组注册到NodeClient中。NodeClient向外暴露的各种接口(如bulk/search),实际均通过Action对请求进行分发。

代码语言:txt复制
actions.register(BulkAction.INSTANCE.class,
                TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
  • TransportSearchAction【Transport层】

       Transport层的doExecute(...)函数是请求处理的核心入口,实现了多数请求处理的主要逻辑。在查询请求中,TransportSearchAction首先负责解析获取查询涉及的具体Index:

代码语言:txt复制
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
                timeProvider.getAbsoluteStartMillis(), localIndices.indices());

        然后结合routing信息、perference信息获取后续用于任务分发的分片信息:

代码语言:txt复制
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
            concreteIndices, routingMap, searchRequest.preference());

        最后生成查询请求的调度类并启动调度执行:

代码语言:txt复制
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                        aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
                        timeProvider, clusterStateVersion, task);

       上述即为查询入口的处理流程,它对任何Rest请求都适用。实际上,除了自带的Rest请求外,ES提供强大的扩展能力,用户可以通过自定义插件实现自己的请求及处理逻辑。此外,ES还支持自定义过滤器Filter,在实际进行Transport层处理前进行统一的预处理工作。

       介绍完查询入口后,下面我们具体介绍查询执行过程中的调度部分。

3.2 查询调度

       调用SearchQueryThenFetchAsyncAction.start(...)之后,查询即进入了以协调节点为中心的查询调度过程,即两个核心阶段Query Phase、Fetch Phase的执行,具体如下面时序图所示。此外,查询调度还包含两个轻量级阶段Expand Phase、Reponse Phase,后面我们按照实际执行顺序,依次介绍他们。

图3  查询调用时序图图3 查询调用时序图
3.2.1 Query Phase
  • 协调节点

       SearchQueryThenFetchAsyncAction实际是Query Phase的入口,Phase名称由其构造函数体现:

代码语言:txt复制
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
    shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
    request.getMaxConcurrentShardRequests());

       进入Query Phase后,会立即根据并发度参数进行Query任务的分发,具体由祖父类InitialSearchPhase的run(...)函数进行:

代码语言:txt复制
for (int index = 0; index < maxConcurrentShardRequests; index  ) {
    final SearchShardIterator shardRoutings = shardsIts.get(index);
    assert shardRoutings.skip() == false;
    performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}

       然后通过SearchTransportService的sendExecuteQuery(...)函数,向具体分片发送Query子任务进行异步执行:

代码语言:txt复制
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
        new ActionListenerResponseHandler<>(listener, supplier));

       在介绍每个分片的执行逻辑前,我们先提前了解分片执行结果的处理:每个分片在执行完毕Query子任务后,通过节点间通信,回调祖父类InitialSearchPhase的onShardSuccess(...)函数,把查询结果记录在协调节点保存的数组结构results中,并增加计数:

代码语言:txt复制
successfulOps.incrementAndGet();
results.consumeResult(result);

       当返回结果的分片数等于预期的总分片数时,协调节点会进入当前Phase的结束处理,启动下一个阶段Fetch Phase的执行。注意,这里有个有意思的地方,ES中只需要一个分片执行成功,即会进行后续Phase处理得到部分结果,当然它会在结果中提示用户实际有多少分片执行成功。

代码语言:txt复制
if (xTotalOps == expectedTotalOps) {
    onPhaseDone();  # 参考下面onPhaseDone代码
}
……
public final void onPhaseDone() {
    executeNextPhase(this, getNextPhase(results, this));  # 参考下面getNextPhase代码
}
……
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
    return new FetchSearchPhase(results, searchPhaseController, context);
}
  • 数据节点

       协调节点通过SearchTransportService的sendExecuteQuery(...)函数向目标数据节点发送QUERY_ACTION_NAME类型的查询子任务,通过请求路径QUERY_ACTION_NAME可以在SearchTransportService中找到对应的处理函数SearchService.executeQueryPhase(...):

代码语言:txt复制
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
    new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
            SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
            channel.sendResponse(result);
        }
    });

       数据节点会尝试走canCache分支的Query Phase处理,这样可以利用Cache优化查询,否则走普通执行流程:

代码语言:txt复制
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
    final boolean canCache = indicesService.canCache(request, context);
    context.getQueryShardContext().freezeContext();
    if (canCache) {
        indicesService.loadIntoContext(request, context, queryPhase);
    } else {
        queryPhase.execute(context);
    }
}

       QueryPhase.execute(...)为数据节点进行Query Phase子任务的核心逻辑,它首先从searchContext中获取查询参数和查询对象query,然后生产处理查询结果的collector,最终调用Lucene的IndexSearcher.search(...)函数进行查询,具体参考下面关键代码。

       这里先简单介绍下query、collector,帮助理解:

        a. query :查询对象用于指定查询条件,比如"host:host001 AND timestamp>1514736000",在分片内进行数据检索。

        b. collector :用于消费检索结果,进行Shard级别的limit N(Top N)、聚合计算等操作。它的实现也较为容易理解,如优先级队列、多层嵌套的hash分桶等。注意这里仅获取排序 或 聚合涉及的字段,source、store等内容需要在Fetch Phase中获取。

代码语言:txt复制
# 获取参数和查询对象
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Query query = searchContext.query();
……
# 生产处理查询结果的collector
#     limit N对应的collector
collector = TopScoreDocCollector.create(numDocs, after);
……
final List<Collector> subCollectors = new ArrayList<>();
subCollectors.add(collector);
#     聚合分析对应的collector
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);
……
searcher.search(query, collector);

       另外,如果查询仅涉及一个分片,数据节点会在Query Phase结尾处,直接执行Fetch Phase,即QUERY_AND_FETCH类型查询:

代码语言:txt复制
if (request.numberOfShards() == 1) {
    return executeFetchPhase(context, operationListener, afterQueryTime);
}
3.2.2 Fetch Phase
  • 协调节点

       Fetch Phase首先会归并Query Phase得到的文档id集合,并排序得到最终的limit N,同时归并多个分片的聚合数据得到最终的聚合结果。这一步通过reduce操作完成:

代码语言:txt复制
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();

       然后对需要抓取具体数据的文档id按照分片粒度进行划分,并向对应分片发送抓取请求:

代码语言:txt复制
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
……
for (int i = 0; i < docIdsToLoad.length; i  ) {
	……
	executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection);
	……
}

       后续执行逻辑和Query Phase类似,每个分片在执行完毕Query子任务后,通过节点间通信,回调innerOnResponse(...)函数通知协调节点,结果会使用shard id作为下标放入数组结构fetchResults中:

代码语言:txt复制
successfulOps.incrementAndGet();
results.consumeResult(result);

       当最后一个分片执行完成后,协调节点会进入当前Phase结束处理:合并fetch阶段的结果集,并启动下一个阶段执行。

代码语言:txt复制
final Runnable finishPhase = ()
    -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
    queryResults : fetchResults);  # 参考下面moveToNextPhase代码
……
private void moveToNextPhase(SearchPhaseController searchPhaseController,
                             String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
                             AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
    final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
        reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
    context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));
}

       Fetch Phase的构造函数也向我们展示了后续需要执行的两个简单阶段,后面我们会简要介绍:

代码语言:txt复制
FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
                 SearchPhaseController searchPhaseController,
                 SearchPhaseContext context) {
    this(resultConsumer, searchPhaseController, context,
        (response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
            (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
}
  • 数据节点

       协调节点通过SearchTransportService的sendExecuteFetch(...)函数向目标数据节点发送Transport路径为FETCH_ID_ACTION_NAME的查询子任务,通过FETCH_ID_ACTION_NAME可以在SearchTransportService中找到对应的处理函数SearchService.executeFetchPhase(...):

代码语言:txt复制
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
    new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
        @Override
        public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
            FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
            channel.sendResponse(result);
        }
    });

       然后进入实际的Fetch处理逻辑FetchPhase.execute(...),在这里fetchSubPhases是一系列简单的抓取任务,会按照docid抓取对应文档的source、store fields、highlight、docvalue fields等信息:

代码语言:txt复制
for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
    fetchSubPhase.hitsExecute(context, hits);
}
3.2.3 Expand Phase

       在Fetch Phase协调节点处理的结束阶段,我们看到下一个执行阶段为Expand Phase,用于完成ES 5.3版本以后支持的Field Collapsing查询。通过该类查询可以轻松实现按Field值进行分类,每个分类获取排名前N的文档。如在餐厅的菜单系统中按菜系(川菜、湘菜等)分类,获取每个菜系排名前3的美食。用户也可以按Field进行Aggregation实现类似功能,但Field Collapsing会更易用、高效。

       Field Collapsing属于一类特殊的查询场景,这里不详细介绍。

3.2.4 Response Phase

       Expand Phase的下一执行阶段为Response Phase,用于将查询结果返回用户:

代码语言:txt复制
private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
    return new SearchPhase("response") {
        @Override
        public void run() throws IOException {
            context.onResponse(context.buildSearchResponse(response, scrollId));
        }
    };
}

4. 小结

       本文主要分析了ES的分布式执行框架及查询主体流程,对ES其它他流程及Lucene相关内容未做详细介绍,后续我们会通过具体文章详细介绍,欢迎大家一起交流讨论。

0 人点赞