elasticsearch的merge机制

2023-11-05 10:00:09 浏览数 (1)

前言:elasticsearch在进行密集的update,update_by_query,delete_by_query操作时会产生大量的doc.deleted文档。而elasticsearch又是如何处理这些doc.deleted文档的呢?

一.什么是elasticsearch的merge

1.数据在elasticsearch中如何进行存储

在elasticsearch中,客户端写入的每一条数据都会保存在索引的shard中,每一个shard都是一个lucene索引,每一个lucene索引都会被分解为多个segement。segement则是存储索引数据的最基本单元。

2.如何对索引进行merge

代码语言:javascript复制
POST /my-index-000001/_forcemerge
POST /<target>/_forcemerge
POST /_forcemerge

该操作可以强制对elasticsearch中的index,datastream进行数据合并。

3.merge操作的作用

当我们使用merge API对索引的分片中的segement发起强制合并,elasticsearch通过merge操作会将索引分片上的多个segement合并到一个segement中。并对已经标记为deleted状态的文档进行删除。并释放这些已经标记为删除状态文档所占用的磁盘空间。

一般来说elasticsearch自身会自动对索引进行merge。但是在update场景与delete_by_query场景下,自动merge的效果缓慢。往往需要较长时间,这些被标记为删除状态的文档才会被elasticsearch进行merge并释放磁盘空间。所以我们可以通过对索引进行多轮次手动merge来加快索引merge的进度。

4.merge操作的原理

forcemerge的原理是将多个小的索引段(index segment)合并为一个更大的段,以减少磁盘空间的使用和提高搜索性能。当索引被更新时,新的文档会被添加到新的段中,而旧的段则会被标记为删除。这样会导致索引中存在多个小的段,而每个段都会占用一定的磁盘空间和系统资源。forcemerge操作可以通过将这些小的段合并为一个或少量的更大段来优化索引。

代码语言:javascript复制
 public ForceMergeRequest(String... indices) {
        super(indices);
        forceMergeUUID = UUIDs.randomBase64UUID();
    }

    public ForceMergeRequest(StreamInput in) throws IOException {
        super(in);
        maxNumSegments = in.readInt();
        onlyExpungeDeletes = in.readBoolean();
        flush = in.readBoolean();
        if (in.getTransportVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
            forceMergeUUID = in.readString();
        } else {
            forceMergeUUID = in.readOptionalString();
            assert forceMergeUUID != null : "optional was just used as a BwC measure";
        }
    }

通过该段代码构造索引合并的请求,并判断需要合并的索引,如果没有指定具体提的索引则会对全部的索引进行forcemerge操作。

代码语言:javascript复制
 @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeInt(maxNumSegments);
        out.writeBoolean(onlyExpungeDeletes);
        out.writeBoolean(flush);
        if (out.getTransportVersion().onOrAfter(FORCE_MERGE_UUID_SIMPLE_VERSION)) {
            out.writeString(forceMergeUUID);
        } else {
            out.writeOptionalString(forceMergeUUID);
        }
    }

在合并时则会对文档的UUID进行判断。判断当前文档的UUID是否在FORCE_MERGE_UUID_SIMPLE_VERSION之后。如果是,则将forceMergeUUID(合并操作的UUID)以字符串的形式写入到输出流中。否则,将forceMergeUUID以可选字符串的形式写入到输出流中。

代码语言:javascript复制
public ActionRequestValidationException validate() {
        ActionRequestValidationException validationError = super.validate();
        if (onlyExpungeDeletes && maxNumSegments != Defaults.MAX_NUM_SEGMENTS) {
            validationError = addValidationError(
                "cannot set only_expunge_deletes and max_num_segments at the same time, those two "   "parameters are mutually exclusive",
                validationError
            );
        }
        return validationError;
    }

在进行forcemerge合并时,我们可以指定segments的数量,在构造请求后会调用该方法进行参数的验证。判断请求是否合法。

