1. 背景
Elasticsearch(ES)可用于全文检索、日志分析、指标分析、APM等众多场景,而且搭建部署容易,后期弹性扩容、故障处理简单。ES在一定程度上实现了一套系统支持多个场景的希望,大幅度降低使用多套专用系统的运维成本(当然ES不是万能的,不能满足事务等场景)。正是因为其通用性和易用性,ES自2010年发布首个版本以来得到爆发式的发展,广泛应用于各类互联网公司的不同业务场景。
ES的查询接口具有分布式的数据检索、聚合分析能力,数据检索能力用于支持全文检索、日志分析等场景,如Github平台上的代码搜索、基于ES的各类日志分析服务等;聚合分析能力用于支持指标分析、APM等场景,如监控场景、应用的日活/留存分析等。本文基于ES 5.6.4,主要分析ES的分布式执行框架及查询主体流程,探究ES如何实现分布式查询、数据检索、聚合分析等能力。
2. 分布式查询框架及类型
ES使用开源的Lucene作为存储引擎,它赋予ES高性能的数据检索能力,但Lucene仅仅是一个单机索引库。ES基于Lucene进行分布式封装,以支持集群管理、分布式查询、聚合分析等功能。从使用的直观感受看,ES按照下图方式实现了分布式查询:
- 查询可发送到任意节点,接收到某查询的节点会作为该查询的协调节点(Coordinating Node)。
- 协调节点解析查询,向对应数据分片分发查询子任务。
- 各数据分片检索本地数据并返回协调节点,经汇聚处理后返回用户。
而从实现角度看,协调节点的调度逻辑实际远比上述流程复杂,不同查询对应的协调节点的处理逻辑有一定差别。下面我们先简单介绍ES中常见的3类查询:
2.1 QUERY_THEN_FETCH
这是最常用的查询类型,可以完成大多数的分布式查询和聚合分析功能。在这类查询中,协调节点实际需要向其他节点分发两轮任务,也就说前面流程图描述的任务分发阶段(2&3)会有两轮,具体如下:
- Query Phase:进行分片粒度的数据检索和聚合,注意此轮调度仅返回文档id集合,并不返回实际数据。
- 协调节点:解析查询后,向目标数据分片发送查询命令。
- 数据节点:在每个分片内,按照过滤、排序等条件进行分片粒度的文档id检索和数据聚合,返回结果。
- Fetch Phase:生成最终的检索、聚合结果。
- 协调节点:归并Query Phase的结果,得到最终的文档id集合和聚合结果,并向目标数据分片发送数据抓取命令。
- 数据节点:按需抓取实际需要的数据内容。
2.2 QUERY_AND_FETCH
对于查询仅涉及单个分片的场景,ES会自动对查询流程做优化,在数据节点进行Query Phase的最后,直接执行Fetch操作。此类查询为QUERY_AND_FETCH。通过去除一轮任务调度优化查询性能,优化过程由ES自动完成,用户不感知。
2.3 DFS_QUERY_THEN_FETCH
这类查询用于解决ES在多分片、少数据量的场景下计算相关度不准确的问题:以TF/IDF算法为例,ES在计算相关度时仅考虑单个分片内的IDF,可能导致查询结果中,类似的文档因为在不同分片而相关度大为不同的问题。此时可以使用此类查询,在QUERY_THEN_FETCH之前再增加一轮任务调度,用于计算分布式的IDF。但通常情况下,局部和全局IDF的差异会随着索引里文档数的增多渐渐消失,在真实世界的数据量下,这个问题几乎没有影响,没有必要使用此类查询增加一轮任务调度的开销。
关于这类问题的具体描述,可以参考如下文档:
- 被破坏的相关度
- How Shards Affect Relevance Scoring in Elasticsearch
3. 查询执行流程
本节我们深入到代码层面,以QUERY_THEN_FETCH类型查询为例,捋着代码主线,从实际执行角度分析ES的查询流程。查询流程的代码逻辑可以整体划分为两个部分:
- 查询入口:ES接收到用户请求后,根据请求分发框架,进入对应接口的处理逻辑。这部分处理对任何ES请求都是类似的。
- 查询调度:根据查询请求条件,进行查询的Query Phase、Fetch Phase等执行流程,返回查询结果。
在分析具体的查询处理逻辑之前,我们先介绍查询入口部分,看看用户请求在ES中是如何被分发的。
3.1 查询入口
ES提供用户Transport和Rest两种接口:用户可以通过ES官方提供的Transport Client访问ES集群,这种接口使用的协议与ES集群内部节点间的通讯协议一致;也可以使用简单易用的Rest接口,直接发送Http请求访问ES集群,由ES完成Rest请求到Transport请求的转换。考虑Rest接口的易用性,以及Rest层极低的额外开销,建议用户直接使用Rest接口。
以Rest接口为例,查询入口部分的基本流程如下:
- Rest分发
Rest分发由RestController模块完成。在ES节点启动时,会加载所有内置请求的Rest Action,并把对应请求的Http路径和Rest Action作为<Path, RestXXXAction>二元组注册到RestController中。这样对于任意的Rest请求,RestController模块只需根据Http路径,即可轻松找到对应的Rest Action进行请求分发。RestSearchAction的注册样例如下:
代码语言:txt复制public RestSearchAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(GET, "/_search", this);
controller.registerHandler(POST, "/_search", this);
controller.registerHandler(GET, "/{index}/_search", this);
controller.registerHandler(POST, "/{index}/_search", this);
controller.registerHandler(GET, "/{index}/{type}/_search", this);
controller.registerHandler(POST, "/{index}/{type}/_search", this);
}
- RestSearchAction【Rest层】
Rest层用于解析Http请求参数,转化为ES内部使用的Transport请求,然后转发给Transport层。其核心逻辑在prepareRequest(...)函数中:
代码语言:txt复制@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser));
return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
}
- Transport分发
Transport分发由NodeClient完成。在ES节点启动进行ActionModule.setupActions(...)时,会把对应请求的Transport路径和Transport Action作为<Action, TransportXXXAction>二元组注册到NodeClient中。NodeClient向外暴露的各种接口(如bulk/search),实际均通过Action对请求进行分发。
代码语言:txt复制actions.register(BulkAction.INSTANCE.class,
TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
- TransportSearchAction【Transport层】
Transport层的doExecute(...)函数是请求处理的核心入口,实现了多数请求处理的主要逻辑。在查询请求中,TransportSearchAction首先负责解析获取查询涉及的具体Index:
代码语言:txt复制indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
然后结合routing信息、perference信息获取后续用于任务分发的分片信息:
代码语言:txt复制GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
最后生成查询请求的调度类并启动调度执行:
代码语言:txt复制searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
上述即为查询入口的处理流程,它对任何Rest请求都适用。实际上,除了自带的Rest请求外,ES提供强大的扩展能力,用户可以通过自定义插件实现自己的请求及处理逻辑。此外,ES还支持自定义过滤器Filter,在实际进行Transport层处理前进行统一的预处理工作。
介绍完查询入口后,下面我们具体介绍查询执行过程中的调度部分。
3.2 查询调度
调用SearchQueryThenFetchAsyncAction.start(...)之后,查询即进入了以协调节点为中心的查询调度过程,即两个核心阶段Query Phase、Fetch Phase的执行,具体如下面时序图所示。此外,查询调度还包含两个轻量级阶段Expand Phase、Reponse Phase,后面我们按照实际执行顺序,依次介绍他们。
3.2.1 Query Phase
- 协调节点
SearchQueryThenFetchAsyncAction实际是Query Phase的入口,Phase名称由其构造函数体现:
代码语言:txt复制super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
request.getMaxConcurrentShardRequests());
进入Query Phase后,会立即根据并发度参数进行Query任务的分发,具体由祖父类InitialSearchPhase的run(...)函数进行:
代码语言:txt复制for (int index = 0; index < maxConcurrentShardRequests; index ) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
然后通过SearchTransportService的sendExecuteQuery(...)函数,向具体分片发送Query子任务进行异步执行:
代码语言:txt复制transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, supplier));
在介绍每个分片的执行逻辑前,我们先提前了解分片执行结果的处理:每个分片在执行完毕Query子任务后,通过节点间通信,回调祖父类InitialSearchPhase的onShardSuccess(...)函数,把查询结果记录在协调节点保存的数组结构results中,并增加计数:
代码语言:txt复制successfulOps.incrementAndGet();
results.consumeResult(result);
当返回结果的分片数等于预期的总分片数时,协调节点会进入当前Phase的结束处理,启动下一个阶段Fetch Phase的执行。注意,这里有个有意思的地方,ES中只需要一个分片执行成功,即会进行后续Phase处理得到部分结果,当然它会在结果中提示用户实际有多少分片执行成功。
代码语言:txt复制if (xTotalOps == expectedTotalOps) {
onPhaseDone(); # 参考下面onPhaseDone代码
}
……
public final void onPhaseDone() {
executeNextPhase(this, getNextPhase(results, this)); # 参考下面getNextPhase代码
}
……
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context);
}
- 数据节点
协调节点通过SearchTransportService的sendExecuteQuery(...)函数向目标数据节点发送QUERY_ACTION_NAME类型的查询子任务,通过请求路径QUERY_ACTION_NAME可以在SearchTransportService中找到对应的处理函数SearchService.executeQueryPhase(...):
代码语言:txt复制transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
数据节点会尝试走canCache分支的Query Phase处理,这样可以利用Cache优化查询,否则走普通执行流程:
代码语言:txt复制private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
final boolean canCache = indicesService.canCache(request, context);
context.getQueryShardContext().freezeContext();
if (canCache) {
indicesService.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
}
QueryPhase.execute(...)为数据节点进行Query Phase子任务的核心逻辑,它首先从searchContext中获取查询参数和查询对象query,然后生产处理查询结果的collector,最终调用Lucene的IndexSearcher.search(...)函数进行查询,具体参考下面关键代码。
这里先简单介绍下query、collector,帮助理解:
a. query :查询对象用于指定查询条件,比如"host:host001 AND timestamp>1514736000",在分片内进行数据检索。
b. collector :用于消费检索结果,进行Shard级别的limit N(Top N)、聚合计算等操作。它的实现也较为容易理解,如优先级队列、多层嵌套的hash分桶等。注意这里仅获取排序 或 聚合涉及的字段,source、store等内容需要在Fetch Phase中获取。
代码语言:txt复制# 获取参数和查询对象
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Query query = searchContext.query();
……
# 生产处理查询结果的collector
# limit N对应的collector
collector = TopScoreDocCollector.create(numDocs, after);
……
final List<Collector> subCollectors = new ArrayList<>();
subCollectors.add(collector);
# 聚合分析对应的collector
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);
……
searcher.search(query, collector);
另外,如果查询仅涉及一个分片,数据节点会在Query Phase结尾处,直接执行Fetch Phase,即QUERY_AND_FETCH类型查询:
代码语言:txt复制if (request.numberOfShards() == 1) {
return executeFetchPhase(context, operationListener, afterQueryTime);
}
3.2.2 Fetch Phase
- 协调节点
Fetch Phase首先会归并Query Phase得到的文档id集合,并排序得到最终的limit N,同时归并多个分片的聚合数据得到最终的聚合结果。这一步通过reduce操作完成:
代码语言:txt复制final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
然后对需要抓取具体数据的文档id按照分片粒度进行划分,并向对应分片发送抓取请求:
代码语言:txt复制final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
……
for (int i = 0; i < docIdsToLoad.length; i ) {
……
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection);
……
}
后续执行逻辑和Query Phase类似,每个分片在执行完毕Query子任务后,通过节点间通信,回调innerOnResponse(...)函数通知协调节点,结果会使用shard id作为下标放入数组结构fetchResults中:
代码语言:txt复制successfulOps.incrementAndGet();
results.consumeResult(result);
当最后一个分片执行完成后,协调节点会进入当前Phase结束处理:合并fetch阶段的结果集,并启动下一个阶段执行。
代码语言:txt复制final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults); # 参考下面moveToNextPhase代码
……
private void moveToNextPhase(SearchPhaseController searchPhaseController,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));
}
Fetch Phase的构造函数也向我们展示了后续需要执行的两个简单阶段,后面我们会简要介绍:
代码语言:txt复制FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController,
SearchPhaseContext context) {
this(resultConsumer, searchPhaseController, context,
(response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
(finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
}
- 数据节点
协调节点通过SearchTransportService的sendExecuteFetch(...)函数向目标数据节点发送Transport路径为FETCH_ID_ACTION_NAME的查询子任务,通过FETCH_ID_ACTION_NAME可以在SearchTransportService中找到对应的处理函数SearchService.executeFetchPhase(...):
代码语言:txt复制transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
然后进入实际的Fetch处理逻辑FetchPhase.execute(...),在这里fetchSubPhases是一系列简单的抓取任务,会按照docid抓取对应文档的source、store fields、highlight、docvalue fields等信息:
代码语言:txt复制for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
fetchSubPhase.hitsExecute(context, hits);
}
3.2.3 Expand Phase
在Fetch Phase协调节点处理的结束阶段,我们看到下一个执行阶段为Expand Phase,用于完成ES 5.3版本以后支持的Field Collapsing查询。通过该类查询可以轻松实现按Field值进行分类,每个分类获取排名前N的文档。如在餐厅的菜单系统中按菜系(川菜、湘菜等)分类,获取每个菜系排名前3的美食。用户也可以按Field进行Aggregation实现类似功能,但Field Collapsing会更易用、高效。
Field Collapsing属于一类特殊的查询场景,这里不详细介绍。
3.2.4 Response Phase
Expand Phase的下一执行阶段为Response Phase,用于将查询结果返回用户:
代码语言:txt复制private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
return new SearchPhase("response") {
@Override
public void run() throws IOException {
context.onResponse(context.buildSearchResponse(response, scrollId));
}
};
}
4. 小结
本文主要分析了ES的分布式执行框架及查询主体流程,对ES其它他流程及Lucene相关内容未做详细介绍,后续我们会通过具体文章详细介绍,欢迎大家一起交流讨论。