In the previous article we walked through how get is implemented in ES. Although it is just a simple single-document lookup, the implementation looked remarkably complex. The reason is that we spent most of the time on the surrounding machinery; the core boils down to three steps: 1. locate the node that owns the id; 2. resolve the real docId; 3. fetch the field data for that docId.
In this article we turn to another major ES feature: search. One could say all of ES is built around search, so this single topic could keep us busy for a very long time. We can take it step by step, though: today we only sketch the overall framework and leave the details for later. In fact, when we discuss anything day to day, aren't we usually discussing it at the framework level anyway? It is drilling into the fine points that reveals real skill, and that tends to exercise two abilities at once: grasping the core of a problem, and understanding it deeply.
Enough small talk; on to the topic: the processing framework of search.
1. The searchAction framework
We will use the following request as our starting point, i.e. how an ordinary search request is issued:
# find documents in the test index where the field name=ali
curl -X GET -H 'content-type:application/json' -d '{"query":{"match":{"name":"ali"}}}' http://localhost:9200/test/job/_search
As the comment says, there is just one filter condition, name=ali, which maps easily onto SQL:
select * from test where name='ali';
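Incidentally, the same filter can also be issued as a URI search; as we will see later in urlParamsToQueryBuilder, the q parameter is parsed into a query_string query:
# equivalent URI search; q becomes a query_string query
curl -X GET 'http://localhost:9200/test/_search?q=name:ali'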
The problem looks simple enough, so how does ES handle it? ES's packages are divided quite cleanly: HTTP requests are first handled by implementations under the rest package, internal processing is delegated to the action module, startup is handled by bootstrap, and so on. In short, this is a trait every excellent application needs: clear, readable code.
The HTTP entry point for search lives in the rest action package, org.elasticsearch.rest.action.search (RestSearchAction, shown below).
The client instance (NodeClient) serves as the forwarding entry for many kinds of requests and handles communication with and scheduling on remote or local ES nodes; it is one of the key instances initialized at ES startup.
The internal action definitions, used to register handlers at startup and to forward accepted HTTP requests uniformly to internal node processing, live in org.elasticsearch.action.search (SearchAction, TransportSearchAction).
Finally, which request forms does search support? That can be found in each concrete Action's routes() method:
// org.elasticsearch.rest.action.search.RestSearchAction#routes
@Override
public List<Route> routes() {
return unmodifiableList(asList(
new Route(GET, "/_search"),
new Route(POST, "/_search"),
new Route(GET, "/{index}/_search"),
new Route(POST, "/{index}/_search"),
// Deprecated typed endpoints.
new Route(GET, "/{index}/{type}/_search"),
new Route(POST, "/{index}/{type}/_search")));
}
In other words, search accepts GET and POST, with no index, with an index, or with an index plus a type; the syntax is lenient indeed.
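Concretely, each of the following requests is routed to the same RestSearchAction handler, matching the routes above:
# no index: search across all indices
curl -X GET 'http://localhost:9200/_search'
# with an index
curl -X POST -H 'content-type:application/json' -d '{"query":{"match_all":{}}}' 'http://localhost:9200/test/_search'
# with an index and a type (deprecated typed endpoint)
curl -X GET 'http://localhost:9200/test/job/_search'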
2. The search framework implementation
In this section we look at how ES actually implements the search feature.
After an external request arrives, it is handed to the netty handler, then to the RestController, which locates the concrete handler and runs prepare and accept. The details were covered in the previous articles; the point here is that the request ultimately lands in RestSearchAction. Since RestSearchAction extends BaseRestHandler, it follows the uniform flow: prepareRequest() -> parameter validation -> invocation of the concrete implementation.
2.1. Accepting the search request
This part covers what happens once an external request has been accepted. Because the internal searcher sits behind many thread pools and many distributed nodes, none of this early work executes the real query; it only dispatches the query, parses parameters, and so on.
The implementation is as follows:
// org.elasticsearch.rest.BaseRestHandler#handleRequest
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
// prepare the context for the concrete handler implementation
final RestChannelConsumer action = prepareRequest(request, client);
// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams =
request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
// validate the non-response params
if (!unconsumedParams.isEmpty()) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}
if (request.hasContent() && request.isContentConsumed() == false) {
throw new IllegalArgumentException("request [" request.method() " " request.path() "] does not support having a body");
}
usageCount.increment();
// execute the action
action.accept(channel);
}
// org.elasticsearch.rest.action.search.RestSearchAction#prepareRequest
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
/*
* We have to pull out the call to `source().size(size)` because
* _update_by_query and _delete_by_query uses this same parsing
* path but sets a different variable when it sees the `size`
* url parameter.
*
* Note that we can't use `searchRequest.source()::size` because
* `searchRequest.source()` is null right now. We don't have to
* guard against it being null in the IntConsumer because it can't
* be null later. If that is confusing to you then you are in good
* company.
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
// parse the parameters
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));
// entry point of the actual search business logic
return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
};
}
This preparatory work has two key points: 1. parse the parameters; 2. build the consumer that carries the actual business logic. ES uses lambda expressions heavily here, stripping away much of Java's usual ceremony and winning the language back some face.
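To make the "parse eagerly, execute lazily" shape of prepareRequest stand out, here is a minimal sketch of the same pattern; the types are illustrative stand-ins, not the real ES classes:
// Illustrative sketch of BaseRestHandler's contract, not actual ES code:
// prepareRequest() parses everything up front and returns a consumer; the
// framework later calls accept() to actually run the action.
import java.util.function.Consumer;

interface Channel { void sendResponse(String body); }

class SketchHandler {
    Consumer<Channel> prepareRequest(String rawBody) {
        String parsed = rawBody.trim();                  // stand-in for real parsing
        // nothing executes yet; the lambda captures the parsed state
        return channel -> channel.sendResponse("executed: " + parsed);
    }

    public static void main(String[] args) {
        Consumer<Channel> action = new SketchHandler().prepareRequest(" {\"query\":{}} ");
        action.accept(body -> System.out.println(body)); // the framework's accept() call
    }
}
This is exactly why the parameter-validation step in handleRequest can run between prepareRequest and accept: parsing has already consumed the parameters, but no work has been done yet.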
As one of ES's most important and most complex features, search has an enormous parameter surface; fully understanding every parameter could easily fill several articles. For a rough idea, though, just expand the implementation below.
// org.elasticsearch.rest.action.search.RestSearchAction#parseSearchRequest
/**
* Parses the rest request on top of the SearchRequest, preserving values that are not overridden by the rest request.
*
* @param requestContentParser body of the request to read. This method does not attempt to read the body from the {@code request}
* parameter
* @param setSize how the size url parameter is handled. {@code udpate_by_query} and regular search differ here.
*/
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser,
NamedWriteableRegistry namedWriteableRegistry,
IntConsumer setSize) throws IOException {
if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
}
searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
if (requestContentParser != null) {
// convert the raw request into a readable form, e.g. parse out {"query":{"match":{"xx":"1"}}}
// this amounts to lexical/syntactic parsing -- nontrivial work
searchRequest.source().parseXContent(requestContentParser, true);
}
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);
if (request.hasParam("pre_filter_shard_size")) {
searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));
}
if (request.hasParam("max_concurrent_shard_requests")) {
// only set if we have the parameter since we auto adjust the max concurrency on the coordinator
// based on the number of nodes in the cluster
final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
searchRequest.getMaxConcurrentShardRequests());
searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}
if (request.hasParam("allow_partial_search_results")) {
// only set if we have the parameter passed to override the cluster-level default
searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
}
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.
String searchType = request.param("search_type");
if ("query_and_fetch".equals(searchType) ||
"dfs_query_and_fetch".equals(searchType)) {
throw new IllegalArgumentException("Unsupported search type [" searchType "]");
} else {
searchRequest.searchType(searchType);
}
// parse the top-level URL parameters for later use
parseSearchSource(searchRequest.source(), request, setSize);
searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));
String scroll = request.param("scroll");
if (scroll != null) {
// parse scroll, a time-related value
searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));
}
if (request.hasParam("type")) {
deprecationLogger.deprecate("search_with_types", TYPES_DEPRECATION_MESSAGE);
searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
}
searchRequest.routing(request.param("routing"));
searchRequest.preference(request.param("preference"));
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
checkRestTotalHits(request, searchRequest);
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, request, namedWriteableRegistry);
} else {
searchRequest.setCcsMinimizeRoundtrips(
request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
}
}
// org.elasticsearch.search.builder.SearchSourceBuilder#parseXContent
/**
* Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent. Use this if you need to set up
* different defaults than a regular SearchSourceBuilder would have and use {@link #fromXContent(XContentParser, boolean)} if you have
* normal defaults.
*
* @param parser The xContent parser.
* @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
*/
public void parseXContent(XContentParser parser, boolean checkTrailingTokens) throws IOException {
XContentParser.Token token = parser.currentToken();
String currentFieldName = null;
if (token != XContentParser.Token.START_OBJECT && (token = parser.nextToken()) != XContentParser.Token.START_OBJECT) {
throw new ParsingException(parser.getTokenLocation(), "Expected [" XContentParser.Token.START_OBJECT
"] but found [" token "]", parser.getTokenLocation());
}
// loop and parse until the end of the object
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
from = parser.intValue();
} else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
size = parser.intValue();
} else if (TIMEOUT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
timeout = TimeValue.parseTimeValue(parser.text(), null, TIMEOUT_FIELD.getPreferredName());
} else if (TERMINATE_AFTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
terminateAfter = parser.intValue();
} else if (MIN_SCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
minScore = parser.floatValue();
} else if (VERSION_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
version = parser.booleanValue();
} else if (SEQ_NO_PRIMARY_TERM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
seqNoAndPrimaryTerm = parser.booleanValue();
} else if (EXPLAIN_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
explain = parser.booleanValue();
} else if (TRACK_SCORES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
trackScores = parser.booleanValue();
} else if (TRACK_TOTAL_HITS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
if (token == XContentParser.Token.VALUE_BOOLEAN ||
(token == XContentParser.Token.VALUE_STRING && Booleans.isBoolean(parser.text()))) {
trackTotalHitsUpTo = parser.booleanValue() ? TRACK_TOTAL_HITS_ACCURATE : TRACK_TOTAL_HITS_DISABLED;
} else {
trackTotalHitsUpTo = parser.intValue();
}
} else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
storedFieldsContext =
StoredFieldsContext.fromXContent(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), parser);
} else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
sort(parser.text());
} else if (PROFILE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
profile = parser.booleanValue();
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " token " in [" currentFieldName "].",
parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_OBJECT) {
// parse the query parameter
if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
queryBuilder = parseInnerQueryBuilder(parser);
} else if (POST_FILTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
postQueryBuilder = parseInnerQueryBuilder(parser);
} else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (SCRIPT_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
scriptFields = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
scriptFields.add(new ScriptField(parser));
}
} else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
deprecationLogger.deprecate("indices_boost_object_format",
"Object format in indices_boost is deprecated, please use array format instead");
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
indexBoosts.add(new IndexBoost(currentFieldName, parser.floatValue()));
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " token
" in [" currentFieldName "].", parser.getTokenLocation());
}
}
} else if (AGGREGATIONS_FIELD.match(currentFieldName, parser.getDeprecationHandler())
|| AGGS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
aggregations = AggregatorFactories.parseAggregators(parser);
} else if (HIGHLIGHT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
highlightBuilder = HighlightBuilder.fromXContent(parser);
} else if (SUGGEST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
suggestBuilder = SuggestBuilder.fromXContent(parser);
} else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
} else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
rescoreBuilders = new ArrayList<>();
rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
} else if (EXT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
extBuilders = new ArrayList<>();
String extSectionName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
extSectionName = parser.currentName();
} else {
SearchExtBuilder searchExtBuilder = parser.namedObject(SearchExtBuilder.class, extSectionName, null);
if (searchExtBuilder.getWriteableName().equals(extSectionName) == false) {
throw new IllegalStateException("The parsed [" searchExtBuilder.getClass().getName() "] object has a "
"different writeable name compared to the name of the section that it was parsed from: found ["
searchExtBuilder.getWriteableName() "] expected [" extSectionName "]");
}
extBuilders.add(searchExtBuilder);
}
}
} else if (SLICE.match(currentFieldName, parser.getDeprecationHandler())) {
sliceBuilder = SliceBuilder.fromXContent(parser);
} else if (COLLAPSE.match(currentFieldName, parser.getDeprecationHandler())) {
collapse = CollapseBuilder.fromXContent(parser);
} else if (POINT_IN_TIME.match(currentFieldName, parser.getDeprecationHandler())) {
pointInTimeBuilder = PointInTimeBuilder.fromXContent(parser);
} else if (RUNTIME_MAPPINGS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
runtimeMappings = parser.map();
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " token " in [" currentFieldName "].",
parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
storedFieldsContext = StoredFieldsContext.fromXContent(STORED_FIELDS_FIELD.getPreferredName(), parser);
} else if (DOCVALUE_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
docValueFields = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
docValueFields.add(FieldAndFormat.fromXContent(parser));
}
} else if (FETCH_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
fetchFields = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
fetchFields.add(FieldAndFormat.fromXContent(parser));
}
} else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
indexBoosts.add(new IndexBoost(parser));
}
} else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
} else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
rescoreBuilders = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
}
} else if (STATS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
stats = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.VALUE_STRING) {
stats.add(parser.text());
} else {
throw new ParsingException(parser.getTokenLocation(), "Expected [" XContentParser.Token.VALUE_STRING
"] in [" currentFieldName "] but found [" token "]", parser.getTokenLocation());
}
}
} else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (SEARCH_AFTER.match(currentFieldName, parser.getDeprecationHandler())) {
searchAfterBuilder = SearchAfterBuilder.fromXContent(parser);
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " token " in [" currentFieldName "].",
parser.getTokenLocation());
}
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " token " in [" currentFieldName "].",
parser.getTokenLocation());
}
}
// parsing finished; all tokens consumed
if (checkTrailingTokens) {
token = parser.nextToken();
if (token != null) {
throw new ParsingException(parser.getTokenLocation(), "Unexpected token [" token "] found after the main object.");
}
}
}
// org.elasticsearch.index.query.AbstractQueryBuilder#parseInnerQueryBuilder
/**
* Parses a query excluding the query element that wraps it
*/
public static QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException {
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ParsingException(parser.getTokenLocation(), "[_na] query malformed, must start with start_object");
}
}
if (parser.nextToken() == XContentParser.Token.END_OBJECT) {
// we encountered '{}' for a query clause, it used to be supported, deprecated in 5.0 and removed in 6.0
throw new IllegalArgumentException("query malformed, empty clause found at [" parser.getTokenLocation() "]");
}
if (parser.currentToken() != XContentParser.Token.FIELD_NAME) {
throw new ParsingException(parser.getTokenLocation(), "[_na] query malformed, no field after start_object");
}
String queryName = parser.currentName();
// move to the next START_OBJECT
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ParsingException(parser.getTokenLocation(), "[" queryName "] query malformed, no start_object after query name");
}
QueryBuilder result;
try {
result = parser.namedObject(QueryBuilder.class, queryName, null);
} catch (NamedObjectNotFoundException e) {
String message = String.format(Locale.ROOT, "unknown query [%s]%s", queryName,
SuggestingErrorOnUnknown.suggest(queryName, e.getCandidates()));
throw new ParsingException(new XContentLocation(e.getLineNumber(), e.getColumnNumber()), message, e);
}
//end_object of the specific query (e.g. match, multi_match etc.) element
if (parser.currentToken() != XContentParser.Token.END_OBJECT) {
throw new ParsingException(parser.getTokenLocation(),
"[" queryName "] malformed query, expected [END_OBJECT] but found [" parser.currentToken() "]");
}
//end_object of the query object
if (parser.nextToken() != XContentParser.Token.END_OBJECT) {
throw new ParsingException(parser.getTokenLocation(),
"[" queryName "] malformed query, expected [END_OBJECT] but found [" parser.currentToken() "]");
}
return result;
}
// org.elasticsearch.rest.action.search.RestSearchAction#parseSearchSource
/**
* Parses the rest request on top of the SearchSourceBuilder, preserving
* values that are not overridden by the rest request.
*/
private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {
// searches of the ?q=xx form
QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
if (queryBuilder != null) {
searchSourceBuilder.query(queryBuilder);
}
int from = request.paramAsInt("from", -1);
if (from != -1) {
searchSourceBuilder.from(from);
}
int size = request.paramAsInt("size", -1);
if (size != -1) {
setSize.accept(size);
}
if (request.hasParam("explain")) {
searchSourceBuilder.explain(request.paramAsBoolean("explain", null));
}
if (request.hasParam("version")) {
searchSourceBuilder.version(request.paramAsBoolean("version", null));
}
if (request.hasParam("seq_no_primary_term")) {
searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean("seq_no_primary_term", null));
}
if (request.hasParam("timeout")) {
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
}
if (request.hasParam("terminate_after")) {
int terminateAfter = request.paramAsInt("terminate_after",
SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
throw new IllegalArgumentException("terminateAfter must be > 0");
} else if (terminateAfter > 0) {
searchSourceBuilder.terminateAfter(terminateAfter);
}
}
StoredFieldsContext storedFieldsContext =
StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);
if (storedFieldsContext != null) {
searchSourceBuilder.storedFields(storedFieldsContext);
}
String sDocValueFields = request.param("docvalue_fields");
if (sDocValueFields != null) {
if (Strings.hasText(sDocValueFields)) {
String[] sFields = Strings.splitStringByCommaToArray(sDocValueFields);
for (String field : sFields) {
searchSourceBuilder.docValueField(field, null);
}
}
}
FetchSourceContext fetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
if (fetchSourceContext != null) {
searchSourceBuilder.fetchSource(fetchSourceContext);
}
if (request.hasParam("track_scores")) {
searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));
}
if (request.hasParam("track_total_hits")) {
if (Booleans.isBoolean(request.param("track_total_hits"))) {
searchSourceBuilder.trackTotalHits(
request.paramAsBoolean("track_total_hits", true)
);
} else {
searchSourceBuilder.trackTotalHitsUpTo(
request.paramAsInt("track_total_hits", SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO)
);
}
}
String sSorts = request.param("sort");
if (sSorts != null) {
String[] sorts = Strings.splitStringByCommaToArray(sSorts);
for (String sort : sorts) {
int delimiter = sort.lastIndexOf(":");
if (delimiter != -1) {
String sortField = sort.substring(0, delimiter);
String reverse = sort.substring(delimiter + 1);
if ("asc".equals(reverse)) {
searchSourceBuilder.sort(sortField, SortOrder.ASC);
} else if ("desc".equals(reverse)) {
searchSourceBuilder.sort(sortField, SortOrder.DESC);
}
} else {
searchSourceBuilder.sort(sort);
}
}
}
String sStats = request.param("stats");
if (sStats != null) {
searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));
}
String suggestField = request.param("suggest_field");
if (suggestField != null) {
String suggestText = request.param("suggest_text", request.param("q"));
int suggestSize = request.paramAsInt("suggest_size", 5);
String suggestMode = request.param("suggest_mode");
searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,
termSuggestion(suggestField)
.text(suggestText).size(suggestSize)
.suggestMode(SuggestMode.resolve(suggestMode))));
}
}
// org.elasticsearch.rest.action.RestActions#urlParamsToQueryBuilder
public static QueryBuilder urlParamsToQueryBuilder(RestRequest request) {
String queryString = request.param("q");
if (queryString == null) {
return null;
}
QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(queryString);
queryBuilder.defaultField(request.param("df"));
queryBuilder.analyzer(request.param("analyzer"));
queryBuilder.analyzeWildcard(request.paramAsBoolean("analyze_wildcard", false));
queryBuilder.lenient(request.paramAsBoolean("lenient", null));
String defaultOperator = request.param("default_operator");
if (defaultOperator != null) {
queryBuilder.defaultOperator(Operator.fromString(defaultOperator));
}
return queryBuilder;
}
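Putting the pieces together: thanks to parseSearchSource and urlParamsToQueryBuilder, a plain URL can carry most of the common options, all of which are merged into the SearchSourceBuilder shown above:
# q -> query_string query; from/size/sort/track_total_hits are merged into the source builder
curl -X GET 'http://localhost:9200/test/_search?q=name:ali&from=0&size=10&sort=age:desc&track_total_hits=true'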
2.2. Dispatching the search request
When the search finally executes, RestCancellableNodeClient runs execute, and SearchAction.INSTANCE resolves to the handler TransportSearchAction. This is still preparatory work: the current client acts as the coordinating node; it understands the request's semantics and must either split the request and dispatch it to the various nodes, or handle it itself on an async thread.
// org.elasticsearch.action.search.TransportSearchAction#doExecute
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
}
private void executeRequest(Task task, SearchRequest searchRequest,
SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
// the key logic
final ClusterState clusterState = clusterService.state();
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
// resolve remote indices
remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
// execute the search locally
executeLocalSearch(
task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
// here we mainly need to collect data from many shards
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup =
getRemoteClusterNodeLookup(searchShardsResponses);
final Map<String, AliasFilter> remoteAliasFilters;
final List<SearchShardIterator> remoteShardIterators;
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
remoteAliasFilters);
}
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext, searchAsyncActionProvider);
},
listener::onFailure));
}
}
}, listener::onFailure);
// invoke the rewriteListener; source here stands for all the incoming search-condition parameters (json)
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
}
// org.elasticsearch.index.query.Rewriteable#rewriteAndFetch
/**
* Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
*/
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T> rewriteResponse) {
rewriteAndFetch(original, context, rewriteResponse, 0);
}
/**
* Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
*/
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T>
rewriteResponse, int iteration) {
T builder = original;
try {
for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
rewrittenBuilder = builder.rewrite(context)) {
builder = rewrittenBuilder;
if (iteration >= MAX_REWRITE_ROUNDS) {
// this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
// and then we fail to prevent infinite loops
throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not "
"rewritten");
}
if (context.hasAsyncActions()) {
T finalBuilder = builder;
final int currentIterationNumber = iteration;
context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
currentIterationNumber), rewriteResponse::onFailure));
return;
}
}
rewriteResponse.onResponse(builder);
} catch (IOException|IllegalArgumentException|ParsingException ex) {
rewriteResponse.onFailure(ex);
}
}
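The contract of rewrite() is easy to miss in the code above: it is invoked repeatedly until the builder returns itself, with a hard cap of 16 rounds (MAX_REWRITE_ROUNDS). A toy model of that fixed-point loop, with illustrative types only:
// Toy model of the Rewriteable fixed-point loop (illustrative, not ES code).
interface Node { Node rewrite(); }

final class Wrapper implements Node {
    private final Node inner;
    Wrapper(Node inner) { this.inner = inner; }
    @Override public Node rewrite() { return inner; }   // "not done yet": unwrap one layer
}

final class Leaf implements Node {
    @Override public Node rewrite() { return this; }    // returning itself signals the fixed point
}

class RewriteLoop {
    static final int MAX_ROUNDS = 16;                   // cf. MAX_REWRITE_ROUNDS

    static Node rewriteToFixedPoint(Node original) {
        Node current = original;
        int rounds = 0;
        for (Node next = current.rewrite(); next != current; next = current.rewrite()) {
            current = next;
            if (++rounds >= MAX_ROUNDS) {
                throw new IllegalStateException("too many rewrite rounds");
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Node query = new Wrapper(new Wrapper(new Leaf()));
        System.out.println(rewriteToFixedPoint(query) instanceof Leaf); // true
    }
}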
That is the rough shape of how a search request is accepted; quite a slog to read, isn't it? Let's now look at the single-node search implementation framework:
// org.elasticsearch.action.search.TransportSearchAction#executeLocalSearch
private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
ClusterState clusterState, ActionListener<SearchResponse> listener,
SearchContextId searchContext,
SearchAsyncActionProvider searchAsyncActionProvider) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY,
searchContext, searchAsyncActionProvider);
}
// org.elasticsearch.action.search.TransportSearchAction#executeSearch
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext,
SearchAsyncActionProvider searchAsyncActionProvider) {
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
final List<SearchShardIterator> localShardIterators;
final Map<String, AliasFilter> aliasFilter;
final String[] concreteLocalIndices;
if (searchContext != null) {
assert searchRequest.pointInTimeBuilder() != null;
aliasFilter = searchContext.aliasFilter();
concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices();
localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices,
searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
} else {
// resolve the indices
final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
// resolve the routing
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
concreteLocalIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteLocalIndices[i] = indices[i].getName();
}
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting().searchShards(clusterState,
concreteLocalIndices, routingMap, searchRequest.preference(),
searchService.getResponseCollectorService(), nodeSearchCounts);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(
searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
.collect(Collectors.toList());
// per-index alias filters
aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
}
// shard iterators
final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
// check that the shard count does not exceed the limit
failIfOverShardCountLimit(clusterService, shardIterators.size());
Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
// optimize search type for cases where there is only one shard group to search on
// search optimization: with a single shard, return as soon as that lookup completes
if (shardIterators.size() == 1) {
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_THEN_FETCH);
}
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
}
if (searchRequest.isSuggestOnly()) {
// disable request cache if we have only suggest
searchRequest.requestCache(false);
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
// convert to Q_T_F if we have only suggest
searchRequest.searchType(QUERY_THEN_FETCH);
break;
}
}
final DiscoveryNodes nodes = clusterState.nodes();
// connection management for the index search
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
// fetch the thread pool dedicated to search
final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
localShardIterators.size() + remoteShardIterators.size());
// build the async request handler and kick off the search
searchAsyncActionProvider.asyncSearchAction(
task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener,
preFilterSearchShards, threadPool, clusters).start();
}
This step mainly checks indices, shards, and thread pools, and once everything is set up it hands off to asynchronous execution. That hand-off is the secret of a distributed system's performance, and also its difficulty: it is hard to trace and troubleshoot.
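At its core, the coordination here is a fan-out/count-down pattern: dispatch one request per shard (or per remote cluster), count completions, and reduce once everything has reported back. A stripped-down, blocking sketch of that pattern (illustrative only; the real code in AbstractSearchAsyncAction is fully callback-driven):
// Fan-out/count-down sketch (illustrative; not the ES implementation).
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FanOutSketch {
    public static void main(String[] args) throws InterruptedException {
        List<String> shards = List.of("shard-0", "shard-1", "shard-2");
        Queue<String> results = new ConcurrentLinkedQueue<>();
        CountDownLatch latch = new CountDownLatch(shards.size());    // cf. CountDown in collectSearchShards
        ExecutorService searchPool = Executors.newFixedThreadPool(2); // cf. ThreadPool.Names.SEARCH

        for (String shard : shards) {
            searchPool.submit(() -> {
                try {
                    results.add(shard + ":hits");                    // stand-in for the per-shard query phase
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();                                               // ES reduces via callbacks instead of blocking
        System.out.println("reduced: " + results);                   // stand-in for the reduce/merge phase
        searchPool.shutdown();
    }
}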
Next, a brief look at the multi-node search case, where results have to be collected from each cluster; the process is roughly as follows:
// org.elasticsearch.action.search.TransportSearchAction#collectSearchShards
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<Exception> exceptions = new AtomicReference<>();
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterAlias = entry.getKey();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
final String[] indices = entry.getValue().indices();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
clusterClient.admin().cluster().searchShards(searchShardsRequest,
new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(
clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) {
@Override
void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
}
@Override
Map<String, ClusterSearchShardsResponse> createFinalResponse() {
return searchShardsResponses;
}
}
);
}
}
Beyond this there are many details of shard and index handling; interested readers can dig in on their own.
// org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#resolveSearchRouting
/**
* Resolves the search routing if in the expression aliases are used. If expressions point to concrete indices
* or aliases with no routing defined the specified routing is used.
*
* @return routing values grouped by concrete index
*/
public Map<String, Set<String>> resolveSearchRouting(ClusterState state, @Nullable String routing, String... expressions) {
List<String> resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList();
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, isSystemIndexAccessAllowed());
for (ExpressionResolver expressionResolver : expressionResolvers) {
resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions);
}
// TODO: it appears that this can never be true?
if (isAllIndices(resolvedExpressions)) {
return resolveSearchRoutingAllIndices(state.metadata(), routing);
}
Map<String, Set<String>> routings = null;
Set<String> paramRouting = null;
// List of indices that don't require any routing
Set<String> norouting = new HashSet<>();
if (routing != null) {
paramRouting = Sets.newHashSet(Strings.splitStringByCommaToArray(routing));
}
for (String expression : resolvedExpressions) {
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(expression);
if (indexAbstraction != null && indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) {
IndexAbstraction.Alias alias = (IndexAbstraction.Alias) indexAbstraction;
for (Tuple<String, AliasMetadata> item : alias.getConcreteIndexAndAliasMetadatas()) {
String concreteIndex = item.v1();
AliasMetadata aliasMetadata = item.v2();
if (!norouting.contains(concreteIndex)) {
if (!aliasMetadata.searchRoutingValues().isEmpty()) {
// Routing alias
if (routings == null) {
routings = new HashMap<>();
}
Set<String> r = routings.get(concreteIndex);
if (r == null) {
r = new HashSet<>();
routings.put(concreteIndex, r);
}
r.addAll(aliasMetadata.searchRoutingValues());
if (paramRouting != null) {
r.retainAll(paramRouting);
}
if (r.isEmpty()) {
routings.remove(concreteIndex);
}
} else {
// Non-routing alias
if (!norouting.contains(concreteIndex)) {
norouting.add(concreteIndex);
if (paramRouting != null) {
Set<String> r = new HashSet<>(paramRouting);
if (routings == null) {
routings = new HashMap<>();
}
routings.put(concreteIndex, r);
} else {
if (routings != null) {
routings.remove(concreteIndex);
}
}
}
}
}
}
} else {
// Index
if (!norouting.contains(expression)) {
norouting.add(expression);
if (paramRouting != null) {
Set<String> r = new HashSet<>(paramRouting);
if (routings == null) {
routings = new HashMap<>();
}
routings.put(expression, r);
} else {
if (routings != null) {
routings.remove(expression);
}
}
}
}
}
if (routings == null || routings.isEmpty()) {
return null;
}
return routings;
}
// org.elasticsearch.cluster.routing.OperationRouting#searchShards
public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
String[] concreteIndices,
@Nullable Map<String, Set<String>> routing,
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
ShardIterator iterator = preferenceActiveShardIterator(shard,
clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts);
if (iterator != null) {
set.add(iterator);
}
}
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
}
// org.elasticsearch.cluster.routing.OperationRouting#computeTargetedShards
private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
@Nullable Map<String, Set<String>> routing) {
routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map
final Set<IndexShardRoutingTable> set = new HashSet<>();
// we use set here and not list since we might get duplicates
for (String index : concreteIndices) {
final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
final IndexMetadata indexMetadata = indexMetadata(clusterState, index);
final Set<String> effectiveRouting = routing.get(index);
if (effectiveRouting != null) {
for (String r : effectiveRouting) {
final int routingPartitionSize = indexMetadata.getRoutingPartitionSize();
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset)));
}
}
} else {
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(indexShard);
}
}
}
return set;
}
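Before moving on, the essence of computeTargetedShards is how a routing value selects shards. A simplified sketch of that mapping (illustrative; the real calculateScaledShardId hashes with Murmur3 and additionally scales by the index's routing_num_shards):
// Simplified routing -> shard mapping (illustrative, not the exact ES math).
class RoutingSketch {
    // partitionOffset mirrors the loop above: with routing_partition_size > 1,
    // one routing value is spread over several consecutive shards.
    static int shardFor(String routing, int numShards, int partitionOffset) {
        int hash = routing.hashCode();  // stand-in for Murmur3HashFunction.hash(routing)
        return Math.floorMod(hash + partitionOffset, numShards);
    }

    public static void main(String[] args) {
        // with routing_partition_size = 2 the same routing value targets two shards
        for (int offset = 0; offset < 2; offset++) {
            System.out.println(shardFor("ali", 5, offset));
        }
    }
}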
After that, start() is run on the asyncExecutor to kick off the dispatch of the search.
// org.elasticsearch.action.search.TransportSearchAction#buildConnectionLookup
static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
Function<String, DiscoveryNode> localNodes,
BiFunction<String, String, DiscoveryNode> remoteNodes,
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
return (clusterAlias, nodeId) -> {
final DiscoveryNode discoveryNode;
final boolean remoteCluster;
if (clusterAlias == null || requestClusterAlias != null) {
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
discoveryNode = localNodes.apply(nodeId);
remoteCluster = false;
} else {
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
remoteCluster = true;
}
if (discoveryNode == null) {
throw new IllegalStateException("no node found for id: " nodeId);
}
return nodeToConnection.apply(remoteCluster ? clusterAlias : null, discoveryNode);
};
}
// org.elasticsearch.action.search.TransportSearchAction#asyncSearchExecutor
Executor asyncSearchExecutor(final String[] indices, final ClusterState clusterState) {
final boolean onlySystemIndices = Arrays.stream(indices)
.allMatch(index -> {
final IndexMetadata indexMetadata = clusterState.metadata().index(index);
return indexMetadata != null && indexMetadata.isSystem();
});
return onlySystemIndices ? threadPool.executor(ThreadPool.Names.SYSTEM_READ) : threadPool.executor(ThreadPool.Names.SEARCH);
}
// org.elasticsearch.action.search.AbstractSearchAsyncAction#start
/**
* This is the main entry point for a search. This method starts the search execution of the initial phase.
*/
public final void start() {
if (getNumShards() == 0) {
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :
request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :
request.source().trackTotalHitsUpTo();
// total hits is null in the response if the tracking of total hits is disabled
boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY, clusters, null));
return;
}
executePhase(this);
}
// org.elasticsearch.action.search.AbstractSearchAsyncAction#executePhase
private void executePhase(SearchPhase phase) {
try {
phase.run();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
}
onPhaseFailure(phase, "", e);
}
}
Ultimately the search arrives as an asynchronous run() and iterates over the shards.
// org.elasticsearch.action.search.AbstractSearchAsyncAction#run
@Override
public final void run() {
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
}
if (shardsIts.size() > 0) {
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
// Fail-fast verification of all shards being available
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if(missingShards.length() > 0){
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
}
if (missingShards.length() > 0) {
//Status red - shard is missing all copies and would produce partial results for an index search
final String msg = "Search rejected due to missing shards [" missingShards
"]. Consider using `allow_partial_search_results` setting to bypass this error.";
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
// run the search on multiple shards
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
assert shardRoutings.skip() == false;
assert shardItIndexMap.containsKey(shardRoutings);
int shardIndex = shardItIndexMap.get(shardRoutings);
performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());
}
}
}
// org.elasticsearch.action.search.AbstractSearchAsyncAction#performPhaseOnShard
protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
* continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and
* could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
if (shard == null) {
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(),
shardIt.getClusterAlias(), shardIt.getOriginalIndices());
fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
: null;
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
// search on a single shard
executePhaseOnShard(shardIt, shard,
new SearchActionListener<Result>(shard, shardIndex) {
@Override
public void innerOnResponse(Result result) {
try {
onShardResult(result, shardIt);
} catch (Exception exc) {
onShardFailure(shardIndex, shard, shardIt, exc);
} finally {
executeNext(pendingExecutions, thread);
}
}
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shardIt, t);
} finally {
executeNext(pendingExecutions, thread);
}
}
});
} catch (final Exception e) {
try {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
} finally {
executeNext(pendingExecutions, thread);
}
}
};
// queue it, or run immediately
if (throttleConcurrentRequests) {
pendingExecutions.tryRun(r);
} else {
r.run();
}
}
}
// org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction#executePhaseOnShard
protected void executePhaseOnShard(final SearchShardIterator shardIt,
final SearchShardTarget shard,
final SearchActionListener<SearchPhaseResult> listener) {
// build the search request
ShardSearchRequest request = rewriteShardSearchRequest(
super.buildShardSearchRequest(shardIt, listener.requestIndex));
// send the search request to the owning node; for the local node a connection is mocked and the search executes locally
getSearchTransport()
.sendExecuteQuery(
getConnection(shard.getClusterAlias(), shard.getNodeId()),
request,
getTask(),
listener);
}
// org.elasticsearch.action.search.AbstractSearchAsyncAction#buildShardSearchRequest
@Override
public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) {
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), shardIndex,
getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(),
shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
// Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);
return shardRequest;
}
We walked through the request-sending path in detail in earlier articles, so it can be skimmed here.
// org.elasticsearch.action.search.SearchTransportService#sendExecuteQuery
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchRequest request, SearchTask task,
final SearchActionListener<SearchPhaseResult> listener) {
// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
// this used to be the QUERY_AND_FETCH which doesn't exist anymore.
final boolean fetchDocuments = request.numberOfShards() == 1;
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));
}
// org.elasticsearch.transport.TransportService#sendChildRequest
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportResponseHandler<T> handler) {
sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
}
// org.elasticsearch.transport.TransportService#sendChildRequest
public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
sendRequest(connection, action, request, options, handler);
}
// org.elasticsearch.transport.TransportService#sendRequest
/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
try {
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
// child task on the target node of the remote cluster.
// ----> a parent task on the local cluster
// |
// ----> a proxy task on the proxy node on the remote cluster
// |
// ----> an actual child task on the target node on the remote cluster
// To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
// node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
// unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
final Transport.Connection unwrappedConn = unwrapConnection(connection);
final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}
@Override
public String executor() {
return handler.executor();
}
@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}
// org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#interceptSender
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
final boolean requireAuth = shouldRequireExistingAuthentication();
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
// ourselves otherwise we wind up using a version newer than what we can actually send
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions so we need to change the user
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, requireAuth), minVersion);
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
(original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, requireAuth));
} else if (securityContext.getAuthentication() != null &&
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
// re-write the authentication since we want the authentication version to match the version of the connection
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
requireAuth), minVersion);
} else {
sendWithUser(connection, action, request, options, handler, sender, requireAuth);
}
}
};
}
// org.elasticsearch.transport.TransportService#sendRequestInternal
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (connection == null) {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
// TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
final TimeoutHandler timeoutHandler;
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
responseHandler.setTimeoutHandler(timeoutHandler);
} else {
timeoutHandler = null;
}
try {
if (lifecycle.stoppedOrClosed()) {
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
*/
throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;
timeoutHandler.scheduleTimeout(options.timeout());
}
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
// If holderToNotify == null then handler has already been taken care of.
if (contextToNotify != null) {
if (timeoutHandler != null) {
timeoutHandler.cancel();
}
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
// thread on a best effort basis though.
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
"failed to notify response handler on rejection, action: {}",
contextToNotify.action()),
e);
}
@Override
public void onFailure(Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
contextToNotify.action()),
e);
}
@Override
protected void doRun() throws Exception {
contextToNotify.handler().handleException(sendRequestException);
}
});
} else {
logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
}
}
}
// org.elasticsearch.transport.Transport.Connection#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
sendLocalRequest(requestId, action, request, options);
}
// org.elasticsearch.transport.TransportService#sendLocalRequest
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
try {
onRequestSent(localNode, requestId, action, request, options);
onRequestReceived(requestId, action);
final RequestHandlerRegistry reg = getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
//noinspection unchecked
// ...
reg.processMessageReceived(request, channel);
} else {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
//noinspection unchecked
reg.processMessageReceived(request, channel);
}
@Override
public boolean isForceExecution() {
return reg.isForceExecution();
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"failed to notify channel of error message for action [{}]", action), inner);
}
}
@Override
public String toString() {
return "processing of [" requestId "][" action "]: " request;
}
});
}
} catch (Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify channel of error message for action [{}]", action), inner);
}
}
}
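The interesting branch in sendLocalRequest is the executor decision: a handler registered under ThreadPool.Names.SAME runs inline on the calling thread, while anything else is forked onto the named thread pool. Below is a minimal standalone sketch of that dispatch pattern, using plain java.util.concurrent rather than the real es classes; it is an illustration of the idea, not es code:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DispatchSketch {
    static final String SAME = "same"; // analogous to ThreadPool.Names.SAME

    // run the handler inline when the registered executor is SAME,
    // otherwise hand it off to the named thread pool
    static void dispatch(String executor, ExecutorService pool, Runnable handler) {
        if (SAME.equals(executor)) {
            handler.run();         // inline, on the transport thread
        } else {
            pool.execute(handler); // forked, e.g. onto the "search" pool
        }
    }

    public static void main(String[] args) {
        ExecutorService searchPool = Executors.newFixedThreadPool(2);
        dispatch(SAME, searchPool, () -> System.out.println("inline on " + Thread.currentThread().getName()));
        dispatch("search", searchPool, () -> System.out.println("forked to " + Thread.currentThread().getName()));
        searchPool.shutdown();
    }
}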
3. Single-node search implementation
All the preceding steps were preparation; in the end the search has to land on Lucene. SearchService is the component that actually executes the search.
The overall sequence diagram is as follows:
The concrete implementation:
// org.elasticsearch.search.SearchService#executeQueryPhase
public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInContext,
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
final IndexShard shard = getShard(request);
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
@Override
public void onResponse(ShardSearchRequest orig) {
// check if we can shortcut the query phase entirely.
if (orig.canReturnNullResponseIfMatchNoDocs()) {
assert orig.scroll() == null;
final CanMatchResponse canMatchResp;
try {
ShardSearchRequest clone = new ShardSearchRequest(orig);
canMatchResp = canMatch(clone, false);
} catch (Exception exc) {
listener.onFailure(exc);
return;
}
if (canMatchResp.canMatch == false) {
listener.onResponse(QuerySearchResult.nullInstance());
return;
}
}
// fork the execution in the search thread pool
// run asynchronously via executor.execute()
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
}
@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
});
}
// org.elasticsearch.search.SearchService#executeQueryPhase
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request,
SearchShardTask task,
boolean keepStatesInContext) throws Exception {
final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
try (Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
// create the search context with the parameter info; the request parameters are re-parsed here
SearchContext context = createContext(readerContext, request, task, true)) {
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
// run the search; the resulting docIds are stored in the context
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
}
afterQueryTime = executor.success();
}
if (request.numberOfShards() == 1) {
// only one shard is involved, so the document details can be fetched immediately
return executeFetchPhase(readerContext, context, afterQueryTime);
} else {
// otherwise the docIds must be sorted/merged across shards before the documents are fetched
// Pass the rescoreDocIds to the queryResult to send them the coordinating node and receive them back in the fetch phase.
// We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
final RescoreDocIds rescoreDocIds = context.rescoreDocIds();
context.queryResult().setRescoreDocIds(rescoreDocIds);
readerContext.setRescoreDocIds(rescoreDocIds);
return context.queryResult();
}
} catch (Exception e) {
// execution exception can happen while loading the cache, strip it
if (e instanceof ExecutionException) {
e = (e.getCause() == null || e.getCause() instanceof Exception) ?
(Exception) e.getCause() : new ElasticsearchException(e.getCause());
}
logger.trace("Query phase failed", e);
processFailure(readerContext, e);
throw e;
}
}
// org.elasticsearch.search.SearchService#loadOrExecuteQueryPhase
/**
* Try to load the query results from the cache or execute the query phase directly if the cache cannot be used.
*/
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
final boolean canCache = indicesService.canCache(request, context);
context.getQueryShardContext().freezeContext();
if (canCache) {
indicesService.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
}
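One detail worth calling out in loadOrExecuteQueryPhase: the query-phase result can be served from the shard request cache instead of always hitting Lucene. As a hedged usage note (behavior as of es 7.x; defaults may differ across versions), the cache normally applies only to size=0 requests such as pure aggregations, and the request_cache URL parameter toggles it per request:
# illustrative: a size=0 search that is eligible for the shard request cache
curl -X GET -H 'content-type:application/json' -d '{"size":0,"query":{"match":{"name":"ali"}}}' 'http://localhost:9200/test/_search?request_cache=true'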
That is the single-node search skeleton: look up the docIds, then fetch the document details as needed. Simple enough in outline, but there are many details worth knowing.
First, creating the context. This mainly parses the incoming parameters for later use; subsequent results are also stored in the context. Let's expand it as needed.
// org.elasticsearch.search.SearchService#createContext
final SearchContext createContext(ReaderContext readerContext,
ShardSearchRequest request,
SearchShardTask task,
boolean includeAggregations) throws IOException {
final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout);
try {
if (request.scroll() != null) {
context.scrollContext().scroll = request.scroll();
}
// parse the parameters
parseSource(context, request.source(), includeAggregations);
// if the from and size are still not set, default them
// DEFAULT_FROM=0
if (context.from() == -1) {
context.from(DEFAULT_FROM);
}
// DEFAULT_SIZE=10
if (context.size() == -1) {
context.size(DEFAULT_SIZE);
}
context.setTask(task);
// pre process
queryPhase.preProcess(context);
} catch (Exception e) {
context.close();
throw e;
}
return context;
}
// org.elasticsearch.search.SearchService#parseSource
private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) {
// nothing to parse...
if (source == null) {
return;
}
SearchShardTarget shardTarget = context.shardTarget();
QueryShardContext queryShardContext = context.getQueryShardContext();
context.from(source.from());
context.size(source.size());
Map<String, InnerHitContextBuilder> innerHitBuilders = new HashMap<>();
// parse the query
if (source.query() != null) {
InnerHitContextBuilder.extractInnerHits(source.query(), innerHitBuilders);
context.parsedQuery(queryShardContext.toQuery(source.query()));
}
// parse the post_filter
if (source.postFilter() != null) {
InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders);
context.parsedPostFilter(queryShardContext.toQuery(source.postFilter()));
}
if (innerHitBuilders.size() > 0) {
for (Map.Entry<String, InnerHitContextBuilder> entry : innerHitBuilders.entrySet()) {
try {
entry.getValue().build(context, context.innerHits());
} catch (IOException e) {
throw new SearchException(shardTarget, "failed to build inner_hits", e);
}
}
}
if (source.sorts() != null) {
try {
Optional<SortAndFormats> optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext());
if (optionalSort.isPresent()) {
context.sort(optionalSort.get());
}
} catch (IOException e) {
throw new SearchException(shardTarget, "failed to create sort elements", e);
}
}
context.trackScores(source.trackScores());
if (source.trackTotalHitsUpTo() != null
&& source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE
&& context.scrollContext() != null) {
throw new SearchException(shardTarget, "disabling [track_total_hits] is not allowed in a scroll context");
}
if (source.trackTotalHitsUpTo() != null) {
context.trackTotalHitsUpTo(source.trackTotalHitsUpTo());
}
if (source.minScore() != null) {
context.minimumScore(source.minScore());
}
if (source.profile()) {
context.setProfilers(new Profilers(context.searcher()));
}
if (source.timeout() != null) {
context.timeout(source.timeout());
}
context.terminateAfter(source.terminateAfter());
// parse aggregations
if (source.aggregations() != null && includeAggregations) {
AggregationContext aggContext = new ProductionAggregationContext(
context.getQueryShardContext(),
/*
* The query on the search context right now doesn't include
* the filter for nested documents or slicing so we have to
* delay reading it until the aggs ask for it.
*/
() -> context.query() == null ? new MatchAllDocsQuery() : context.query(),
context.getProfilers() == null ? null : context.getProfilers().getAggregationProfiler(),
multiBucketConsumerService.create(),
() -> new SubSearchContext(context).parsedQuery(context.parsedQuery()).fetchFieldsContext(context.fetchFieldsContext()),
context::addReleasable,
context.bitsetFilterCache(),
context.indexShard().shardId().hashCode(),
context::getRelativeTimeInMillis,
context::isCancelled
);
try {
AggregatorFactories factories = source.aggregations().build(aggContext, null);
context.aggregations(new SearchContextAggregations(factories));
} catch (IOException e) {
throw new AggregationInitializationException("Failed to create aggregators", e);
}
}
if (source.suggest() != null) {
try {
context.suggest(source.suggest().build(queryShardContext));
} catch (IOException e) {
throw new SearchException(shardTarget, "failed to create SuggestionSearchContext", e);
}
}
if (source.rescores() != null) {
try {
for (RescorerBuilder<?> rescore : source.rescores()) {
context.addRescore(rescore.buildContext(queryShardContext));
}
} catch (IOException e) {
throw new SearchException(shardTarget, "failed to create RescoreSearchContext", e);
}
}
if (source.explain() != null) {
context.explain(source.explain());
}
if (source.fetchSource() != null) {
context.fetchSourceContext(source.fetchSource());
}
if (source.docValueFields() != null) {
FetchDocValuesContext docValuesContext = new FetchDocValuesContext(context.getQueryShardContext(), source.docValueFields());
context.docValuesContext(docValuesContext);
}
if (source.fetchFields() != null) {
FetchFieldsContext fetchFieldsContext = new FetchFieldsContext(source.fetchFields());
context.fetchFieldsContext(fetchFieldsContext);
}
if (source.highlighter() != null) {
HighlightBuilder highlightBuilder = source.highlighter();
try {
context.highlight(highlightBuilder.build(queryShardContext));
} catch (IOException e) {
throw new SearchException(shardTarget, "failed to create SearchContextHighlighter", e);
}
}
if (source.scriptFields() != null && source.size() != 0) {
int maxAllowedScriptFields = queryShardContext.getIndexSettings().getMaxScriptFields();
if (source.scriptFields().size() > maxAllowedScriptFields) {
throw new IllegalArgumentException(
"Trying to retrieve too many script_fields. Must be less than or equal to: [" maxAllowedScriptFields
"] but was [" source.scriptFields().size() "]. This limit can be set by changing the ["
IndexSettings.MAX_SCRIPT_FIELDS_SETTING.getKey() "] index level setting.");
}
for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT);
SearchLookup lookup = context.getQueryShardContext().lookup();
FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), lookup);
context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure()));
}
}
if (source.ext() != null) {
for (SearchExtBuilder searchExtBuilder : source.ext()) {
context.addSearchExt(searchExtBuilder);
}
}
if (source.version() != null) {
context.version(source.version());
}
if (source.seqNoAndPrimaryTerm() != null) {
context.seqNoAndPrimaryTerm(source.seqNoAndPrimaryTerm());
}
if (source.stats() != null) {
context.groupStats(source.stats());
}
if (CollectionUtils.isEmpty(source.searchAfter()) == false) {
if (context.scrollContext() != null) {
throw new SearchException(shardTarget, "`search_after` cannot be used in a scroll context.");
}
if (context.from() > 0) {
throw new SearchException(shardTarget, "`from` parameter must be set to 0 when `search_after` is used.");
}
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
context.searchAfter(fieldDoc);
}
if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
}
context.sliceBuilder(source.slice());
}
if (source.storedFields() != null) {
if (source.storedFields().fetchFields() == false) {
if (context.sourceRequested()) {
throw new SearchException(shardTarget, "[stored_fields] cannot be disabled if [_source] is requested");
}
if (context.fetchFieldsContext() != null) {
throw new SearchException(shardTarget, "[stored_fields] cannot be disabled when using the [fields] option");
}
}
context.storedFieldsContext(source.storedFields());
}
if (source.collapse() != null) {
if (context.scrollContext() != null) {
throw new SearchException(shardTarget, "cannot use `collapse` in a scroll context");
}
if (context.searchAfter() != null) {
throw new SearchException(shardTarget, "cannot use `collapse` in conjunction with `search_after`");
}
if (context.rescore() != null && context.rescore().isEmpty() == false) {
throw new SearchException(shardTarget, "cannot use `collapse` in conjunction with `rescore`");
}
final CollapseContext collapseContext = source.collapse().build(queryShardContext);
context.collapse(collapseContext);
}
}
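parseSource is also where mutually exclusive options are rejected: search_after demands from=0 and no scroll, slice requires a scroll context, collapse refuses scroll/search_after/rescore, and so on. For illustration, a request that passes the search_after checks might look like this (the sort field age and the cursor value 30 are made-up examples):
# illustrative only: a sort is required so the search_after values can be positioned
curl -X GET -H 'content-type:application/json' -d '{"size":10,"sort":[{"age":"asc"}],"search_after":[30],"query":{"match":{"name":"ali"}}}' 'http://localhost:9200/test/_search'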
Second, the actual docId lookup is implemented by QueryPhase:
// org.elasticsearch.search.query.QueryPhase#execute
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
if (searchContext.hasOnlySuggest()) {
suggestPhase.execute(searchContext);
searchContext.queryResult().topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
new DocValueFormat[0]);
return;
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
}
// Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
// request, preProcess is called on the DFS phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
// several pre/post processing hooks keep the overall feature set complete
aggregationPhase.preProcess(searchContext);
// execute the real query
boolean rescore = executeInternal(searchContext);
if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
aggregationPhase.execute(searchContext);
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults
.buildShardResults(searchContext.getProfilers());
searchContext.queryResult().profileResults(shardResults);
}
}
// org.elasticsearch.search.query.QueryPhase#executeInternal
/**
* In a package-private method so that it can be tested without having to
* wire everything (mapperService, etc.)
* @return whether the rescoring phase should be executed
*/
static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException {
final ContextIndexSearcher searcher = searchContext.searcher();
SortAndFormats sortAndFormatsForRewrittenNumericSort = null;
final IndexReader reader = searcher.getIndexReader();
QuerySearchResult queryResult = searchContext.queryResult();
queryResult.searchTimedOut(false);
try {
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Query query = searchContext.query();
assert query == searcher.rewrite(query); // already rewritten
final ScrollContext scrollContext = searchContext.scrollContext();
if (scrollContext != null) {
if (scrollContext.totalHits == null) {
// first round
assert scrollContext.lastEmittedDoc == null;
// there is not much that we can optimize here since we want to collect all
// documents in order to get the total number of hits
} else {
final ScoreDoc after = scrollContext.lastEmittedDoc;
if (returnsDocsInOrder(query, searchContext.sort())) {
// now this gets interesting: since we sort in index-order, we can directly
// skip to the desired doc
if (after != null) {
query = new BooleanQuery.Builder()
.add(query, BooleanClause.Occur.MUST)
.add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
.build();
}
// ... and stop collecting after ${size} matches
searchContext.terminateAfter(searchContext.size());
} else if (canEarlyTerminate(reader, searchContext.sort())) {
// now this gets interesting: since the search sort is a prefix of the index sort, we can directly
// skip to the desired doc
if (after != null) {
query = new BooleanQuery.Builder()
.add(query, BooleanClause.Occur.MUST)
.add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER)
.build();
}
}
}
}
final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
// whether the chain contains a collector that filters documents
boolean hasFilterCollector = false;
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
// add terminate_after before the filter collectors
// it will only be applied on documents accepted by these filter collectors
collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
// this collector can filter documents during the collection
hasFilterCollector = true;
}
if (searchContext.parsedPostFilter() != null) {
// add post filters before aggregations
// it will only be applied to top hits
collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
// this collector can filter documents during the collection
hasFilterCollector = true;
}
if (searchContext.queryCollectors().isEmpty() == false) {
// plug in additional collectors, like aggregations
collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
}
if (searchContext.minimumScore() != null) {
// apply the minimum score after multi collector so we filter aggs as well
collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
// this collector can filter documents during the collection
hasFilterCollector = true;
}
CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter = l -> {};
// try to rewrite numeric or date sort to the optimized distanceFeatureQuery
if ((searchContext.sort() != null) && SYS_PROP_REWRITE_SORT) {
Query rewrittenQuery = tryRewriteLongSort(searchContext, searcher.getIndexReader(), query, hasFilterCollector);
if (rewrittenQuery != null) {
query = rewrittenQuery;
// modify sorts: add sort on _score as 1st sort, and move the sort on the original field as the 2nd sort
SortField[] oldSortFields = searchContext.sort().sort.getSort();
DocValueFormat[] oldFormats = searchContext.sort().formats;
SortField[] newSortFields = new SortField[oldSortFields.length + 1];
DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 1];
newSortFields[0] = SortField.FIELD_SCORE;
newFormats[0] = DocValueFormat.RAW;
System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
System.arraycopy(oldFormats, 0, newFormats, 1, oldFormats.length);
sortAndFormatsForRewrittenNumericSort = searchContext.sort(); // stash SortAndFormats to restore it later
searchContext.sort(new SortAndFormats(new Sort(newSortFields), newFormats));
leafSorter = createLeafSorter(oldSortFields[0]);
}
}
boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
final Runnable timeoutRunnable;
if (timeoutSet) {
final long startTime = searchContext.getRelativeTimeInMillis();
final long timeout = searchContext.timeout().millis();
final long maxTime = startTime + timeout;
timeoutRunnable = searcher.addQueryCancellation(() -> {
final long time = searchContext.getRelativeTimeInMillis();
if (time > maxTime) {
throw new TimeExceededException();
}
});
} else {
timeoutRunnable = null;
}
if (searchContext.lowLevelCancellation()) {
searcher.addQueryCancellation(() -> {
SearchShardTask task = searchContext.getTask();
if (task != null && task.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
});
}
try {
boolean shouldRescore;
// if we are optimizing sort and there are no other collectors
if (sortAndFormatsForRewrittenNumericSort!=null && collectors.size()==0 && searchContext.getProfilers()==null) {
shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
} else {
// search...
shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
}
// if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
if (sortAndFormatsForRewrittenNumericSort!=null) {
searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
}
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
if (executor instanceof QueueResizingEsThreadPoolExecutor) {
QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}
return shouldRescore;
} finally {
// Search phase has finished, no longer need to check for timeout
// otherwise aggregation phase might get cancelled.
if (timeoutRunnable!=null) {
searcher.removeQueryCancellation(timeoutRunnable);
}
}
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
}
}
// org.elasticsearch.search.query.QueryPhase#searchWithCollector
private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
// create the top docs collector last when the other collectors are known
// create the top-docs collector context
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);
final Collector queryCollector;
if (searchContext.getProfilers() != null) {
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
queryCollector = profileCollector;
} else {
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
// call the Lucene API to run the real query
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
for (QueryCollectorContext ctx : collectors) {
ctx.postProcess(queryResult);
}
return topDocsFactory.shouldRescore();
}
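As a usage-level illustration (request values made up here), the collectors assembled in executeInternal map directly onto ordinary request options: terminate_after becomes the early-termination collector, post_filter the filtered collector, and min_score the minimum-score collector:
# illustrative: terminate_after caps per-shard collection; post_filter filters hits only, not aggs
curl -X GET -H 'content-type:application/json' -d '{"terminate_after":100,"min_score":0.5,"post_filter":{"term":{"name":"ali"}},"query":{"match_all":{}}}' 'http://localhost:9200/test/_search'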
Digging a little deeper:
// org.elasticsearch.search.query.TopDocsCollectorContext#createTopDocsCollectorContext
/**
* Creates a {@link TopDocsCollectorContext} from the provided <code>searchContext</code>.
* @param hasFilterCollector True if the collector chain contains at least one collector that can filter documents.
*/
static TopDocsCollectorContext createTopDocsCollectorContext(SearchContext searchContext,
boolean hasFilterCollector) throws IOException {
final IndexReader reader = searchContext.searcher().getIndexReader();
final Query query = searchContext.query();
// top collectors don't like a size of 0
final int totalNumDocs = Math.max(1, reader.numDocs());
if (searchContext.size() == 0) {
// no matter what the value of from is
return new EmptyTopDocsCollectorContext(reader, query, searchContext.sort(),
searchContext.trackTotalHitsUpTo(), hasFilterCollector);
} else if (searchContext.scrollContext() != null) {
// we can disable the tracking of total hits after the initial scroll query
// since the total hits is preserved in the scroll context.
int trackTotalHitsUpTo = searchContext.scrollContext().totalHits != null ?
SearchContext.TRACK_TOTAL_HITS_DISABLED : SearchContext.TRACK_TOTAL_HITS_ACCURATE;
// no matter what the value of from is
int numDocs = Math.min(searchContext.size(), totalNumDocs);
return new ScrollingTopDocsCollectorContext(reader, query, searchContext.scrollContext(),
searchContext.sort(), numDocs, searchContext.trackScores(), searchContext.numberOfShards(),
trackTotalHitsUpTo, hasFilterCollector);
} else if (searchContext.collapse() != null) {
boolean trackScores = searchContext.sort() == null ? true : searchContext.trackScores();
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
return new CollapsingTopDocsCollectorContext(searchContext.collapse(), searchContext.sort(), numDocs, trackScores);
} else {
int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
final boolean rescore = searchContext.rescore().isEmpty() == false;
if (rescore) {
assert searchContext.sort() == null;
for (RescoreContext rescoreContext : searchContext.rescore()) {
numDocs = Math.max(numDocs, rescoreContext.getWindowSize());
}
}
return new SimpleTopDocsCollectorContext(reader, query, searchContext.sort(), searchContext.searchAfter(), numDocs,
searchContext.trackScores(), searchContext.trackTotalHitsUpTo(), hasFilterCollector) {
@Override
boolean shouldRescore() {
return rescore;
}
};
}
}
// org.apache.lucene.search.IndexSearcher#search
/** Lower-level search API.
*
* <p>{@link LeafCollector#collect(int)} is called for every matching document.
*
* @throws BooleanQuery.TooManyClauses If a query would exceed
* {@link BooleanQuery#getMaxClauseCount()} clauses.
*/
public void search(Query query, Collector results)
throws IOException {
query = rewrite(query);
search(leafContexts, createWeight(query, results.scoreMode(), 1), results);
}
// org.elasticsearch.search.internal.ContextIndexSearcher#search
@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
}
}
// org.elasticsearch.search.internal.ContextIndexSearcher#searchLeaf
/**
* Lower-level search API.
*
* {@link LeafCollector#collect(int)} is called for every matching document in
* the provided <code>ctx</code>.
*/
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
cancellable.checkCancelled();
weight = wrapWeight(weight);
final LeafCollector leafCollector;
try {
leafCollector = collector.getLeafCollector(ctx);
} catch (CollectionTerminatedException e) {
// there is no doc of interest in this reader context
// continue with the following leaf
return;
}
Bits liveDocs = ctx.reader().getLiveDocs();
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
if (liveDocsBitSet == null) {
BulkScorer bulkScorer = weight.bulkScorer(ctx);
if (bulkScorer != null) {
try {
bulkScorer.score(leafCollector, liveDocs);
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
} else {
// if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
Scorer scorer = weight.scorer(ctx);
if (scorer != null) {
try {
intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {});
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
}
}
}
}
// org.apache.lucene.search.BulkScorer#score
/** Scores and collects all matching documents.
* @param collector The collector to which all matching documents are passed.
* @param acceptDocs {@link Bits} that represents the allowed documents to match, or
* {@code null} if they are all allowed to match.
*/
public void score(LeafCollector collector, Bits acceptDocs) throws IOException {
final int next = score(collector, acceptDocs, 0, DocIdSetIterator.NO_MORE_DOCS);
assert next == DocIdSetIterator.NO_MORE_DOCS;
}
// org.elasticsearch.search.query.TopDocsCollectorContext.SimpleTopDocsCollectorContext#postProcess
@Override
void postProcess(QuerySearchResult result) throws IOException {
final TopDocsAndMaxScore topDocs = newTopDocs();
result.topDocs(topDocs, sortAndFormats == null ? null : sortAndFormats.formats);
}
// org.elasticsearch.search.query.TopDocsCollectorContext.SimpleTopDocsCollectorContext#newTopDocs
TopDocsAndMaxScore newTopDocs() {
TopDocs in = topDocsSupplier.get();
float maxScore = maxScoreSupplier.get();
final TopDocs newTopDocs;
if (in instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) in;
newTopDocs = new TopFieldDocs(totalHitsSupplier.get(), fieldDocs.scoreDocs, fieldDocs.fields);
} else {
newTopDocs = new TopDocs(totalHitsSupplier.get(), in.scoreDocs);
}
return new TopDocsAndMaxScore(newTopDocs, maxScore);
}
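Stripped of the es plumbing, the searcher.search(query, collector) hand-off above is plain Lucene. Here is a minimal, self-contained sketch of the same query-then-collect cycle, assuming a Lucene 8.x dependency (the line es 7.x builds on); it is an illustration, not es code:
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class LuceneSearchSketch {
    public static void main(String[] args) throws Exception {
        Directory dir = new ByteBuffersDirectory();
        // index a single document with name=ali, mirroring the running example
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new TextField("name", "ali", Field.Store.YES));
            writer.addDocument(doc);
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            // the collector gathers matching docIds: exactly what the query phase needs
            TopScoreDocCollector collector = TopScoreDocCollector.create(10, Integer.MAX_VALUE);
            searcher.search(new TermQuery(new Term("name", "ali")), collector);
            TopDocs topDocs = collector.topDocs();
            for (ScoreDoc sd : topDocs.scoreDocs) {
                System.out.println("docId=" + sd.doc + " score=" + sd.score);
            }
        }
    }
}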
Finally, let's look at how field information is fetched once the docIds are found. (Normally this phase runs only after every node has finished its docId lookup and the coordinating node has merged the results. Here, though, we have effectively taken the QUERY_AND_FETCH path, i.e. the documents are fetched immediately.)
// org.elasticsearch.search.SearchService#executeFetchPhase
private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
shortcutDocIdsToLoad(context);
// run the fetch and write the results into the context
fetchPhase.execute(context);
if (reader.singleSession()) {
freeReaderContext(reader.id());
}
executor.success();
}
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}
// org.elasticsearch.search.SearchService#shortcutDocIdsToLoad
/**
* Shortcut ids to load, we load only "from" and up to "size". The phase controller
* handles this as well since the result is always size * shards for Q_T_F
*/
private void shortcutDocIdsToLoad(SearchContext context) {
final int[] docIdsToLoad;
int docsOffset = 0;
final Suggest suggest = context.queryResult().suggest();
int numSuggestDocs = 0;
final List<CompletionSuggestion> completionSuggestions;
if (suggest != null && suggest.hasScoreDocs()) {
completionSuggestions = suggest.filter(CompletionSuggestion.class);
for (CompletionSuggestion completionSuggestion : completionSuggestions) {
numSuggestDocs += completionSuggestion.getOptions().size();
}
} else {
completionSuggestions = Collections.emptyList();
}
if (context.request().scroll() != null) {
TopDocs topDocs = context.queryResult().topDocs().topDocs;
docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
}
} else {
TopDocs topDocs = context.queryResult().topDocs().topDocs;
if (topDocs.scoreDocs.length < context.from()) {
// no more docs...
docIdsToLoad = new int[numSuggestDocs];
} else {
int totalSize = context.from() + context.size();
docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())
+ numSuggestDocs];
for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
}
}
}
for (CompletionSuggestion completionSuggestion : completionSuggestions) {
for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
docIdsToLoad[docsOffset++] = option.getDoc().doc;
}
}
context.docIdsToLoad(docIdsToLoad, docIdsToLoad.length);
}
// org.elasticsearch.search.fetch.FetchPhase#execute
public void execute(SearchContext context) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{}", new SearchContextSourcePrinter(context));
}
if (context.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
if (context.docIdsToLoadSize() == 0) {
// no individual hits to process, so we shortcut
context.fetchResult().hits(new SearchHits(new SearchHit[0], context.queryResult().getTotalHits(),
context.queryResult().getMaxScore()));
return;
}
DocIdToIndex[] docs = new DocIdToIndex[context.docIdsToLoadSize()];
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
docs[index] = new DocIdToIndex(context.docIdsToLoad()[index], index);
}
// make sure that we iterate in doc id order
Arrays.sort(docs);
Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
FieldsVisitor fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);
FetchContext fetchContext = new FetchContext(context);
SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];
List<FetchSubPhaseProcessor> processors = getProcessors(context.shardTarget(), fetchContext);
NestedDocuments nestedDocuments = context.getNestedDocuments();
int currentReaderIndex = -1;
LeafReaderContext currentReaderContext = null;
LeafNestedDocuments leafNestedDocuments = null;
CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
boolean hasSequentialDocs = hasSequentialDocs(docs);
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
if (context.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
int docId = docs[index].docId;
try {
int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
if (currentReaderIndex != readerIndex) {
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
currentReaderIndex = readerIndex;
if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
&& hasSequentialDocs && docs.length >= 10) {
// All the docs to fetch are adjacent but Lucene stored fields are optimized
// for random access and don't optimize for sequential access - except for merging.
// So we do a little hack here and pretend we're going to do merges in order to
// get better sequential access.
SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
fieldReader = lf.getSequentialStoredFieldsReader()::visitDocument;
} else {
// reader.document() loads the stored document
fieldReader = currentReaderContext.reader()::document;
}
for (FetchSubPhaseProcessor processor : processors) {
processor.setNextReader(currentReaderContext);
}
leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(currentReaderContext);
}
assert currentReaderContext != null;
HitContext hit = prepareHitContext(
context,
leafNestedDocuments,
nestedDocuments::hasNonNestedParent,
fieldsVisitor,
docId,
storedToRequestedFields,
currentReaderContext,
fieldReader);
for (FetchSubPhaseProcessor processor : processors) {
processor.process(hit);
}
hits[docs[index].index] = hit.hit();
} catch (Exception e) {
throw new FetchPhaseExecutionException(context.shardTarget(), "Error running fetch phase for doc [" + docId + "]", e);
}
}
if (context.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
TotalHits totalHits = context.queryResult().getTotalHits();
context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
}
// org.elasticsearch.search.fetch.FetchPhase#prepareHitContext
private HitContext prepareHitContext(SearchContext context,
LeafNestedDocuments nestedDocuments,
Predicate<String> hasNonNestedParent,
FieldsVisitor fieldsVisitor,
int docId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader) throws IOException {
if (nestedDocuments.advance(docId - subReaderContext.docBase) == null) {
return prepareNonNestedHitContext(
context, fieldsVisitor, docId, storedToRequestedFields, subReaderContext, storedFieldReader);
} else {
return prepareNestedHitContext(context, docId, nestedDocuments, hasNonNestedParent, storedToRequestedFields,
subReaderContext, storedFieldReader);
}
}
// org.elasticsearch.search.fetch.FetchPhase#prepareNonNestedHitContext
/**
* Resets the provided {@link HitContext} with information on the current
* document. This includes the following:
* - Adding an initial {@link SearchHit} instance.
* - Loading the document source and setting it on {@link HitContext#sourceLookup()}. This
* allows fetch subphases that use the hit context to access the preloaded source.
*/
private HitContext prepareNonNestedHitContext(SearchContext context,
FieldsVisitor fieldsVisitor,
int docId,
Map<String, Set<String>> storedToRequestedFields,
LeafReaderContext subReaderContext,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader) throws IOException {
int subDocId = docId - subReaderContext.docBase;
QueryShardContext queryShardContext = context.getQueryShardContext();
if (fieldsVisitor == null) {
SearchHit hit = new SearchHit(docId, null, new Text(queryShardContext.getType()), null, null);
return new HitContext(hit, subReaderContext, subDocId);
} else {
SearchHit hit;
// populate the fields
loadStoredFields(context.getQueryShardContext()::getFieldType, queryShardContext.getType(), fieldReader,
fieldsVisitor, subDocId);
Uid uid = fieldsVisitor.uid();
if (fieldsVisitor.fields().isEmpty() == false) {
Map<String, DocumentField> docFields = new HashMap<>();
Map<String, DocumentField> metaFields = new HashMap<>();
fillDocAndMetaFields(context, fieldsVisitor, storedToRequestedFields, docFields, metaFields);
hit = new SearchHit(docId, uid.id(), new Text(queryShardContext.getType()), docFields, metaFields);
} else {
// build the hit to return
hit = new SearchHit(docId, uid.id(), new Text(queryShardContext.getType()), emptyMap(), emptyMap());
}
HitContext hitContext = new HitContext(hit, subReaderContext, subDocId);
if (fieldsVisitor.source() != null) {
// Store the loaded source on the hit context so that fetch subphases can access it.
// Also make it available to scripts by storing it on the shared SearchLookup instance.
hitContext.sourceLookup().setSource(fieldsVisitor.source());
SourceLookup scriptSourceLookup = context.getQueryShardContext().lookup().source();
scriptSourceLookup.setSegmentAndDocument(subReaderContext, subDocId);
scriptSourceLookup.setSource(fieldsVisitor.source());
}
return hitContext;
}
}
// org.elasticsearch.search.fetch.FetchPhase#loadStoredFields
private void loadStoredFields(Function<String, MappedFieldType> fieldTypeLookup,
@Nullable String type,
CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
FieldsVisitor fieldVisitor, int docId) throws IOException {
fieldVisitor.reset();
// org.apache.lucene.index.FilterLeafReader#document
fieldReader.accept(docId, fieldVisitor);
fieldVisitor.postProcess(fieldTypeLookup, type);
}
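To make shortcutDocIdsToLoad above concrete: with from=10, size=5 and 20 scoreDocs, it loads min(20 - 10, 5) = 5 ids, positions 10 through 14. A simplified sketch of just that trimming arithmetic (suggest handling omitted):
import java.util.Arrays;

public class DocIdTrimSketch {
    // simplified take on SearchService#shortcutDocIdsToLoad: keep only [from, from + size)
    static int[] trim(int[] scoreDocIds, int from, int size) {
        if (scoreDocIds.length < from) {
            return new int[0]; // page starts beyond the result set: nothing to fetch
        }
        int count = Math.min(scoreDocIds.length - from, size);
        return Arrays.copyOfRange(scoreDocIds, from, from + count);
    }

    public static void main(String[] args) {
        int[] ids = new int[20];
        for (int i = 0; i < ids.length; i++) ids[i] = i;
        System.out.println(Arrays.toString(trim(ids, 10, 5))); // [10, 11, 12, 13, 14]
    }
}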
After the steps above, es has the search results in hand; all that remains is sending the response back to the client. This part is not complicated, but it is still worth a look.
// org.elasticsearch.rest.action.RestStatusToXContentListener#buildResponse
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
assert response.isFragment() == false; //would be nice if we could make default methods final
response.toXContent(builder, channel.request());
RestResponse restResponse = new BytesRestResponse(response.status(), builder);
if (RestStatus.CREATED == restResponse.status()) {
final String location = extractLocation.apply(response);
if (location != null) {
restResponse.addHeader("Location", location);
}
}
return restResponse;
}
// org.elasticsearch.action.search.SearchResponse#toXContent
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
builder.endObject();
return builder;
}
// org.elasticsearch.action.search.SearchResponse#innerToXContent
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (scrollId != null) {
builder.field(SCROLL_ID.getPreferredName(), scrollId);
}
if (pointInTimeId != null) {
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
}
if (getNumReducePhases() != 1) {
builder.field(NUM_REDUCE_PHASES.getPreferredName(), getNumReducePhases());
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(),
getFailedShards(), getShardFailures());
clusters.toXContent(builder, params);
internalResponse.toXContent(builder, params);
return builder;
}
// org.elasticsearch.action.search.SearchResponseSections#toXContent
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
hits.toXContent(builder, params);
if (aggregations != null) {
aggregations.toXContent(builder, params);
}
if (suggest != null) {
suggest.toXContent(builder, params);
}
if (profileResults != null) {
profileResults.toXContent(builder, params);
}
return builder;
}
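// org.elasticsearch.search.SearchHits#toXContent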
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.HITS);
boolean totalHitAsInt = params.paramAsBoolean(RestSearchAction.TOTAL_HITS_AS_INT_PARAM, false);
if (totalHitAsInt) {
long total = totalHits == null ? -1 : totalHits.value;
builder.field(Fields.TOTAL, total);
} else if (totalHits != null) {
builder.startObject(Fields.TOTAL);
builder.field("value", totalHits.value);
builder.field("relation", totalHits.relation == Relation.EQUAL_TO ? "eq" : "gte");
builder.endObject();
}
if (Float.isNaN(maxScore)) {
builder.nullField(Fields.MAX_SCORE);
} else {
builder.field(Fields.MAX_SCORE, maxScore);
}
builder.field(Fields.HITS);
builder.startArray();
for (SearchHit hit : hits) {
hit.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
// org.elasticsearch.search.SearchHit#toXContent
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toInnerXContent(builder, params);
builder.endObject();
return builder;
}
// org.elasticsearch.search.SearchHit#toInnerXContent
// public because we render hit as part of completion suggestion option
public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) throws IOException {
// For inner_hit hits shard is null and that is ok, because the parent search hit has all this information.
// Even if this was included in the inner_hit hits this would be the same, so better leave it out.
if (getExplanation() != null && shard != null) {
builder.field(Fields._SHARD, shard.getShardId());
builder.field(Fields._NODE, shard.getNodeIdText());
}
if (index != null) {
builder.field(Fields._INDEX, RemoteClusterAware.buildRemoteIndexName(clusterAlias, index));
}
if (type != null) {
builder.field(Fields._TYPE, type);
}
if (id != null) {
builder.field(Fields._ID, id);
}
if (nestedIdentity != null) {
nestedIdentity.toXContent(builder, params);
}
if (version != -1) {
builder.field(Fields._VERSION, version);
}
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
builder.field(Fields._SEQ_NO, seqNo);
builder.field(Fields._PRIMARY_TERM, primaryTerm);
}
if (Float.isNaN(score)) {
builder.nullField(Fields._SCORE);
} else {
builder.field(Fields._SCORE, score);
}
for (DocumentField field : metaFields.values()) {
// ignore empty metadata fields
if (field.getValues().size() == 0) {
continue;
}
// _ignored is the only multi-valued meta field
// TODO: can we avoid having an exception here?
if (field.getName().equals(IgnoredFieldMapper.NAME)) {
builder.field(field.getName(), field.getValues());
} else {
builder.field(field.getName(), field.<Object>getValue());
}
}
if (source != null) {
XContentHelper.writeRawField(SourceFieldMapper.NAME, source, builder, params);
}
if (documentFields.isEmpty() == false &&
// ignore fields all together if they are all empty
documentFields.values().stream()
.anyMatch(df -> df.getValues().size() > 0)) {
builder.startObject(Fields.FIELDS);
for (DocumentField field : documentFields.values()) {
if (field.getValues().size() > 0) {
field.toXContent(builder, params);
}
}
builder.endObject();
}
if (highlightFields != null && !highlightFields.isEmpty()) {
builder.startObject(Fields.HIGHLIGHT);
for (HighlightField field : highlightFields.values()) {
field.toXContent(builder, params);
}
builder.endObject();
}
sortValues.toXContent(builder, params);
if (matchedQueries.length > 0) {
builder.startArray(Fields.MATCHED_QUERIES);
for (String matchedFilter : matchedQueries) {
builder.value(matchedFilter);
}
builder.endArray();
}
if (getExplanation() != null) {
builder.field(Fields._EXPLANATION);
buildExplanation(builder, getExplanation());
}
if (innerHits != null) {
builder.startObject(Fields.INNER_HITS);
for (Map.Entry<String, SearchHits> entry : innerHits.entrySet()) {
builder.startObject(entry.getKey());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
}
return builder;
}
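Assembling all the toXContent pieces above, the rendered body is the familiar response shape (abbreviated; values are illustrative):
{
  "took": 3,
  "timed_out": false,
  "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 },
  "hits": {
    "total": { "value": 1, "relation": "eq" },
    "max_score": 0.2876821,
    "hits": [
      { "_index": "test", "_type": "job", "_id": "1", "_score": 0.2876821, "_source": { "name": "ali" } }
    ]
  }
}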
This article covered the overall framework of es search and answered the detail-level questions for a single node's search. Nothing profound; it is offered only to give you a feel for the process. To grasp the big picture, the two sequence diagrams are really all you need.