elasticsearch数据更新与删除机制

2023-11-04 11:25:57 浏览数 (3)

前言:Elasticsearch是一个开源的分布式搜索和分析引擎提供了良好的数据插入能力并提供了灵活的数据更新方式。随之而来的便是大量更新操作引起的doc.deleted文档。同时很多用户在使用elasticsearch时由于种种原因需要对elasticsearch的索引数据进行删除。同样会产生大量的doc.deleted文档。

一.为什么elasticsearch进行update操作时会产生doc.deleted文档;

1.elasticsearch更新数据的方式。

bulk:批量插入更新方式。

代码语言:javascript复制
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

update:根据id,对指定数据进行精确更新。

代码语言:javascript复制
POST /<index>/_update/<_id>

update_by_query:根据指定的查询条件,对数据进行匹配更新。

代码语言:javascript复制
POST my-index-000001/_update_by_query?conflicts=proceed

2.elasticsearch更新数据的原理

当客户端发起更新操作时,elasticsearch首先会根据更新条件(例如:update api传入的_id,或update_by_query传入的match语句)找到相应的文档。elasticsearch使用文档的唯一标识符(_id)来定位文档。当找到要更新的文档后,elasticsearch首先会将原有的旧文档标记为删除状态。然后再将会将新文档插入到索引中。新文档具有相同的唯一标识符(_id),以此来实现文档的更新操作。

代码语言:javascript复制
  @Override
    protected void searchToString(StringBuilder b) {
        super.searchToString(b);
        if (script != null) {
            b.append(" updated with ").append(script);
        }
    }

此外,elasticsearch在客户端执行更新操作时还提供 脚本更新的方式。

发起update_by_query操作是会通过该类进行实现。

代码语言:javascript复制
public class UpdateByQueryAction extends ActionType<BulkByScrollResponse> {
    public static final UpdateByQueryAction INSTANCE = new UpdateByQueryAction();
    public static final String NAME = "indices:data/write/update/byquery";

    private UpdateByQueryAction() {
        super(NAME, BulkByScrollResponse::new);
    }
}

在该类中,定义了一个静态的 INSTANCE 对象,用于表示 UpdateByQueryAction 的单例实例。同时,定义了一个常量 NAME ,表示该动作的名称为"indices:data/write/update/byquery"。 在构造函数中,调用了父类的构造函数 super(NAME, BulkByScrollResponse::new) 。这里传入了动作的名称和一个回调函数,用于创建 BulkByScrollResponse 对象。

BulkByScrollResponse则为执行更新操作后返回的对象。包括执行耗时,执行状态等信息。

在BulkByScrollResponse中通过该构造方法批量滚动操作对象。

代码语言:javascript复制
 接收以下参数:
-  took :表示操作的耗时。 
-  status :表示批量滚动任务的状态,这是一个 BulkByScrollTask.Status 对象。 
-  bulkFailures :表示批量操作失败的列表,这是一个 List<Failure> 对象。 
-  searchFailures :表示搜索失败的列表,这是一个 List<ScrollableHitSource.SearchFailure> 对象。 
-  timedOut :表示操作是否超时。 

代码语言:java复制
 private TimeValue took;
 private BulkByScrollTask.Status status;
 private List<Failure> bulkFailures;
 private List<ScrollableHitSource.SearchFailure> searchFailures;
 private boolean timedOut;
 private static final String TOOK_FIELD = "took";
 private static final String TIMED_OUT_FIELD = "timed_out";
 private static final String FAILURES_FIELD = "failures";
 public BulkByScrollResponse(
        TimeValue took,
        BulkByScrollTask.Status status,
        List<Failure> bulkFailures,
        List<ScrollableHitSource.SearchFailure> searchFailures,
        boolean timedOut
    ) {
        this.took = took;
        this.status = requireNonNull(status, "Null status not supported");
        this.bulkFailures = bulkFailures;
        this.searchFailures = searchFailures;
        this.timedOut = timedOut;
    }
 public BulkByScrollResponse(Iterable<BulkByScrollResponse> toMerge, @Nullable String reasonCancelled) {
        long mergedTook = 0;
        List<BulkByScrollTask.StatusOrException> statuses = new ArrayList<>();
        bulkFailures = new ArrayList<>();
        searchFailures = new ArrayList<>();
        for (BulkByScrollResponse response : toMerge) {
            mergedTook = max(mergedTook, response.getTook().nanos());
            statuses.add(new BulkByScrollTask.StatusOrException(response.status));
            bulkFailures.addAll(response.getBulkFailures());
            searchFailures.addAll(response.getSearchFailures());
            timedOut |= response.isTimedOut();
        }
        took = timeValueNanos(mergedTook);
        status = new BulkByScrollTask.Status(statuses, reasonCancelled);
    }

