前言:elasticsearch在进行密集的update,update_by_query,delete_by_query操作时会产生大量的doc.deleted文档。而elasticsearch又是如何处理这些doc.deleted文档的呢?
一.什么是elasticsearch的merge
1.数据在elasticsearch中如何进行存储
在elasticsearch中,客户端写入的每一条数据都会保存在索引的shard中,每一个shard都是一个lucene索引,每一个lucene索引都会被分解为多个segement。segement则是存储索引数据的最基本单元。
2.如何对索引进行merge
代码语言:javascript复制POST /my-index-000001/_forcemerge
POST /<target>/_forcemerge
POST /_forcemerge
该操作可以强制对elasticsearch中的index,datastream进行数据合并。
3.merge操作的作用
当我们使用merge API对索引的分片中的segement发起强制合并,elasticsearch通过merge操作会将索引分片上的多个segement合并到一个segement中。并对已经标记为deleted状态的文档进行删除。并释放这些已经标记为删除状态文档所占用的磁盘空间。
一般来说elasticsearch自身会自动对索引进行merge。但是在update场景与delete_by_query场景下,自动merge的效果缓慢。往往需要较长时间,这些被标记为删除状态的文档才会被elasticsearch进行merge并释放磁盘空间。所以我们可以通过对索引进行多轮次手动merge来加快索引merge的进度。
4.merge操作的原理
forcemerge的原理是将多个小的索引段(index segment)合并为一个更大的段,以减少磁盘空间的使用和提高搜索性能。当索引被更新时,新的文档会被添加到新的段中,而旧的段则会被标记为删除。这样会导致索引中存在多个小的段,而每个段都会占用一定的磁盘空间和系统资源。forcemerge操作可以通过将这些小的段合并为一个或少量的更大段来优化索引。
代码语言:javascript复制 public ForceMergeRequest(String... indices) {
super(indices);
forceMergeUUID = UUIDs.randomBase64UUID();
}
public ForceMergeRequest(StreamInput in) throws IOException {
super(in);
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getTransportVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
forceMergeUUID = in.readString();
} else {
forceMergeUUID = in.readOptionalString();
assert forceMergeUUID != null : "optional was just used as a BwC measure";
}
}
通过该段代码构造索引合并的请求,并判断需要合并的索引,如果没有指定具体提的索引则会对全部的索引进行forcemerge操作。
代码语言:javascript复制 @Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getTransportVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
out.writeString(forceMergeUUID);
} else {
out.writeOptionalString(forceMergeUUID);
}
}
在合并时则会对文档的UUID进行判断。判断当前文档的UUID是否在FORCE_MERGE_UUID_SIMPLE_VERSION之后。如果是,则将forceMergeUUID(合并操作的UUID)以字符串的形式写入到输出流中。否则,将forceMergeUUID以可选字符串的形式写入到输出流中。
代码语言:javascript复制public ActionRequestValidationException validate() {
ActionRequestValidationException validationError = super.validate();
if (onlyExpungeDeletes && maxNumSegments != Defaults.MAX_NUM_SEGMENTS) {
validationError = addValidationError(
"cannot set only_expunge_deletes and max_num_segments at the same time, those two " "parameters are mutually exclusive",
validationError
);
}
return validationError;
}
在进行forcemerge合并时,我们可以指定segments的数量,在构造请求后会调用该方法进行参数的验证。判断请求是否合法。
代码语言:javascript复制public class ForceMergeAction extends ActionType<ForceMergeResponse> {
public static final ForceMergeAction INSTANCE = new ForceMergeAction();
public static final String NAME = "indices:admin/forcemerge";
private ForceMergeAction() {
super(NAME, ForceMergeResponse::new);
}
}
ForceMergeAction通过继承ActionType<ForceMergeResponse>来实现ForceMerge操作。并通过ForceMergeResponse
获取到相应的返回信息。
代码语言:javascript复制private static final ConstructingObjectParser<ForceMergeResponse, Void> PARSER = new ConstructingObjectParser<>(
"force_merge",
true,
arg -> {
BaseBroadcastResponse response = (BaseBroadcastResponse) arg[0];
return new ForceMergeResponse(
response.getTotalShards(),
response.getSuccessfulShards(),
response.getFailedShards(),
Arrays.asList(response.getShardFailures())
);
}
);
在ForceMergeResponse的这段代码中定义了一个ConstructingObjectParser解析器。用来解析我们发起Forcemerge操作后的各中响应信息。
通过该解析器我们可以拿到以下返回信息。
response.getTotalShards() 返回总分片数,
response.getSuccessfulShards() 返回成功分片数,
response.getFailedShards() 返回失败分片数,
Arrays.asList(response.getShardFailures()) 返回所有失败分片的详细信息
关于merge操作的参数优化
index.merge.scheduler.max_thread_count:单个分片上可以同时合并的最大线程数。
默认值Math.max(1, Math.min(4, <<node.processors, node.processors>> / 2))
在固态硬盘中,基于良好的硬件读写性能,我们可以适当调整该参数。如果存储介质为机械硬盘,则建议适当减小该参数值。
代码语言:javascript复制public static final Setting<Integer> MAX_THREAD_COUNT_SETTING = new Setting<>(
"index.merge.scheduler.max_thread_count",
(s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.allocatedProcessors(s) / 2))),
(s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"),
Property.Dynamic,
Property.IndexScope
);
public static final Setting<Integer> MAX_MERGE_COUNT_SETTING = new Setting<>(
"index.merge.scheduler.max_merge_count",
(s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s) 5),
(s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_merge_count"),
Property.Dynamic,
Property.IndexScope
);
在上述代码中我们可以看到elasticsearch在对索引进行merge时,提供了两种不同的合并调度器策略。
1. index.merge.scheduler.max_thread_count :定义了索引合并调度器的最大线程数。它使用 EsExecutors.allocatedProcessors(s) 方法获取可用的处理器数量,并根据其值计算线程数。线程数的计算公式为可用处理器数量除以2,结果取1和4之间的较小值。这样可以确保最少有一个线程,并且最多不超过4个线程。
2. index.merge.scheduler.max_merge_count :该设置项定义了索引合并调度器的最大合并数量。它使用 MAX_THREAD_COUNT_SETTING.get(s) 方法获取最大线程数,并在此基础上加上5。这样可以确保最少有一个合并任务,并且相对于最大线程数有一定的缓冲。
同时,这两个属性均为动态属性,可以进行热更新,并且作用于索引维度,我们可以根据索引存储介质的不同而对索引进行合适的参数值配置。
关于merge操作的使用建议:
一般存在更新 查询的场景,会产生很多的deleted docs以及零碎的段文件,,可以定期对索引进行forcemerge。当我们对索引进行多轮次的forcemerge时 ,可能会引发高io ,同时也会提高CPU等集群资源的负载。但是可以通过参数大大降低索引中的doc.deleted数量和索引分片的segments 数量,对索引的写入查询性能都会有一定提升。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!