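The previous posts all covered fairly big topics at the framework level. Today let's do something lighter and look at a single point: as the title says, how Elasticsearch (es) implements a get query for a single record.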
1. Semantics of get
get retrieves a single es document by its primary key, the document id. The closest analogue in relational SQL is:
select * from test where id = #{id};
Of course, every es operation accepts quite a few additional modifiers, for example which fields to return, a version number, and so on.
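To make "additional modifiers" concrete, here is a minimal sketch of how such a request URL could be assembled. GetUrlSketch and its build method are hypothetical helpers written for this post, not part of any es client library; the path shape and the parameter names (_source_includes, version) are the ones that appear in the handler code later in this post.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not an es API.
final class GetUrlSketch {
    /** Builds the path and query string of a get-by-id request. */
    static String build(String index, String id, Map<String, String> modifiers) {
        String path = "/" + encode(index) + "/_doc/" + encode(id);
        if (modifiers.isEmpty()) {
            return path;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> e : modifiers.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return path + query;
    }

    // URLEncoder performs form encoding, which is enough for the simple values used here.
    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> modifiers = new LinkedHashMap<>();
        modifiers.put("_source_includes", "name,age"); // only return these fields
        modifiers.put("version", "3");                 // expect this document version
        System.out.println(build("test", "1", modifiers));
    }
}
```

The id stays in the path and everything else travels as query parameters, which is why the handler later reads them all through request.param(...).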
2. A brief overview of how get is implemented
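Semantically, get returns at most one record. So although es stores its data across a cluster, here it only needs to fetch one document from one node. In theory, then, as long as we can quickly work out which es node holds the document and send the request there, we get the result.
In addition, because the lookup is by primary key id, it should be very efficient as long as the data structures are well designed.
So by analysing how this feature is implemented, we can also get a rough understanding of how es distributes keys.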
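The "locate the node from the id" idea can be sketched as below. This is a simplified illustration under stated assumptions, not the es source: ShardRoutingSketch and its methods are hypothetical, String.hashCode() stands in for the Murmur3-based hash es applies to the routing value, and the shard-to-node table is made up.

```java
import java.util.Map;

// Hypothetical sketch; the real routing lives in es's OperationRouting.
final class ShardRoutingSketch {
    /**
     * Picks a shard number for a document. The routing value defaults to the
     * document id when no explicit routing is given.
     */
    static int shardFor(String id, String routing, int numberOfShards) {
        String effectiveRouting = routing != null ? routing : id;
        // Assumption: hashCode() replaces the real hash function for brevity.
        return Math.floorMod(effectiveRouting.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        // Made-up allocation table: shard number -> node holding a copy.
        Map<Integer, String> shardToNode = Map.of(0, "node-a", 1, "node-b", 2, "node-c");
        int shard = shardFor("42", null, shardToNode.size());
        System.out.println("doc 42 -> shard " + shard + " on " + shardToNode.get(shard));
    }
}
```

Because the shard number is a pure function of the routing value, any node that knows the cluster state can compute the target without asking anyone else, and two documents that share a routing value always land on the same shard.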
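3. The get implementation in detail
get is just one small part of the es API. From the analysis in the previous post we already know how to find the handler that serves get. Here is a quick recap: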
// org.elasticsearch.rest.RestController#dispatchRequest
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
try {
// try every candidate handler
tryAllHandlers(request, channel, threadContext);
} catch (Exception e) {
try {
// on failure, respond with the exception details
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error(() ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}
// org.elasticsearch.rest.RestController#tryAllHandlers
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
// copy the configured request headers into the thread context
for (final RestHeaderDefinition restHeader : headersToCopy) {
final String name = restHeader.getName();
final List<String> headerValues = request.getAllHeaderValues(name);
if (headerValues != null && headerValues.isEmpty() == false) {
final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
channel.sendResponse(
BytesRestResponse.
createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
return;
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
}
}
}
// error_trace cannot be used when we disable detailed errors
// we consume the error_trace parameter first to ensure that it is always consumed
if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
channel.sendResponse(
BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
return;
}
final String rawPath = request.rawPath();
final String uri = request.uri();
final RestRequest.Method requestMethod;
try {
// Resolves the HTTP method and fails if the method is invalid
requestMethod = request.method();
// Loop through all possible handlers, attempting to dispatch the request
// collect the candidate handlers; because paths may contain wildcards or variables such as {index}, several handlers may match
Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
while (allHandlers.hasNext()) {
final RestHandler handler;
// one MethodHandlers entry can serve several HTTP methods
final MethodHandlers handlers = allHandlers.next();
if (handlers == null) {
handler = null;
} else {
handler = handlers.getHandler(requestMethod);
}
if (handler == null) {
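// a missing handler does not necessarily mean the request cannot be served; keep looking, and respond to the client only once it is certain that nothing can handle it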
if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
return;
}
} else {
// handler found, dispatch to it
dispatchRequest(request, channel, handler);
return;
}
}
} catch (final IllegalArgumentException e) {
handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
return;
}
// If request has not been handled, fallback to a bad request error.
// fallback when nothing handled the request
handleBadRequest(uri, requestMethod, channel);
}
// org.elasticsearch.rest.RestController#dispatchRequest
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
final int contentLength = request.contentLength();
if (contentLength > 0) {
final XContentType xContentType = request.getXContentType();
if (xContentType == null) {
sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
return;
}
if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
"Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
return;
}
}
RestChannel responseChannel = channel;
try {
// circuit breaker check
if (handler.canTripCircuitBreaker()) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
if (handler.allowSystemIndexAccessByDefault() == false && request.header(ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER) == null) {
// The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product with a plan
// to move away from direct access to system indices, and thus deprecation warnings should not be emitted.
// This header is intended for internal use only.
client.threadPool().getThreadContext().putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
}
// invoke the handler; it may be wrapped by a filter (such as SecurityRestFilter below) that runs first
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
}
}
// org.elasticsearch.xpack.security.rest.SecurityRestFilter#handleRequest
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
// CORS - allow for preflight unauthenticated OPTIONS request
if (extractClientCertificate) {
HttpChannel httpChannel = request.getHttpChannel();
SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
}
final String requestUri = request.uri();
authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(
authentication -> {
if (authentication == null) {
logger.trace("No authentication available for REST request [{}]", requestUri);
} else {
logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
}
secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(
secondaryAuthentication -> {
if (secondaryAuthentication != null) {
logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
}
RemoteHostHeader.process(request, threadContext);
restHandler.handleRequest(request, channel, client);
},
e -> handleException("Secondary authentication", request, channel, e)));
}, e -> handleException("Authentication", request, channel, e)));
} else {
// pass the request on to the next handler in the chain of responsibility
restHandler.handleRequest(request, channel, client);
}
}
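In general, many concrete handlers extend BaseRestHandler. The overall call sequence of handleRequest() is shown in the diagram below:
(Figure: overall call sequence diagram of handleRequest())
The concrete code is as follows: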
// org.elasticsearch.rest.BaseRestHandler#handleRequest
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
// the name only says "prepare request", but this step is crucial: it assembles the logic that runs later
final RestChannelConsumer action = prepareRequest(request, client);
// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams =
request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
// validate the non-response params
if (!unconsumedParams.isEmpty()) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}
if (request.hasContent() && request.isContentConsumed() == false) {
throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
}
usageCount.increment();
// execute the action
// i.e. this simply invokes the action prepared above
action.accept(channel);
}
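The two-phase shape of handleRequest() (prepare first, validate the parameters, then run) can be illustrated with a small stand-alone sketch. Everything in it is hypothetical (PrepareThenExecuteSketch, its Request class, the StringBuilder used as a stand-in channel); it only mirrors the control flow above and does not use es classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical sketch of the prepare-then-execute pattern.
final class PrepareThenExecuteSketch {
    /** Minimal request that remembers which parameters were read. */
    static final class Request {
        private final Map<String, String> params;
        private final Set<String> consumed = new HashSet<>();

        Request(Map<String, String> params) {
            this.params = new HashMap<>(params);
        }

        String param(String key) {
            consumed.add(key); // reading a parameter marks it as consumed
            return params.get(key);
        }

        Set<String> unconsumedParams() {
            Set<String> rest = new TreeSet<>(params.keySet());
            rest.removeAll(consumed);
            return rest;
        }
    }

    /** Phase 1: read the parameters and return the work as a deferred action. */
    static Consumer<StringBuilder> prepareGet(Request request) {
        String index = request.param("index");
        String id = request.param("id");
        return channel -> channel.append("GET ").append(index).append('/').append(id);
    }

    /** Phase 2: reject unknown parameters, then run the prepared action. */
    static void handle(Request request, StringBuilder channel) {
        Consumer<StringBuilder> action = prepareGet(request);
        Set<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }

    public static void main(String[] args) {
        StringBuilder channel = new StringBuilder();
        handle(new Request(Map.of("index", "test", "id", "1")), channel);
        System.out.println(channel);
    }
}
```

Deferring the work behind a consumer is what lets the base class check for unknown parameters before any real work starts: a typo in a query parameter fails fast instead of being silently ignored.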
As for the handler that serves get, the registration code shows that it is RestGetAction. Its prepareRequest() implementation shows how the request is handled.
// org.elasticsearch.rest.action.document.RestGetAction#prepareRequest
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
GetRequest getRequest;
if (request.hasParam("type")) {
deprecationLogger.deprecate("get_with_types", TYPES_DEPRECATION_MESSAGE);
getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
} else {
getRequest = new GetRequest(request.param("index"), request.param("id"));
}
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing"));
getRequest.preference(request.param("preference"));
getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
if (request.param("fields") != null) {
throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
"please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
}
final String fieldsParam = request.param("stored_fields");
if (fieldsParam != null) {
final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
if (fields != null) {
getRequest.storedFields(fields);
}
}
getRequest.version(RestActions.parseVersion(request));
getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
// wrap the actual business logic in a deferred action
// and hand it over to the NodeClient
return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
@Override
protected RestStatus getStatus(final GetResponse response) {
return response.isExists() ? OK : NOT_FOUND;
}
});
}
// org.elasticsearch.search.fetch.subphase.FetchSourceContext#parseFromRestRequest
public static FetchSourceContext parseFromRestRequest(RestRequest request) {
Boolean fetchSource = null;
String[] sourceExcludes = null;
String[] sourceIncludes = null;
String source = request.param("_source");
if (source != null) {
if (Booleans.isTrue(source)) {
fetchSource = true;
} else if (Booleans.isFalse(source)) {
fetchSource = false;
} else {
sourceIncludes = Strings.splitStringByCommaToArray(source);
}
}
String sIncludes = request.param("_source_includes");
if (sIncludes != null) {
sourceIncludes = Strings.splitStringByCommaToArray(sIncludes);
}
String sExcludes = request.param("_source_excludes");
if (sExcludes != null) {
sourceExcludes = Strings.splitStringByCommaToArray(sExcludes);
}
if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
}
return null;
}
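That is the prelude to the get implementation. From it we can see the parameters it supports: type (deprecated), id, refresh, routing, preference, realtime, stored_fields, version, version_type, _source, _source_includes, _source_excludes... Note that the old fields parameter is explicitly rejected. It is indeed a bit complicated: the more options, the more trouble.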
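The precedence between the three _source parameters is easier to see with a few worked inputs. The sketch below re-implements the logic of parseFromRestRequest on a plain map; SourceFilterSketch is a hypothetical class, and the exact string comparison with "true"/"false" is a simplifying assumption in place of es's Booleans helper.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical stand-alone re-implementation for illustration.
final class SourceFilterSketch {
    final boolean fetchSource;
    final String[] includes;
    final String[] excludes;

    SourceFilterSketch(boolean fetchSource, String[] includes, String[] excludes) {
        this.fetchSource = fetchSource;
        this.includes = includes;
        this.excludes = excludes;
    }

    /** Returns null when no _source-related parameter is present. */
    static SourceFilterSketch parse(Map<String, String> params) {
        Boolean fetchSource = null;
        String[] includes = null;
        String[] excludes = null;
        String source = params.get("_source");
        if (source != null) {
            if ("true".equals(source)) {
                fetchSource = true;
            } else if ("false".equals(source)) {
                fetchSource = false;
            } else {
                includes = source.split(","); // any other value is a field list
            }
        }
        String sIncludes = params.get("_source_includes");
        if (sIncludes != null) {
            includes = sIncludes.split(","); // overrides a field list given via _source
        }
        String sExcludes = params.get("_source_excludes");
        if (sExcludes != null) {
            excludes = sExcludes.split(",");
        }
        if (fetchSource != null || includes != null || excludes != null) {
            return new SourceFilterSketch(fetchSource == null || fetchSource, includes, excludes);
        }
        return null;
    }

    public static void main(String[] args) {
        SourceFilterSketch f = parse(Map.of("_source", "name,age"));
        System.out.println(f.fetchSource + " " + Arrays.toString(f.includes));
    }
}
```

Two details stand out: _source does double duty as an on/off switch and as an include list, and an explicit _source_includes wins over a list passed through _source.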
The URL forms of the request can be read from the handler's route definitions:
// org.elasticsearch.rest.action.document.RestGetAction#routes
@Override
public List<Route> routes() {
return unmodifiableList(asList(
new Route(GET, "/{index}/_doc/{id}"),
new Route(HEAD, "/{index}/_doc/{id}"),
// Deprecated typed endpoints.
new Route(GET, "/{index}/{type}/{id}"),
new Route(HEAD, "/{index}/{type}/{id}")));
}
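How a path template such as /{index}/_doc/{id} turns into request parameters can be sketched as follows. RouteMatchSketch is a hypothetical, simplified matcher written for this post; es itself resolves routes through a path trie, which this sketch does not reproduce.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: segment-by-segment template matching.
final class RouteMatchSketch {
    /** Returns the extracted path parameters, or null if the path does not match. */
    static Map<String, String> match(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return null;
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith("{") && t[i].endsWith("}")) {
                // a {name} segment captures whatever the path has at this position
                params.put(t[i].substring(1, t[i].length() - 1), p[i]);
            } else if (!t[i].equals(p[i])) {
                return null; // literal segments must match exactly
            }
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(match("/{index}/_doc/{id}", "/test/_doc/1"));
        System.out.println(match("/{index}/{type}/{id}", "/test/_doc/1"));
    }
}
```

Note that /test/_doc/1 matches both the typeless and the deprecated typed template in this sketch. Overlaps of this kind are presumably why tryAllHandlers iterates over several candidate handlers instead of assuming a single match.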
Eventually the request reaches an internal handler, TransportGetAction.
// org.elasticsearch.action.support.TransportAction#execute
/**
* Use this method when the transport action call should result in creation of a new task associated with the call.
*
* This is a typical behavior.
*/
public final Task execute(Request request, ActionListener<Response> listener) {
/*
* While this version of execute could delegate to the TaskListener
* version of execute that'd add yet another layer of wrapping on the
* listener and prevent us from using the listener bare if there isn't a
* task. That just seems like too many objects. Thus the two versions of
* this method.
*/
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
final Task task;
try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
}
}
});
return task;
}
/**
* Use this method when the transport action should continue to run in the context of the current task
*/
public final void execute(Task task, Request request, ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
if (task != null && request.getShouldStoreResult()) {
listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
}
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(task, actionName, request, listener);
}
TransportGetAction extends TransportSingleShardAction, which in turn extends TransportAction, so the doExecute call lands in TransportSingleShardAction's implementation.
// org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
}
// org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.listener = listener;
ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
String concreteSingleIndex;
// resolve the index parameter (it may be an alias) to a concrete index
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
} else {
concreteSingleIndex = request.index();
}
// build the internal request
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
resolveRequest(clusterState, internalRequest);
// check cluster blocks again, this time at the request level
blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
// resolve the shard copies that can serve this id; the request itself is sent later in start()
this.shardIt = shards(clusterState, internalRequest);
}
// org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#concreteSingleIndex
/**
* Utility method that allows to resolve an index expression to its corresponding single concrete index.
* Callers should make sure they provide proper {@link org.elasticsearch.action.support.IndicesOptions}
* that require a single index as a result. The indices resolution must in fact return a single index when
* using this method, an {@link IllegalArgumentException} gets thrown otherwise.
*
* @param state the cluster state containing all the data to resolve to expression to a concrete index
* @param request The request that defines how the an alias or an index need to be resolved to a concrete index
* and the expression that can be resolved to an alias or an index name.
* @throws IllegalArgumentException if the index resolution lead to more than one index
* @return the concrete index obtained as a result of the index resolution
*/
public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
String indexExpression = CollectionUtils.isEmpty(request.indices()) ? null : request.indices()[0];
Index[] indices = concreteIndices(state, request.indicesOptions(), indexExpression);
if (indices.length != 1) {
throw new IllegalArgumentException("unable to return a single index as the index and options" +
" provided got resolved to multiple indices");
}
return indices[0];
}
// org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#concreteIndices
/**
* Translates the provided index expression into actual concrete indices, properly deduplicated.
*
* @param state the cluster state containing all the data to resolve to expressions to concrete indices
* @param options defines how the aliases or indices need to be resolved to concrete indices
* @param indexExpressions expressions that can be resolved to alias or index names.
* @return the resolved concrete indices based on the cluster state, indices options and index expressions
* @throws IndexNotFoundException if one of the index expressions is pointing to a missing index or alias and the
* provided indices options in the context don't allow such a case, or if the final result of the indices resolution
* contains no indices and the indices options in the context don't allow such a case.
* @throws IllegalArgumentException if one of the aliases resolve to multiple indices and the provided
* indices options in the context don't allow such a case.
*/
public Index[] concreteIndices(ClusterState state, IndicesOptions options, String... indexExpressions) {
return concreteIndices(state, options, false, indexExpressions);
}
public Index[] concreteIndices(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) {
Context context = new Context(state, options, false, false, includeDataStreams,
isSystemIndexAccessAllowed());
return concreteIndices(context, indexExpressions);
}
Index[] concreteIndices(Context context, String... indexExpressions) {
if (indexExpressions == null || indexExpressions.length == 0) {
indexExpressions = new String[]{Metadata.ALL};
}
Metadata metadata = context.getState().metadata();
IndicesOptions options = context.getOptions();
// If only one index is specified then whether we fail a request if an index is missing depends on the allow_no_indices
// option. At some point we should change this, because there shouldn't be a reason why whether a single index
// or multiple indices are specified yield different behaviour.
final boolean failNoIndices = indexExpressions.length == 1 ? !options.allowNoIndices() : !options.ignoreUnavailable();
List<String> expressions = Arrays.asList(indexExpressions);
for (ExpressionResolver expressionResolver : expressionResolvers) {
expressions = expressionResolver.resolve(context, expressions);
}
if (expressions.isEmpty()) {
if (!options.allowNoIndices()) {
IndexNotFoundException infe;
if (indexExpressions.length == 1) {
if (indexExpressions[0].equals(Metadata.ALL)) {
infe = new IndexNotFoundException("no indices exist", (String)null);
} else {
infe = new IndexNotFoundException((String)null);
}
} else {
infe = new IndexNotFoundException((String)null);
}
infe.setResources("index_expression", indexExpressions);
throw infe;
} else {
return Index.EMPTY_ARRAY;
}
}
boolean excludedDataStreams = false;
final Set<Index> concreteIndices = new LinkedHashSet<>(expressions.size());
for (String expression : expressions) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression);
if (indexAbstraction == null ) {
if (failNoIndices) {
IndexNotFoundException infe;
if (expression.equals(Metadata.ALL)) {
infe = new IndexNotFoundException("no indices exist", expression);
} else {
infe = new IndexNotFoundException(expression);
}
infe.setResources("index_expression", expression);
throw infe;
} else {
continue;
}
} else if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.getOptions().ignoreAliases()) {
if (failNoIndices) {
throw aliasesNotSupportedException(expression);
} else {
continue;
}
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
context.includeDataStreams() == false) {
excludedDataStreams = true;
continue;
}
if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.isResolveToWriteIndex()) {
IndexMetadata writeIndex = indexAbstraction.getWriteIndex();
if (writeIndex == null) {
throw new IllegalArgumentException("no write index is defined for alias [" + indexAbstraction.getName() + "]." +
" The write index may be explicitly disabled using is_write_index=false or the alias points to multiple" +
" indices without one being designated as a write index");
}
if (addIndex(writeIndex, context)) {
concreteIndices.add(writeIndex.getIndex());
}
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM && context.isResolveToWriteIndex()) {
IndexMetadata writeIndex = indexAbstraction.getWriteIndex();
if (addIndex(writeIndex, context)) {
concreteIndices.add(writeIndex.getIndex());
}
} else {
if (indexAbstraction.getIndices().size() > 1 && !options.allowAliasesToMultipleIndices()) {
String[] indexNames = new String[indexAbstraction.getIndices().size()];
int i = 0;
for (IndexMetadata indexMetadata : indexAbstraction.getIndices()) {
indexNames[i++] = indexMetadata.getIndex().getName();
}
throw new IllegalArgumentException(indexAbstraction.getType().getDisplayName() + " [" + expression +
"] has more than one index associated with it " + Arrays.toString(indexNames) +
", can't execute a single index op");
}
for (IndexMetadata index : indexAbstraction.getIndices()) {
if (shouldTrackConcreteIndex(context, options, index)) {
concreteIndices.add(index.getIndex());
}
}
}
}
if (options.allowNoIndices() == false && concreteIndices.isEmpty()) {
IndexNotFoundException infe = new IndexNotFoundException((String)null);
infe.setResources("index_expression", indexExpressions);
if (excludedDataStreams) {
// Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded.
infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true");
}
throw infe;
}
checkSystemIndexAccess(context, metadata, concreteIndices, indexExpressions);
return concreteIndices.toArray(new Index[concreteIndices.size()]);
}
// org.elasticsearch.action.get.TransportGetAction#resolveRequest
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
}
}
@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
request.request().preference());
}
// org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start
public void start() {
if (shardIt == null) {
// just execute it on the local node
final Writeable.Reader<Response> reader = getResponseReader();
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
new TransportResponseHandler<Response>() {
@Override
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
} else {
perform(null);
}
}
private void perform(@Nullable final Exception currentFailure) {
Exception lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(null,
LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null,
internalRequest.request()), failure);
}
listener.onFailure(failure);
return;
}
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
internalRequest.request().internalShardId = shardRouting.shardId();
if (logger.isTraceEnabled()) {
logger.trace(
"sending request [{}] to shard [{}] on node [{}]",
internalRequest.request(),
internalRequest.request().internalShardId,
node
);
}
final Writeable.Reader<Response> reader = getResponseReader();
transportService.sendRequest(node, transportShardAction, internalRequest.request(),
new TransportResponseHandler<Response>() {
@Override
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}
});
}
}
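The retry behaviour of perform() boils down to "try the next shard copy until one answers". Here is a minimal synchronous sketch of that idea; ShardFailoverSketch is hypothetical, node names are plain strings, and the asynchronous callbacks of the real code are flattened into a loop.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: synchronous failover across the copies of one shard.
final class ShardFailoverSketch {
    /**
     * Sends the request to each copy in turn and returns the first successful response.
     * When every copy fails, the first failure is reported as the cause.
     */
    static <R> R performOnFirstAvailable(List<String> copies, Function<String, R> sendRequest) {
        RuntimeException firstFailure = null;
        for (String node : copies) {
            try {
                return sendRequest.apply(node);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e; // keep the earliest failure, move on to the next copy
                }
            }
        }
        throw new IllegalStateException("No shard available", firstFailure);
    }

    public static void main(String[] args) {
        String response = performOnFirstAvailable(List.of("node-a", "node-b"), node -> {
            if (node.equals("node-a")) {
                throw new RuntimeException("node-a is down");
            }
            return "doc from " + node;
        });
        System.out.println(response);
    }
}
```

Since any copy of the shard (primary or replica) can answer a get, a failed node costs one extra hop rather than a failed request, as long as at least one copy is reachable.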
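sendRequest is, as the name suggests, about sending a request over the network and receiving the result. But when the target is the local node, es naturally does not go over the wire. Let's use the sequence diagram for a local-node request to see how sendRequest handles get.
(Figure: sendRequest call sequence diagram)
The concrete code is as follows: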
// org.elasticsearch.transport.TransportService#sendRequest
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportResponseHandler<T> handler) {
sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
}
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, options, handler);
}
/**
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
* @throws NodeNotConnectedException if the given node is not connected
*/
public Transport.Connection getConnection(DiscoveryNode node) {
// for the local node, return the local connection directly; no remote call is needed
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return connectionManager.getConnection(node);
}
}
private boolean isLocalNode(DiscoveryNode discoveryNode) {
return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);
}
// org.elasticsearch.transport.TransportService#sendRequest
/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
try {
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
// child task on the target node of the remote cluster.
// ----> a parent task on the local cluster
// |
// ----> a proxy task on the proxy node on the remote cluster
// |
// ----> an actual child task on the target node on the remote cluster
// To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
// node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
// unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
final Transport.Connection unwrappedConn = unwrapConnection(connection);
final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}
@Override
public String executor() {
return handler.executor();
}
@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}
// org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#interceptSender
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
final boolean requireAuth = shouldRequireExistingAuthentication();
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
// ourselves otherwise we wind up using a version newer than what we can actually send
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions so we need to change the user
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, requireAuth), minVersion);
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
(original) -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
, handler), sender, requireAuth));
} else if (securityContext.getAuthentication() != null &&
securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
// re-write the authentication since we want the authentication version to match the version of the connection
securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
requireAuth), minVersion);
} else {
sendWithUser(connection, action, request, options, handler, sender, requireAuth);
}
}
};
}
// org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#sendWithUser
private <T extends TransportResponse> void sendWithUser(Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler,
AsyncSender sender, final boolean requireAuthentication) {
if (securityContext.getAuthentication() == null && requireAuthentication) {
// we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
// in tests and may be hit by a user
assertNoAuthentication(action);
throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]");
}
try {
sender.sendRequest(connection, action, request, options, handler);
} catch (Exception e) {
handler.handleException(new TransportException("failed sending request", e));
}
}
// org.elasticsearch.transport.TransportService#sendRequestInternal
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
if (connection == null) {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
// TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
final TimeoutHandler timeoutHandler;
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
responseHandler.setTimeoutHandler(timeoutHandler);
} else {
timeoutHandler = null;
}
try {
if (lifecycle.stoppedOrClosed()) {
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
*/
throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;
timeoutHandler.scheduleTimeout(options.timeout());
}
// send the request
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
// If holderToNotify == null then handler has already been taken care of.
if (contextToNotify != null) {
if (timeoutHandler != null) {
timeoutHandler.cancel();
}
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
// thread on a best effort basis though.
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
"failed to notify response handler on rejection, action: {}",
contextToNotify.action()),
e);
}
@Override
public void onFailure(Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
contextToNotify.action()),
e);
}
@Override
protected void doRun() throws Exception {
contextToNotify.handler().handleException(sendRequestException);
}
});
} else {
logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
}
}
}
// org.elasticsearch.transport.Transport.Connection#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
sendLocalRequest(requestId, action, request, options);
}
// org.elasticsearch.transport.TransportService#sendLocalRequest
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
try {
onRequestSent(localNode, requestId, action, request, options);
onRequestReceived(requestId, action);
// look up the handler registered for this action
final RequestHandlerRegistry reg = getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
//noinspection unchecked
reg.processMessageReceived(request, channel);
} else {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
//noinspection unchecked
reg.processMessageReceived(request, channel);
}
@Override
public boolean isForceExecution() {
return reg.isForceExecution();
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"failed to notify channel of error message for action [{}]", action), inner);
}
}
@Override
public String toString() {
return "processing of [" + requestId + "][" + action + "]: " + request;
}
});
}
} catch (Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify channel of error message for action [{}]", action), inner);
}
}
}
// org.elasticsearch.transport.RequestHandlerRegistry#processMessageReceived
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
Releasable unregisterTask = () -> taskManager.unregister(task);
try {
if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
}
final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask);
handler.messageReceived(request, taskTransportChannel, task);
unregisterTask = null;
} finally {
Releasables.close(unregisterTask);
}
}
// org.elasticsearch.action.get.TransportGetAction#asyncShardOperation
@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
super.asyncShardOperation(request, shardId, listener);
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}
// org.elasticsearch.action.support.single.shard.TransportSingleShardAction#asyncShardOperation
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(getExecutor(request, shardId))
.execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
}
// org.elasticsearch.action.get.TransportGetAction#shardOperation
// this method is where the get operation is actually carried out
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}
// the core get implementation
GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
// wrap the result in a GetResponse so it can be written to the response
return new GetResponse(result);
}
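The above is the processing framework for get. The basic idea: first check whether a routing value applies; if so, follow it, and if not, the document id itself serves as the routing key. Either way the request is sent to one node that holds a copy of the target shard, and that node returns the data. The forwarding path in front of the actual get looks rather complicated because it has to handle many remote calls and a lot of reuse. ES relies heavily on the wrapper (decorator) and observer patterns here, which are worth noting and learning from. The real core of get, in the end, is the shardOperation() method. How does it interact with Lucene to fetch the data? We will see shortly.
Routing is one of ES's distinguishing features, and the shard is a core concept. Next we look at two details: how ES get handles routing, and how ES get handles shards.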
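Before reading the real source, the routing-to-shard step can be sketched in a few lines of Java. This is only a simplified illustration under stated assumptions: the class name `ShardRoutingSketch` and its methods are hypothetical, CRC32 stands in for the Murmur3 hash that Elasticsearch actually uses, and the partition offset for routing-partitioned indices is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ShardRoutingSketch {
    /** Stand-in hash. Elasticsearch uses Murmur3HashFunction; CRC32 only keeps this sketch self-contained. */
    static int hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) crc.getValue();
    }

    /**
     * Picks a shard: the routing value wins if present, otherwise the document id is the routing key.
     * routingFactor is assumed to be routingNumShards / numberOfShards.
     */
    static int shardId(String id, String routing, int routingNumShards, int routingFactor) {
        String effectiveRouting = (routing != null) ? routing : id;
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(hash(effectiveRouting), routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Same routing value, different ids: both documents land on the same shard.
        System.out.println(shardId("1", "user123", 5, 1) == shardId("2", "user123", 5, 1));
        // No routing value: the id alone decides the shard.
        System.out.println(shardId("1", null, 5, 1));
    }
}
```

Because the shard is a pure function of the routing key and the index's shard settings, any node can compute where a document lives without asking the others, which is what makes a get by id cheap to dispatch.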
// Routing handling; example request: curl -XGET 'http://localhost:9200/test_index/job/1?routing=user123'
// org.elasticsearch.action.get.TransportGetAction#resolveRequest
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias)
// resolve the routing: take the alias's routing if one is configured, and validate any routing value passed in
request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
}
}
// org.elasticsearch.cluster.metadata.Metadata#resolveIndexRouting
/**
* Returns indexing routing for the given index.
*/
// TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions
// in the index,bulk,update and delete apis.
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {
if (aliasOrIndex == null) {
return routing;
}
// look up the index abstraction to tell whether the name is an alias
// for a non-alias name the routing is returned unchanged
IndexAbstraction result = getIndicesLookup().get(aliasOrIndex);
if (result == null || result.getType() != IndexAbstraction.Type.ALIAS) {
return routing;
}
IndexAbstraction.Alias alias = (IndexAbstraction.Alias) result;
if (result.getIndices().size() > 1) {
rejectSingleIndexOperation(aliasOrIndex, result);
}
AliasMetadata aliasMd = alias.getFirstAliasMetadata();
if (aliasMd.indexRouting() != null) {
// an alias must not resolve to more than one routing value
if (aliasMd.indexRouting().indexOf(',') != -1) {
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" +
aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
}
if (routing != null) {
if (!routing.equals(aliasMd.indexRouting())) {
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it [" +
aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation");
}
}
// Alias routing overrides the parent routing (if any).
return aliasMd.indexRouting();
}
return routing;
}
// Shard handling
// org.elasticsearch.action.get.TransportGetAction#shards
@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
return clusterService.operationRouting()
.getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
request.request().preference());
}
// org.elasticsearch.cluster.routing.OperationRouting#getShards
public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
@Nullable String preference) {
// first resolve the shard routing table, then build the shard iterator
return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
clusterState.nodes(), preference, null, null);
}
// org.elasticsearch.cluster.routing.OperationRouting#shards
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}
// org.elasticsearch.cluster.routing.OperationRouting#indexMetadata
protected IndexMetadata indexMetadata(ClusterState clusterState, String index) {
IndexMetadata indexMetadata = clusterState.metadata().index(index);
if (indexMetadata == null) {
throw new IndexNotFoundException(index);
}
return indexMetadata;
}
// compute the shard id
// org.elasticsearch.cluster.routing.OperationRouting#generateShardId
public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetadata.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
}
// org.elasticsearch.cluster.routing.OperationRouting#calculateScaledShardId
private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
// routingFactor is routingNumShards / numberOfShards, so it is 1 when the two are equal
return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
}
// org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable(java.lang.String, int)
/**
* All shards for the provided index and shard id
* @return All the shard routing entries for the given index and shard id
* @throws IndexNotFoundException if provided index does not exist
* @throws ShardNotFoundException if provided shard id is unknown
*/
public IndexShardRoutingTable shardRoutingTable(String index, int shardId) {
IndexRoutingTable indexRouting = index(index);
if (indexRouting == null) {
throw new IndexNotFoundException(index);
}
return shardRoutingTable(indexRouting, shardId);
}
// org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable
/**
* Get's the {@link IndexShardRoutingTable} for the given shard id from the given {@link IndexRoutingTable}
* or throws a {@link ShardNotFoundException} if no shard by the given id is found in the IndexRoutingTable.
*
* @param indexRouting IndexRoutingTable
* @param shardId ShardId
* @return IndexShardRoutingTable
*/
public static IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) {
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
}
return indexShard;
}
// org.elasticsearch.cluster.routing.OperationRouting#preferenceActiveShardIterator
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
DiscoveryNodes nodes, @Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts) {
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
}
if (preference.charAt(0) == '_') {
Preference preferenceType = Preference.parse(preference);
if (preferenceType == Preference.SHARDS) {
// starts with _shards, so execute on specific ones
int index = preference.indexOf('|');
String shards;
if (index == -1) {
shards = preference.substring(Preference.SHARDS.type().length() + 1);
} else {
shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
}
String[] ids = Strings.splitStringByCommaToArray(shards);
boolean found = false;
for (String id : ids) {
if (Integer.parseInt(id) == indexShard.shardId().id()) {
found = true;
break;
}
}
if (!found) {
return null;
}
// no more preference
if (index == -1 || index == preference.length() - 1) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
} else {
// update the preference and continue
preference = preference.substring(index + 1);
}
}
preferenceType = Preference.parse(preference);
switch (preferenceType) {
case PREFER_NODES:
final Set<String> nodesIds =
Arrays.stream(
preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
).collect(Collectors.toSet());
return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
case LOCAL:
return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES:
String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
default:
throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
}
}
// if not, then use it as the index
int routingHash = Murmur3HashFunction.hash(preference);
if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
// The AllocationService lists shards in a fixed order based on nodes
// so earlier versions of this class would have a tendency to
// select the same node across different shardIds.
// Better overall balancing can be achieved if each shardId opts
// for a different element in the list by also incorporating the
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
}
if (awarenessAttributes.isEmpty()) {
return indexShard.activeInitializingShardsIt(routingHash);
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
}
}
private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes,
@Nullable ResponseCollectorService collectorService, @Nullable Map<String, Long> nodeCounts) {
if (awarenessAttributes.isEmpty()) {
if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
return indexShard.activeInitializingShardsRandomIt();
}
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
}
}
// org.elasticsearch.cluster.routing.IndexShardRoutingTable#activeInitializingShardsRankedIt
/**
* Returns an iterator over active and initializing shards, ordered by the adaptive replica
* selection formula. Making sure though that its random within the active shards of the same
* (or missing) rank, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector,
@Nullable Map<String, Long> nodeSearchCounts) {
final int seed = shuffler.nextSeed();
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(shardId,
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));
}
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
List<ShardRouting> rankedActiveShards =
rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);
ordered.addAll(rankedActiveShards);
List<ShardRouting> rankedInitializingShards =
rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);
ordered.addAll(rankedInitializingShards);
return new PlainShardIterator(shardId, ordered);
}
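4. Internal logic of get
It is implemented as a synchronous call inside TransportGetAction's shardOperation().
Let's first look at the overall call sequence:
Figure: core sequence diagram of the get lookup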
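The way a synchronous shardOperation() is handed to a thread pool and its outcome delivered to a listener (the role ActionRunnable.supply plays in asyncShardOperation earlier) can be sketched in plain Java. This is a simplified stand-in and not the Elasticsearch implementation: the `Listener` interface, the `supply` helper, `runGet`, and the class name `SupplySketch` are all hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class SupplySketch {
    /** Hypothetical, simplified counterpart of an action listener. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Wraps a synchronous operation so that its outcome is reported to the listener. */
    static <T> Runnable supply(Listener<T> listener, Supplier<T> operation) {
        return () -> {
            T result;
            try {
                result = operation.get();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        };
    }

    /**
     * Runs the wrapped operation on the given executor and returns what the listener saw.
     * With a same-thread executor the outcome is available immediately; with a real pool
     * the caller would have to wait for the listener instead.
     */
    static String runGet(Executor executor, Supplier<String> shardOperation) {
        AtomicReference<String> outcome = new AtomicReference<>();
        executor.execute(supply(new Listener<String>() {
            @Override public void onResponse(String response) { outcome.set("ok:" + response); }
            @Override public void onFailure(Exception e) { outcome.set("failed:" + e.getMessage()); }
        }, shardOperation));
        return outcome.get();
    }

    public static void main(String[] args) {
        Executor sameThread = Runnable::run;
        System.out.println(runGet(sameThread, () -> "doc-1"));
        System.out.println(runGet(sameThread, () -> { throw new IllegalStateException("shard not found"); }));
    }
}
```

The point of the split is that the shard-level code stays a plain method that returns a value or throws, while the decision about which thread runs it and how failures reach the caller lives in one small wrapper.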
// org.elasticsearch.action.get.TransportGetAction#shardOperation
// this method is where the get operation is actually carried out
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}
// the core get implementation
GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
return new GetResponse(result);
}
// org.elasticsearch.indices.IndicesService#indexServiceSafe
/**
* Returns an IndexService for the specified index if exists otherwise a {@link IndexNotFoundException} is thrown.
*/
public IndexService indexServiceSafe(Index index) {
IndexService indexService = indices.get(index.getUUID());
if (indexService == null) {
throw new IndexNotFoundException(index);
}
assert indexService.indexUUID().equals(index.getUUID()) : "uuid mismatch local: " + indexService.indexUUID() +
" incoming: " + index.getUUID();
return indexService;
}
// org.elasticsearch.index.IndexService#getShard
/**
* Return the shard with the provided id, or throw an exception if it doesn't exist.
*/
public IndexShard getShard(int shardId) {
IndexShard indexShard = getShardOrNull(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(new ShardId(index(), shardId));
}
return indexShard;
}
// org.elasticsearch.index.shard.IndexShard#getService
public ShardGetService getService() {
return this.getService;
}
// org.elasticsearch.index.get.ShardGetService#get
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version,
VersionType versionType, FetchSourceContext fetchSourceContext) {
return
get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
}
// org.elasticsearch.index.get.ShardGetService#get
private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
currentMetric.inc();
try {
long now = System.nanoTime();
GetResult getResult =
innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);
if (getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
} else {
missingMetric.inc(System.nanoTime() - now);
}
return getResult;
} finally {
currentMetric.dec();
}
}
// org.elasticsearch.index.get.ShardGetService#innerGet
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
// Normalize the requested fields / source settings
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
// Support typeless lookups: the legacy type field is dropped and an index simply holds documents
if (type == null || type.equals("_all")) {
DocumentMapper mapper = mapperService.documentMapper();
type = mapper == null ? null : mapper.type();
}
Engine.GetResult get = null;
if (type != null) {
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
// Build a Get instance with chained setters, then call get() to look up the doc
// indexShard was resolved from the shardId earlier, which relies on a document living on exactly one shard
get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm)
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
if (get.exists() == false) {
get.close();
}
}
if (get == null || get.exists() == false) {
return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
}
try {
// break between having loaded it from translog (so we only have _source), and having a document to load
// Load the actual document content by docId
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
} finally {
get.close();
}
}
/**
* decides what needs to be done based on the request input and always returns a valid non-null FetchSourceContext
*/
private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceContext context, @Nullable String[] gFields) {
if (context != null) {
return context;
}
if (gFields == null) {
return FetchSourceContext.FETCH_SOURCE;
}
for (String field : gFields) {
if (SourceFieldMapper.NAME.equals(field)) {
return FetchSourceContext.FETCH_SOURCE;
}
}
return FetchSourceContext.DO_NOT_FETCH_SOURCE;
}
// org.elasticsearch.index.shard.IndexShard#get
public Engine.GetResult get(Engine.Get get) {
// Check that reads are allowed: this tests the shard's own state (see readAllowed() below), not the cluster health color
readAllowed();
DocumentMapper mapper = mapperService.documentMapper();
if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) {
return GetResult.NOT_EXISTS;
}
// wrapSearcher wraps the searcher (reader) used for the lookup
return getEngine().get(get, mapper, this::wrapSearcher);
}
// org.elasticsearch.index.shard.IndexShard#readAllowed
public void readAllowed() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (readAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when shard state is one of " +
readAllowedStates.toString());
}
}
// org.elasticsearch.index.shard.IndexShard#getEngine
Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
}
return engine;
}
/**
* NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
* closed.
*/
protected Engine getEngineOrNull() {
return this.currentEngineReference.get();
}
// org.elasticsearch.index.engine.InternalEngine#get
@Override
public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
// Concurrency is guarded by the read side of a ReentrantReadWriteLock
try (ReleasableLock ignored = readLock.acquire()) {
// Make sure the engine has not been closed
ensureOpen();
if (get.realtime()) {
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
// If the version map has no entry for this id, the large block below is skipped
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(shardId, get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
)) {
throw new VersionConflictEngineException(shardId, get.id(),
get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
}
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read from a reader for consistency
// the update call doesn't need the consistency since it's source only _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
}
}
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
} else {
// we expose what has been externally expose in a point in time snapshot via an explicit refresh
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
}
}
}
// org.elasticsearch.index.engine.Engine#ensureOpen()
protected final void ensureOpen() {
ensureOpen(null);
}
protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
if (suppressed != null) {
ace.addSuppressed(suppressed);
}
throw ace;
}
}
// org.elasticsearch.index.engine.LiveVersionMap#acquireLock
/**
* Acquires a releaseable lock for the given uId. All *UnderLock methods require
* this lock to be hold by the caller otherwise the visibility guarantees of this version
* map are broken. We assert on this lock to be hold when calling these methods.
* @see KeyedLock
*/
Releasable acquireLock(BytesRef uid) {
return keyedLock.acquire(uid);
}
// org.elasticsearch.common.util.concurrent.KeyedLock#acquire
/**
* Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired
* by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
*/
public Releasable acquire(T key) {
while (true) {
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
ReleasableLock newLock = tryCreateNewLock(key);
if (newLock != null) {
return newLock;
}
} else {
assert perNodeLock != null;
int i = perNodeLock.count.get();
// The lock is ultimately a ReentrantLock; KeyLock adds the usage count
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
perNodeLock.lock();
return new ReleasableLock(key, perNodeLock);
}
}
}
}
// org.elasticsearch.common.util.concurrent.KeyedLock#tryCreateNewLock
private ReleasableLock tryCreateNewLock(T key) {
// KeyLock extends ReentrantLock
KeyLock newLock = new KeyLock(fair);
newLock.lock();
KeyLock keyLock = map.putIfAbsent(key, newLock);
if (keyLock == null) {
return new ReleasableLock(key, newLock);
}
return null;
}
// org.elasticsearch.index.engine.InternalEngine#getVersionFromMap
private VersionValue getVersionFromMap(BytesRef id) {
if (versionMap.isUnsafe()) {
synchronized (versionMap) {
// we are switching from an unsafe map to a safe map. This might happen concurrently
// but we only need to do this once since the last operation per ID is to add to the version
// map so once we pass this point we can safely lookup from the version map.
if (versionMap.isUnsafe()) {
refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
}
versionMap.enforceSafeAccess();
}
}
return versionMap.getUnderLock(id);
}
// org.elasticsearch.index.engine.LiveVersionMap#getUnderLock(org.apache.lucene.util.BytesRef)
/**
* Returns the live version (add or delete) for this uid.
*/
VersionValue getUnderLock(final BytesRef uid) {
return getUnderLock(uid, maps);
}
private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
assert assertKeyedLockHeldByCurrentThread(uid);
// First try to get the "live" value:
VersionValue value = currentMaps.current.get(uid);
if (value != null) {
return value;
}
value = currentMaps.old.get(uid);
if (value != null) {
return value;
}
// Fall back to the tombstones; this returns null if the uid is not there either
return tombstones.get(uid);
}
// org.elasticsearch.index.engine.Engine#acquireSearcher
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
SearcherSupplier releasable = null;
try {
// Acquire a searcher; on this path source = realtime_get
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(),
searcher.getQueryCache(), searcher.getQueryCachingPolicy(), () -> Releasables.close(searcher, reader));
} finally {
Releasables.close(releasable);
}
}
// org.elasticsearch.index.engine.Engine#acquireSearcherSupplier
/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
if (store.tryIncRef() == false) {
throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());
}
Releasable releasable = store::decRef;
try {
// Increment the reference count by acquiring a reader
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
assert assertSearcherIsWarmedUp(source, scope);
return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(),
engineConfig.getQueryCachingPolicy(), () -> {});
}
@Override
protected void doClose() {
try {
referenceManager.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} finally {
store.decRef();
}
}
};
releasable = null; // success - hand over the reference to the engine reader
return reader;
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_reader", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error(() -> new ParameterizedMessage("failed to acquire reader"), ex);
throw new EngineException(shardId, "failed to acquire reader", ex);
} finally {
Releasables.close(releasable);
}
}
// org.elasticsearch.index.engine.Engine.SearcherSupplier#acquireSearcher
public final Searcher acquireSearcher(String source) {
if (released.get()) {
throw new AlreadyClosedException("SearcherSupplier was closed");
}
// Create a new searcher
final Searcher searcher = acquireSearcherInternal(source);
return CAN_MATCH_SEARCH_SOURCE.equals(source) ? searcher : wrapper.apply(searcher);
}
// org.elasticsearch.index.engine.Engine.Searcher#Searcher
public Searcher(String source, IndexReader reader,
Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
Closeable onClose) {
super(reader);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
this.source = source;
this.onClose = onClose;
}
// org.elasticsearch.index.shard.IndexShard#wrapSearcher
private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
!= null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
boolean success = false;
try {
final Engine.Searcher newSearcher = readerWrapper == null ? searcher : wrapSearcher(searcher, readerWrapper);
assert newSearcher != null;
success = true;
return newSearcher;
} catch (IOException ex) {
throw new ElasticsearchException("failed to wrap searcher", ex);
} finally {
if (success == false) {
Releasables.close(success, searcher);
}
}
}
// org.elasticsearch.index.shard.IndexShard#wrapSearcher
static Engine.Searcher wrapSearcher(Engine.Searcher engineSearcher,
CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper) throws IOException {
assert readerWrapper != null;
final ElasticsearchDirectoryReader elasticsearchDirectoryReader =
ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader());
if (elasticsearchDirectoryReader == null) {
throw new IllegalStateException("Can't wrap non elasticsearch directory reader");
}
NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());
DirectoryReader reader = readerWrapper.apply(nonClosingReaderWrapper);
if (reader != nonClosingReaderWrapper) {
if (reader.getReaderCacheHelper() != elasticsearchDirectoryReader.getReaderCacheHelper()) {
throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey," +
" wrappers must override this method and delegate to the original readers core cache key. Wrapped readers can't be " +
"used as cache keys since their are used only per request which would lead to subtle bugs");
}
if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) {
// prevent that somebody wraps with a non-filter reader
throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't");
}
}
if (reader == nonClosingReaderWrapper) {
return engineSearcher;
} else {
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
return new Engine.Searcher(engineSearcher.source(), reader,
engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(),
() -> IOUtils.close(reader, // this will close the wrappers excluding the NonClosingReaderWrapper
engineSearcher)); // this will run the closeable on the wrapped engine reader
}
}
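The per-id locking that the realtime path relies on (KeyedLock#acquire and tryCreateNewLock above) can be sketched on its own. This is a simplified illustration under stated assumptions: KeyedLockSketch, CountedLock and Handle are hypothetical names, fairness is not configurable, and the release logic is written from the counting idea shown above rather than copied from Elasticsearch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class KeyedLockSketch<T> {
    // One reentrant lock per key plus a count of holders and waiters,
    // so the map entry can be dropped once nobody uses the key any more.
    private static final class CountedLock extends ReentrantLock {
        final AtomicInteger count = new AtomicInteger(1);
    }

    // Hypothetical handle type; closing it releases the lock.
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    private final ConcurrentHashMap<T, CountedLock> map = new ConcurrentHashMap<>();

    Handle acquire(T key) {
        while (true) {
            CountedLock existing = map.get(key);
            if (existing == null) {
                CountedLock created = new CountedLock();
                created.lock();
                if (map.putIfAbsent(key, created) == null) {
                    return () -> release(key, created);
                }
                // Lost the race against another thread: discard our lock and retry.
            } else {
                int c = existing.count.get();
                // Only join a lock that is still in use; a count of 0 means it is being removed.
                if (c > 0 && existing.count.compareAndSet(c, c + 1)) {
                    existing.lock();
                    return () -> release(key, existing);
                }
            }
        }
    }

    private void release(T key, CountedLock lock) {
        lock.unlock();
        if (lock.count.decrementAndGet() == 0) {
            map.remove(key, lock);
        }
    }

    // Number of keys that currently have a registered lock.
    int size() {
        return map.size();
    }
}
```

The count is what lets the map shrink again: a get for a given _id only contends with operations on that same _id, and the entry disappears when the last holder closes its handle.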
Once the searcher instance has been obtained, it is used to perform the get.
// org.elasticsearch.index.engine.Engine#getFromSearcher
protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException {
final DocIdAndVersion docIdAndVersion;
try {
// Read the docId and version; this is one of the core data-reading steps
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
} catch (Exception e) {
Releasables.closeWhileHandlingException(searcher);
//TODO: A better exception goes here
throw new EngineException(shardId, "Couldn't resolve version", e);
}
if (docIdAndVersion != null) {
// Check for a version conflict and throw if there is one
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
Releasables.close(searcher);
throw new VersionConflictEngineException(shardId, get.id(),
get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
get.getIfSeqNo() != docIdAndVersion.seqNo || get.getIfPrimaryTerm() != docIdAndVersion.primaryTerm
)) {
Releasables.close(searcher);
throw new VersionConflictEngineException(shardId, get.id(),
get.getIfSeqNo(), get.getIfPrimaryTerm(), docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
}
}
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the
// responsibility of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion, false);
} else {
Releasables.close(searcher);
return GetResult.NOT_EXISTS;
}
}
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver#loadDocIdAndVersion
/**
* Load the internal doc ID and version for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
// field: _id
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
List<LeafReaderContext> leaves = reader.leaves();
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
if (result != null) {
return result;
}
}
return null;
}
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver#getLookupState
private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException {
// We cache on the top level
// This means cache entries have a shorter lifetime, maybe as low as 1s with the
// default refresh interval and a steady indexing rate, but on the other hand it
// proved to be cheaper than having to perform a CHM and a TL get for every segment.
// See https://github.com/elastic/elasticsearch/pull/19856.
IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.get(cacheHelper.getKey());
if (ctl == null) {
// First time we are seeing this reader's core; make a new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
if (other == null) {
// Our CTL won, we must remove it when the reader is closed:
cacheHelper.addClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use their CTL:
ctl = other;
}
}
PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
for (LeafReaderContext leaf : reader.leaves()) {
lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf.reader(), uidField);
}
ctl.set(lookupState);
}
if (lookupState.length != reader.leaves().size()) {
throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size());
}
if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) {
throw new AssertionError("Index does not consistently use the same uid field: [" +
uidField + "] != [" + lookupState[0].uidField + "]");
}
return lookupState;
}
// org.apache.lucene.index.IndexReader#leaves
/**
* Returns the reader's leaves, or itself if this reader is atomic.
* This is a convenience method calling {@code this.getContext().leaves()}.
* @see IndexReaderContext#leaves()
*/
public final List<LeafReaderContext> leaves() {
return getContext().leaves();
}
// org.apache.lucene.index.CompositeReader#getContext
@Override
public final CompositeReaderContext getContext() {
ensureOpen();
// lazy init without thread safety for perf reasons: Building the readerContext twice does not hurt!
if (readerContext == null) {
assert getSequentialSubReaders() != null;
readerContext = CompositeReaderContext.create(this);
}
return readerContext;
}
// org.apache.lucene.index.CompositeReaderContext#leaves
@Override
public List<LeafReaderContext> leaves() throws UnsupportedOperationException {
if (!isTopLevel)
throw new UnsupportedOperationException("This is not a top-level context.");
assert leaves != null;
return leaves;
}
// org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#lookupVersion
/** Return null if id is not found.
* We pass the {@link LeafReaderContext} as an argument so that things
* still work with reader wrappers that hide some documents while still
* using the same cache key. Otherwise we'd have to disable caching
* entirely for these readers.
*/
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
// Note: docID is an int here, the internal Lucene document number within this segment
int docID = getDocID(id, context);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final long seqNo;
final long term;
if (loadSeqNo) {
seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
} else {
seqNo = UNASSIGNED_SEQ_NO;
term = UNASSIGNED_PRIMARY_TERM;
}
final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
} else {
return null;
}
}
// org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#getDocID
/**
* returns the internal lucene doc id for the given id bytes.
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
// termsEnum can possibly be null here if this leaf contains only no-ops.
// Lucene API: seekExact
if (termsEnum != null && termsEnum.seekExact(id)) {
final Bits liveDocs = context.reader().getLiveDocs();
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}
return docID;
} else {
return DocIdSetIterator.NO_MORE_DOCS;
}
}
// org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#readNumericDocValues
private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
final NumericDocValues dv = reader.getNumericDocValues(field);
if (dv == null || dv.advanceExact(docId) == false) {
assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
}
return dv.longValue();
}
// Wrapper around the docId
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final long seqNo;
public final long primaryTerm;
public final LeafReader reader;
public final int docBase;
public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
this.docId = docId;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.reader = reader;
this.docBase = docBase;
}
}
// org.elasticsearch.index.VersionType#isVersionConflictForReads
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
return isVersionConflict(currentVersion, expectedVersion, false);
}
private boolean isVersionConflict(long currentVersion, long expectedVersion, boolean deleted) {
if (expectedVersion == Versions.MATCH_ANY) {
return false;
}
if (expectedVersion == Versions.MATCH_DELETED) {
return deleted == false;
}
if (currentVersion != expectedVersion) {
return true;
}
return false;
}
// Finally, build the GetResult and return it
// org.elasticsearch.index.engine.Engine.GetResult#GetResult
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
}
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
this.exists = exists;
this.version = version;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
this.fromTranslog = fromTranslog;
assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader;
}
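The id lookup above (loadDocIdAndVersion together with getDocID) boils down to two rules: walk the segments from newest to oldest, and within a segment keep the last doc ID that is still live. The sketch below models that with plain collections. SegmentSketch and IdLookupSketch are hypothetical names, the map stands in for Lucene's term dictionary, and -1 is used as the "not found" marker where the real code uses DocIdSetIterator.NO_MORE_DOCS.

```java
import java.util.BitSet;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one Lucene segment.
final class SegmentSketch {
    static final int NOT_FOUND = -1;       // assumption: sentinel chosen for this sketch only

    final Map<String, int[]> postings;     // _id -> segment-local doc IDs, in increasing order
    final BitSet liveDocs;                 // bit set means the document is not deleted

    SegmentSketch(Map<String, int[]> postings, BitSet liveDocs) {
        this.postings = postings;
        this.liveDocs = liveDocs;
    }

    // Same idea as getDocID: skip deleted docs and keep the last live match.
    int docId(String id) {
        int[] docs = postings.get(id);
        int found = NOT_FOUND;
        if (docs != null) {
            for (int d : docs) {
                if (liveDocs.get(d)) {
                    found = d;
                }
            }
        }
        return found;
    }
}

final class IdLookupSketch {
    // Returns {segmentIndex, docId}, or null if no segment holds a live doc for the id.
    static int[] lookup(List<SegmentSketch> segments, String id) {
        // Iterate backwards: frequently updated documents tend to live in the newest segments.
        for (int i = segments.size() - 1; i >= 0; i--) {
            int d = segments.get(i).docId(id);
            if (d != SegmentSketch.NOT_FOUND) {
                return new int[] {i, d};
            }
        }
        return null;
    }
}
```

An updated document leaves a deleted copy in an older segment and a live copy in a newer one, which is why the backward scan combined with the live-docs check returns the current version.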
The steps above called the Lucene API to resolve the id into a docId. Next, the document details are loaded by docId:
// org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields
private GetResult innerGetLoadFromStoredFields(String type, String id, String[] storedFields, FetchSourceContext fetchSourceContext,
Engine.GetResult get, MapperService mapperService) {
assert get.exists() : "method should only be called if document could be retrieved";
// check first if stored fields to be loaded don't contain an object field
DocumentMapper docMapper = mapperService.documentMapper();
if (storedFields != null) {
for (String field : storedFields) {
Mapper fieldMapper = docMapper.mappers().getMapper(field);
if (fieldMapper == null) {
if (docMapper.mappers().objectMappers().get(field) != null) {
// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
}
}
}
}
Map<String, DocumentField> documentFields = null;
Map<String, DocumentField> metadataFields = null;
BytesReference source = null;
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
// force fetching source if we read from translog and need to recreate stored fields
boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
// Build the fields visitor
FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
if (fieldVisitor != null) {
try {
// Read the document's stored fields through the visitor
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
} catch (IOException e) {
throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
}
source = fieldVisitor.source();
// in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
if (get.isFromTranslog()) {
// Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
// just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
if (forceSourceForComputingTranslogStoredFields == false) {
try {
source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
} catch (IOException e) {
throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
}
} else {
// Slow path: recreate stored fields from original source
assert source != null : "original source in translog must exist";
SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), type, id, source,
XContentHelper.xContentType(source), fieldVisitor.routing());
ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
// update special fields
doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
doc.version().setLongValue(docIdAndVersion.version);
// retrieve stored fields from parsed doc
fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
for (IndexableField indexableField : doc.rootDoc().getFields()) {
IndexableFieldType fieldType = indexableField.fieldType();
if (fieldType.stored()) {
FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
if (status == StoredFieldVisitor.Status.YES) {
if (indexableField.numericValue() != null) {
fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
} else if (indexableField.binaryValue() != null) {
fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
} else if (indexableField.stringValue() != null) {
fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
}
} else if (status == StoredFieldVisitor.Status.STOP) {
break;
}
}
}
// retrieve source (with possible transformations, e.g. source filters
source = fieldVisitor.source();
}
}
// put stored fields into result objects
if (!fieldVisitor.fields().isEmpty()) {
fieldVisitor.postProcess(mapperService::fieldType,
mapperService.documentMapper() == null ? null : mapperService.documentMapper().type());
documentFields = new HashMap<>();
metadataFields = new HashMap<>();
for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
if (mapperService.isMetadataField(entry.getKey())) {
metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
} else {
documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
}
}
}
}
if (source != null) {
// apply request-level source filtering
if (fetchSourceContext.fetchSource() == false) {
source = null;
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
Map<String, Object> sourceAsMap;
// TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
// Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
XContentType sourceContentType = typeMapTuple.v1();
sourceAsMap = typeMapTuple.v2();
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
try {
source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
} catch (IOException e) {
throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
}
}
}
if (!fetchSourceContext.fetchSource()) {
source = null;
}
if (source != null && get.isFromTranslog()) {
// reapply source filters from mapping (possibly also nulling the source)
try {
source = docMapper.sourceMapper().applyFilters(source, null);
} catch (IOException e) {
throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
}
}
if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
Map<String, Object> sourceAsMap;
// TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
XContentType sourceContentType = typeMapTuple.v1();
sourceAsMap = typeMapTuple.v2();
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
try {
source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
} catch (IOException e) {
throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
}
}
// Build and return a new GetResult; source holds the final query result
return new GetResult(shardId.getIndexName(), type, id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
get.version(), get.exists(), source, documentFields, metadataFields);
}
private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
if (fields == null || fields.length == 0) {
return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null;
}
return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource());
}
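The request-level source filtering step above (the includes/excludes handling) can be pictured with a much smaller sketch. This is not the Elasticsearch implementation: `SourceFilterSketch` and its `filter` method are hypothetical names, and the sketch only matches top-level keys by exact name, whereas `XContentMapValues.filter` (as far as I know) also handles nested paths and wildcards. The assumed semantics are: an empty includes list means "keep everything", and excludes win over includes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Elasticsearch.
// Simplification: only top-level keys, exact-name matching.
final class SourceFilterSketch {

    static Map<String, Object> filter(Map<String, Object> source, String[] includes, String[] excludes) {
        Set<String> in = new HashSet<>(Arrays.asList(includes));
        Set<String> ex = new HashSet<>(Arrays.asList(excludes));
        // LinkedHashMap keeps the original field order of the source.
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            // Assumption: an empty include list keeps every field.
            boolean included = in.isEmpty() || in.contains(e.getKey());
            // Assumption: an exclude always overrides an include.
            if (included && !ex.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

With a source of `{name, age, secret}`, includes `[name, secret]` and excludes `[secret]`, only `name` survives. The filtered map is then serialized back into bytes, which is what the `XContentFactory.contentBuilder(...).map(sourceAsMap)` call does in the real code.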
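Finally, here are a few fragments of Lucene's lookup implementation, for reference.
// Quickly locate the document for the given id; this is effectively the index lookup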
// org.apache.lucene.codecs.blocktree.SegmentTermsEnum#seekExact(org.apache.lucene.util.BytesRef)
@Override
public boolean seekExact(BytesRef target) throws IOException {
if (fr.index == null) {
throw new IllegalStateException("terms index was not loaded");
}
if (fr.size() > 0 && (target.compareTo(fr.getMin()) < 0 || target.compareTo(fr.getMax()) > 0)) {
return false;
}
term.grow(1 + target.length);
assert clearEOF();
// if (DEBUG) {
// System.out.println("\nBTTR.seekExact seg=" + fr.parent.segment + " target=" + fr.fieldInfo.name + ":" + brToString(target) + " current=" + brToString(term) + " (exists?=" + termExists + ") validIndexPrefix=" + validIndexPrefix);
// printSeekState(System.out);
// }
FST.Arc<BytesRef> arc;
int targetUpto;
BytesRef output;
targetBeforeCurrentLength = currentFrame.ord;
if (currentFrame != staticFrame) {
// We are already seek'd; find the common
// prefix of new seek term vs current term and
// re-use the corresponding seek state. For
// example, if app first seeks to foobar, then
// seeks to foobaz, we can re-use the seek state
// for the first 5 bytes.
// if (DEBUG) {
// System.out.println(" re-use current seek state validIndexPrefix=" + validIndexPrefix);
// }
arc = arcs[0];
assert arc.isFinal();
output = arc.output();
targetUpto = 0;
SegmentTermsEnumFrame lastFrame = stack[0];
assert validIndexPrefix <= term.length();
final int targetLimit = Math.min(target.length, validIndexPrefix);
int cmp = 0;
// TODO: reverse vLong byte order for better FST
// prefix output sharing
// First compare up to valid seek frames:
while (targetUpto < targetLimit) {
cmp = (term.byteAt(targetUpto)&0xFF) - (target.bytes[target.offset + targetUpto]&0xFF);
// if (DEBUG) {
// System.out.println(" cycle targetUpto=" + targetUpto + " (vs limit=" + targetLimit + ") cmp=" + cmp + " (targetLabel=" + (char) (target.bytes[target.offset + targetUpto]) + " vs termLabel=" + (char) (term.bytes[targetUpto]) + ")" + " arc.output=" + arc.output + " output=" + output);
// }
if (cmp != 0) {
break;
}
arc = arcs[1 + targetUpto];
assert arc.label() == (target.bytes[target.offset + targetUpto] & 0xFF): "arc.label=" + (char) arc.label() + " targetLabel=" + (char) (target.bytes[target.offset + targetUpto] & 0xFF);
if (arc.output() != BlockTreeTermsReader.NO_OUTPUT) {
output = BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.output());
}
if (arc.isFinal()) {
lastFrame = stack[1 + lastFrame.ord];
}
targetUpto++;
}
if (cmp == 0) {
final int targetUptoMid = targetUpto;
// Second compare the rest of the term, but
// don't save arc/output/frame; we only do this
// to find out if the target term is before,
// equal or after the current term
final int targetLimit2 = Math.min(target.length, term.length());
while (targetUpto < targetLimit2) {
cmp = (term.byteAt(targetUpto)&0xFF) - (target.bytes[target.offset + targetUpto]&0xFF);
// if (DEBUG) {
// System.out.println(" cycle2 targetUpto=" + targetUpto + " (vs limit=" + targetLimit + ") cmp=" + cmp + " (targetLabel=" + (char) (target.bytes[target.offset + targetUpto]) + " vs termLabel=" + (char) (term.bytes[targetUpto]) + ")");
// }
if (cmp != 0) {
break;
}
targetUpto++;
}
if (cmp == 0) {
cmp = term.length() - target.length;
}
targetUpto = targetUptoMid;
}
if (cmp < 0) {
// Common case: target term is after current
// term, ie, app is seeking multiple terms
// in sorted order
// if (DEBUG) {
// System.out.println(" target is after current (shares prefixLen=" + targetUpto + "); frame.ord=" + lastFrame.ord);
// }
currentFrame = lastFrame;
} else if (cmp > 0) {
// Uncommon case: target term
// is before current term; this means we can
// keep the currentFrame but we must rewind it
// (so we scan from the start)
targetBeforeCurrentLength = lastFrame.ord;
// if (DEBUG) {
// System.out.println(" target is before current (shares prefixLen=" + targetUpto + "); rewind frame ord=" + lastFrame.ord);
// }
currentFrame = lastFrame;
currentFrame.rewind();
} else {
// Target is exactly the same as current term
assert term.length() == target.length;
if (termExists) {
// if (DEBUG) {
// System.out.println(" target is same as current; return true");
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" target is same as current but term doesn't exist");
// }
}
//validIndexPrefix = currentFrame.depth;
//term.length = target.length;
//return termExists;
}
} else {
targetBeforeCurrentLength = -1;
arc = fr.index.getFirstArc(arcs[0]);
// Empty string prefix must have an output (block) in the index!
assert arc.isFinal();
assert arc.output() != null;
// if (DEBUG) {
// System.out.println(" no seek state; push root frame");
// }
output = arc.output();
currentFrame = staticFrame;
//term.length = 0;
targetUpto = 0;
currentFrame = pushFrame(arc, BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.nextFinalOutput()), 0);
}
// if (DEBUG) {
// System.out.println(" start index loop targetUpto=" + targetUpto + " output=" + output + " currentFrame.ord=" + currentFrame.ord + " targetBeforeCurrentLength=" + targetBeforeCurrentLength);
// }
// We are done sharing the common prefix with the incoming target and where we are currently seek'd; now continue walking the index:
while (targetUpto < target.length) {
final int targetLabel = target.bytes[target.offset + targetUpto] & 0xFF;
final FST.Arc<BytesRef> nextArc = fr.index.findTargetArc(targetLabel, arc, getArc(1 + targetUpto), fstReader);
if (nextArc == null) {
// Index is exhausted
// if (DEBUG) {
// System.out.println(" index: index exhausted label=" + ((char) targetLabel) + " " + toHex(targetLabel));
// }
validIndexPrefix = currentFrame.prefix;
//validIndexPrefix = targetUpto;
currentFrame.scanToFloorFrame(target);
if (!currentFrame.hasTerms) {
termExists = false;
term.setByteAt(targetUpto, (byte) targetLabel);
term.setLength(1 + targetUpto);
// if (DEBUG) {
// System.out.println(" FAST NOT_FOUND term=" + brToString(term));
// }
return false;
}
currentFrame.loadBlock();
final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got " + result + "; return NOT_FOUND term=" + brToString(term));
// }
return false;
}
} else {
// Follow this arc
arc = nextArc;
term.setByteAt(targetUpto, (byte) targetLabel);
// Aggregate output as we go:
assert arc.output() != null;
if (arc.output() != BlockTreeTermsReader.NO_OUTPUT) {
output = BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.output());
}
// if (DEBUG) {
// System.out.println(" index: follow label=" + toHex(target.bytes[target.offset + targetUpto]&0xff) + " arc.output=" + arc.output + " arc.nfo=" + arc.nextFinalOutput);
// }
targetUpto++;
if (arc.isFinal()) {
//if (DEBUG) System.out.println(" arc is final!");
currentFrame = pushFrame(arc, BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.nextFinalOutput()), targetUpto);
//if (DEBUG) System.out.println(" curFrame.ord=" + currentFrame.ord + " hasTerms=" + currentFrame.hasTerms);
}
}
}
//validIndexPrefix = targetUpto;
validIndexPrefix = currentFrame.prefix;
currentFrame.scanToFloorFrame(target);
// Target term is entirely contained in the index:
if (!currentFrame.hasTerms) {
termExists = false;
term.setLength(targetUpto);
// if (DEBUG) {
// System.out.println(" FAST NOT_FOUND term=" + brToString(term));
// }
return false;
}
currentFrame.loadBlock();
final SeekStatus result = currentFrame.scanToTerm(target, true);
if (result == SeekStatus.FOUND) {
// if (DEBUG) {
// System.out.println(" return FOUND term=" + term.utf8ToString() + " " + term);
// }
return true;
} else {
// if (DEBUG) {
// System.out.println(" got result " + result + "; return NOT_FOUND term=" + term.utf8ToString());
// }
return false;
}
}
// org.apache.lucene.codecs.blocktree.SegmentTermsEnum#postings
@Override
public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
assert !eof;
//if (DEBUG) {
//System.out.println("BTTR.docs seg=" segment);
//}
currentFrame.decodeMetaData();
//if (DEBUG) {
//System.out.println(" state=" + currentFrame.state);
//}
return fr.parent.postingsReader.postings(fr.fieldInfo, currentFrame.state, reuse, flags);
}
// Fetch the document's stored fields by docId
// org.apache.lucene.index.FilterLeafReader#document
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
ensureOpen();
in.document(docID, visitor);
}
// org.apache.lucene.index.CodecReader#document
@Override
public final void document(int docID, StoredFieldVisitor visitor) throws IOException {
checkBounds(docID);
getFieldsReader().visitDocument(docID, visitor);
}
// org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader#visitDocument
@Override
public void visitDocument(int docID, StoredFieldVisitor visitor)
throws IOException {
final SerializedDocument doc = document(docID);
for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
final long infoAndBits = doc.in.readVLong();
final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
final int bits = (int) (infoAndBits & TYPE_MASK);
assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);
switch(visitor.needsField(fieldInfo)) {
case YES:
readField(doc.in, visitor, fieldInfo, bits);
break;
case NO:
if (fieldIDX == doc.numStoredFields - 1) {// don't skipField on last field value; treat like STOP
return;
}
skipField(doc.in, bits);
break;
case STOP:
return;
}
}
}
// org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader#document
SerializedDocument document(int docID) throws IOException {
// Locate the block that holds the document, then read the document out of it
if (state.contains(docID) == false) {
fieldsStream.seek(indexReader.getStartPointer(docID));
state.reset(docID);
}
assert state.contains(docID);
return state.document(docID);
}
// org.apache.lucene.codecs.compressing.FieldsIndexReader#getStartPointer
@Override
long getStartPointer(int docID) {
FutureObjects.checkIndex(docID, maxDoc);
long blockIndex = docs.binarySearch(0, numChunks, docID);
if (blockIndex < 0) {
blockIndex = -2 - blockIndex;
}
return startPointers.get(blockIndex);
}
// org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.BlockState#document
/**
* Get the serialized representation of the given docID. This docID has
* to be contained in the current block.
*/
SerializedDocument document(int docID) throws IOException {
if (contains(docID) == false) {
throw new IllegalArgumentException();
}
final int index = docID - docBase;
final int offset = Math.toIntExact(offsets[index]);
final int length = Math.toIntExact(offsets[index + 1]) - offset;
final int totalLength = Math.toIntExact(offsets[chunkDocs]);
final int numStoredFields = Math.toIntExact(this.numStoredFields[index]);
final BytesRef bytes;
if (merging) {
bytes = this.bytes;
} else {
bytes = new BytesRef();
}
final DataInput documentInput;
if (length == 0) {
// empty
documentInput = new ByteArrayDataInput();
} else if (merging) {
// already decompressed
documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
} else if (sliced) {
fieldsStream.seek(startPointer);
decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
documentInput = new DataInput() {
int decompressed = bytes.length;
void fillBuffer() throws IOException {
assert decompressed <= length;
if (decompressed == length) {
throw new EOFException();
}
final int toDecompress = Math.min(length - decompressed, chunkSize);
decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
decompressed += toDecompress;
}
@Override
public byte readByte() throws IOException {
if (bytes.length == 0) {
fillBuffer();
}
--bytes.length;
return bytes.bytes[bytes.offset++];
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
while (len > bytes.length) {
System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
len -= bytes.length;
offset += bytes.length;
fillBuffer();
}
System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
bytes.offset += len;
bytes.length -= len;
}
};
} else {
fieldsStream.seek(startPointer);
decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
assert bytes.length == length;
documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
}
return new SerializedDocument(documentInput, length, numStoredFields);
}
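The two lookups in the stored-fields path above are easier to see in isolation: first find the chunk that contains a docID (`getStartPointer`), then slice that document out of the chunk using the per-document offsets (`BlockState#document`). The sketch below is only an illustration with hypothetical names (`StoredFieldsLookupSketch`, `chunkIndex`, `slice`); it uses `java.util.Arrays.binarySearch` in place of Lucene's own reader and assumes both follow the same "negative insertion point" convention on a miss, which is what the `-2 - blockIndex` correction suggests.

```java
import java.util.Arrays;

// Hypothetical illustration, not Lucene code.
final class StoredFieldsLookupSketch {

    // chunkStartDocs[i] is the first docID stored in chunk i (sorted ascending).
    // Returns the index of the chunk that contains docID.
    static int chunkIndex(long[] chunkStartDocs, long docID) {
        int idx = Arrays.binarySearch(chunkStartDocs, docID);
        if (idx < 0) {
            // On a miss, binarySearch returns -(insertionPoint) - 1.
            // The containing chunk is the one just before the insertion point,
            // i.e. insertionPoint - 1, which equals -2 - idx.
            idx = -2 - idx;
        }
        return idx;
    }

    // offsets has (docs in chunk + 1) entries; document i occupies
    // the byte range [offsets[i], offsets[i + 1]) of the decompressed chunk.
    // Returns {offset, length} for the given docID.
    static int[] slice(long[] offsets, int docID, int docBase) {
        int index = docID - docBase;
        int offset = Math.toIntExact(offsets[index]);
        int length = Math.toIntExact(offsets[index + 1]) - offset;
        return new int[] {offset, length};
    }
}
```

For example, with chunks starting at docIDs `{0, 128, 300}`, docID 129 misses the exact search and resolves to chunk 1. Within that chunk (docBase 128), offsets `{0, 10, 25, 25, 40}` give document 129 the range starting at byte 10 with length 15, and document 130 a length of 0, which corresponds to the "empty" branch in the real code.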
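5. A few data structures
The main outward-facing data structures involved in a get are GetRequest for the request, DocIdAndVersion for the lookup, GetResult for the actual result, and GetResponse for the response sent back. All of them have already appeared in the code above; they are collected here for readers who want them in one place.
0. The get request data structure
// org.elasticsearch.action.get.GetRequest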
public class GetRequest extends SingleShardRequest<GetRequest> implements RealtimeRequest {
private String type;
private String id;
private String routing;
private String preference;
private String[] storedFields;
private FetchSourceContext fetchSourceContext;
private boolean refresh = false;
boolean realtime = true;
private VersionType versionType = VersionType.INTERNAL;
private long version = Versions.MATCH_ANY;
public GetRequest() {
type = MapperService.SINGLE_MAPPING_NAME;
}
GetRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString();
}
preference = in.readOptionalString();
refresh = in.readBoolean();
storedFields = in.readOptionalStringArray();
realtime = in.readBoolean();
this.versionType = VersionType.fromValue(in.readByte());
this.version = in.readLong();
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
}
...
}
1. The data structure holding docId and version
// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final long seqNo;
public final long primaryTerm;
public final LeafReader reader;
public final int docBase;
public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
this.docId = docId;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.reader = reader;
this.docBase = docBase;
}
}
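The Javadoc on DocIdAndVersion says the doc ID is relative to the context doc base, which is why the class carries both `docId` and `docBase`. As a rough sketch of that relationship (hypothetical class and method names, not Elasticsearch or Lucene code), a shard-wide doc ID is just `docBase + docId`, and going the other way means finding the segment whose doc base is the largest one not exceeding the global ID:

```java
// Hypothetical illustration, not Elasticsearch or Lucene code.
final class DocBaseSketch {

    // docBases[i] is the first global doc ID of segment i,
    // sorted ascending with docBases[0] == 0.
    // Returns the index of the segment that contains globalDocId.
    static int leafIndex(int[] docBases, int globalDocId) {
        int lo = 0;
        int hi = docBases.length - 1;
        while (lo < hi) {
            // Round up so the loop always makes progress when lo + 1 == hi.
            int mid = (lo + hi + 1) >>> 1;
            if (docBases[mid] <= globalDocId) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }

    // Segment-relative doc ID, the value DocIdAndVersion.docId would hold.
    static int toLocal(int[] docBases, int globalDocId) {
        return globalDocId - docBases[leafIndex(docBases, globalDocId)];
    }

    // Inverse direction: segment-relative ID back to a global ID.
    static int toGlobal(int docBase, int localDocId) {
        return docBase + localDocId;
    }
}
```

With doc bases `{0, 100, 250}`, global doc 260 lives in segment 2 with local ID 10, and local ID 5 in the segment with doc base 100 maps back to global doc 105.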
2. The GetResult data structure
// org.elasticsearch.index.get.GetResult
public class GetResult implements Writeable, Iterable<DocumentField>, ToXContentObject {
public static final String _INDEX = "_index";
public static final String _TYPE = "_type";
public static final String _ID = "_id";
private static final String _VERSION = "_version";
private static final String _SEQ_NO = "_seq_no";
private static final String _PRIMARY_TERM = "_primary_term";
private static final String FOUND = "found";
private static final String FIELDS = "fields";
private String index;
private String type;
private String id;
private long version;
private long seqNo;
private long primaryTerm;
private boolean exists;
private final Map<String, DocumentField> documentFields;
private final Map<String, DocumentField> metaFields;
private Map<String, Object> sourceAsMap;
private BytesReference source;
private byte[] sourceAsBytes;
public GetResult(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = UNASSIGNED_SEQ_NO;
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
version = in.readLong();
exists = in.readBoolean();
if (exists) {
source = in.readBytesReference();
if (source.length() == 0) {
source = null;
}
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
documentFields = readFields(in);
metaFields = readFields(in);
} else {
Map<String, DocumentField> fields = readFields(in);
documentFields = new HashMap<>();
metaFields = new HashMap<>();
fields.forEach((fieldName, docField) ->
(MapperService.META_FIELDS_BEFORE_7DOT8.contains(fieldName) ? metaFields : documentFields).put(fieldName, docField));
}
} else {
metaFields = Collections.emptyMap();
documentFields = Collections.emptyMap();
}
}
public GetResult(String index, String type, String id, long seqNo, long primaryTerm, long version, boolean exists,
BytesReference source, Map<String, DocumentField> documentFields, Map<String, DocumentField> metaFields) {
this.index = index;
this.type = type;
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
assert (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) || (seqNo >= 0 && primaryTerm >= 1) :
"seqNo: " + seqNo + " primaryTerm: " + primaryTerm;
assert exists || (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) :
"doc not found but seqNo/primaryTerm are set";
this.version = version;
this.exists = exists;
this.source = source;
this.documentFields = documentFields == null ? emptyMap() : documentFields;
this.metaFields = metaFields == null ? emptyMap() : metaFields;
}
...
}
3. The get response: GetResponse
// org.elasticsearch.action.get.GetResponse
public class GetResponse extends ActionResponse implements Iterable<DocumentField>, ToXContentObject {
GetResult getResult;
GetResponse(StreamInput in) throws IOException {
super(in);
getResult = new GetResult(in);
}
public GetResponse(GetResult getResult) {
this.getResult = getResult;
}
...
}
Author: 等你归去来
Source: https://www.cnblogs.com/yougewe/p/14774934.html