具体的更新步骤则为:

  1. 首先,客户端发送一个更新请求到Elasticsearch节点。请求中包含要更新的文档的索引、类型和唯一标识符(_id),以及要更新的字段和新的值。
  2. 当Elasticsearch节点接收到更新请求后,它会将请求路由到包含要更新文档的分片(shard)所在的节点。
  3. 在分片级别,Elasticsearch首先从磁盘读取要更新的文档,并将其加载到内存中。
  4. 接下来,Elasticsearch会在内存中更新文档的字段值。这通常涉及到解析和处理更新请求中的字段和值,并将其应用到文档上。
  5. 更新后的文档会被写回到磁盘,以确保持久化存储。这通常涉及到将文档序列化为一种可存储的格式(如JSON或二进制格式),并将其写入磁盘上的相应数据文件中。
  6. 当更新操作完成后,elasticsearch会向客户端发送响应,通知更新操作的结果。

二.elasticsearch的数据删除

1.elasticsearch的数据删除方式

  • 删除索引

优点:能够立刻释放磁盘空间。

缺点:会删除整个索引的全部数据。无法满足只删除部分数据的需求。

  • delete_by_query

优点:操作灵活,能够根据传入的条件对指定的数据进行删除。

缺点:标记删除过程较久,磁盘空间释放较慢。在磁盘空间较为充裕时可以使用该方式进行数据删除操作。

2.delete_by_query删除数据的原理

当执行删除操作时,elasticsearch会根据我们传入的条件(例如:delete api传入的_id,或delete_by_query传入的match语句)来找到我们需要进行删除操作的文档。然后标记要删除的文档为已删除状态,在对文档完成标记后并不会立即从磁盘上删除它们。这是为了提高性能和避免数据丢失。标记为已删除的文档仍然存在于索引中,但在搜索和查询时会被过滤掉。 后续elasticsearch会自动对已经标记为删除的文档进行段合并。

代码语言:javascript复制
public class DeleteByQueryAction extends ActionType<BulkByScrollResponse> { 
    public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction(); 
    public static final String NAME = "indices:data/write/delete/byquery"; 
    private DeleteByQueryAction() { 
        super(NAME, BulkByScrollResponse::new); 
    } 
}

elasticsearch在进行删除动作时,也是通过定义了一个静态的 `INSTANCE` 对象,用于表示"DeleteByQueryAction"的单例实例。同时,定义了一个常量 `NAME` ,表示该动作的名称为"indices:data/write/delete/byquery"。

在构造函数中,调用了父类的构造函数 `super(NAME, BulkByScrollResponse::new)` 。这里传入了动作的名称和一个回调函数,用于创建"BulkByScrollResponse"对象。

关于BulkByScrollResponse返回的大致信息则与前面赘述的基本一致。

值的注意的是update与delete操作都会产生大量的doc.deleted。我们会发现一个有趣的现象。

在大量执行update操作时,我们elasticsearch集群的磁盘使用率会出现一定程度的膨胀,在一定时间之后磁盘使用率才会出现下降并与对数据进行update操作前的磁盘使用率趋于一致。

同样的,很多时候我们在通过delete_by_query 删除数据时,观察集群的磁盘使用率,发现磁盘使用率并不会立刻出现下降,而是极为缓慢的逐渐下降趋势。

这是因为在elasticsearch中当文档被标记为删除状态后,elasticsearch会有一个merge操作(也称为段合并)。

关于更新操作后,产生的doc.deleted文档elasticsearch会如何进行merge,以及merge过程中可能会出现哪些问题则会在后续文章进行讨论。

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