代码语言:javascript复制
public class ForceMergeAction extends ActionType<ForceMergeResponse> {

    public static final ForceMergeAction INSTANCE = new ForceMergeAction();
    public static final String NAME = "indices:admin/forcemerge";

    private ForceMergeAction() {
        super(NAME, ForceMergeResponse::new);
    }
}

ForceMergeAction通过继承ActionType<ForceMergeResponse>来实现ForceMerge操作。并通过ForceMergeResponse

获取到相应的返回信息。

代码语言:javascript复制
private static final ConstructingObjectParser<ForceMergeResponse, Void> PARSER = new ConstructingObjectParser<>(
        "force_merge",
        true,
        arg -> {
            BaseBroadcastResponse response = (BaseBroadcastResponse) arg[0];
            return new ForceMergeResponse(
                response.getTotalShards(),
                response.getSuccessfulShards(),
                response.getFailedShards(),
                Arrays.asList(response.getShardFailures())
            );
        }
    );

在ForceMergeResponse的这段代码中定义了一个ConstructingObjectParser解析器。用来解析我们发起Forcemerge操作后的各中响应信息。

通过该解析器我们可以拿到以下返回信息。

response.getTotalShards() 返回总分片数, response.getSuccessfulShards() 返回成功分片数, response.getFailedShards() 返回失败分片数, Arrays.asList(response.getShardFailures()) 返回所有失败分片的详细信息

关于merge操作的参数优化

index.merge.scheduler.max_thread_count:单个分片上可以同时合并的最大线程数。

默认值Math.max(1, Math.min(4, <<node.processors, node.processors>> / 2))

在固态硬盘中,基于良好的硬件读写性能,我们可以适当调整该参数。如果存储介质为机械硬盘,则建议适当减小该参数值。

代码语言:javascript复制
public static final Setting<Integer> MAX_THREAD_COUNT_SETTING = new Setting<>(
        "index.merge.scheduler.max_thread_count",
        (s) -> Integer.toString(Math.max(1, Math.min(4, EsExecutors.allocatedProcessors(s) / 2))),
        (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_thread_count"),
        Property.Dynamic,
        Property.IndexScope
    );
public static final Setting<Integer> MAX_MERGE_COUNT_SETTING = new Setting<>(
        "index.merge.scheduler.max_merge_count",
        (s) -> Integer.toString(MAX_THREAD_COUNT_SETTING.get(s)   5),
        (s) -> Setting.parseInt(s, 1, "index.merge.scheduler.max_merge_count"),
        Property.Dynamic,
        Property.IndexScope
    );

在上述代码中我们可以看到elasticsearch在对索引进行merge时,提供了两种不同的合并调度器策略。

1. index.merge.scheduler.max_thread_count :定义了索引合并调度器的最大线程数。它使用 EsExecutors.allocatedProcessors(s) 方法获取可用的处理器数量,并根据其值计算线程数。线程数的计算公式为可用处理器数量除以2,结果取1和4之间的较小值。这样可以确保最少有一个线程,并且最多不超过4个线程。

2. index.merge.scheduler.max_merge_count :该设置项定义了索引合并调度器的最大合并数量。它使用 MAX_THREAD_COUNT_SETTING.get(s) 方法获取最大线程数,并在此基础上加上5。这样可以确保最少有一个合并任务,并且相对于最大线程数有一定的缓冲。

同时,这两个属性均为动态属性,可以进行热更新,并且作用于索引维度,我们可以根据索引存储介质的不同而对索引进行合适的参数值配置。

关于merge操作的使用建议:

一般存在更新 查询的场景,会产生很多的deleted docs以及零碎的段文件,,可以定期对索引进行forcemerge。当我们对索引进行多轮次的forcemerge时 ,可能会引发高io ,同时也会提高CPU等集群资源的负载。但是可以通过参数大大降低索引中的doc.deleted数量和索引分片的segments 数量,对索引的写入查询性能都会有一定提升。

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