Elasticsearch 7.0.0~7.6.2 bug: slow refresh after a large number of document updates


1 Background

During routine operations, a user reported that Elasticsearch could not ingest data: writes were being rejected, the write queue was backing up, and the cluster health was red. Based on these symptoms, we checked the following (a sketch of the API calls used for these checks is given at the end of this section):

  • Tasks: as shown in Figure 1-1, there were a large number of refresh and write tasks, and they had been running for a very long time.
Figure 1-1: Tasks running in ES
  • Translog: the cluster health was red because shards of one index were stuck in the initializing state. From past experience, when a shard stays in the initializing state without hitting any shard allocation limit, slow recovery is usually dominated by translog replay. Checking the translog files confirmed this: they were extremely large, as shown in Figure 1-2.
Figure 1-2: Translog files of one shard of an index
  • Flush: under normal circumstances a single translog file never grows this large, so we checked flush activity (flush triggers translog rollover) and found that flush was stuck and could not complete; the flush latency is shown in Figure 1-3.
Figure 1-3: Flush latency on an ES node
  • Thread dumps: since refresh, write, and flush tasks were all abnormally slow, we used jstack to capture thread dumps and see where the threads were stuck. The key stack traces are shown below:
"elasticsearch[node-10][flush][T#5]" #193 daemon prio=5 os_prio=0 cpu=247.85ms elapsed=9924.36s tid=0x00007f0bd4002800 nid=0x4e9 waiting for monitor entry  [0x00007f0a53af9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.lucene.index.IndexWriter.prepareCommitInternal(IndexWriter.java:3197)
        - waiting to lock <0x00000004b91838d8> (a java.lang.Object)
        - locked <0x00000004b91838c0> (a java.lang.Object)
        at org.apache.lucene.index.IndexWriter.commitInternal(IndexWriter.java:3445)
        - locked <0x00000004b91838c0> (a java.lang.Object)
        at org.apache.lucene.index.IndexWriter.commit(IndexWriter.java:3410)
        at org.elasticsearch.index.engine.InternalEngine.commitIndexWriter(InternalEngine.java:2446)
        at org.elasticsearch.index.engine.InternalEngine.flush(InternalEngine.java:1780)
        at org.elasticsearch.index.shard.IndexShard.flush(IndexShard.java:1092)
        at org.elasticsearch.index.shard.IndexShard$6.doRun(IndexShard.java:3138)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)

"elasticsearch[node-10][refresh][T#3]" #236 daemon prio=5 os_prio=0 cpu=2113167.75ms elapsed=9552.19s tid=0x00007f0bd430c000 nid=0x729 runnable  [0x00007f0b413d4000]
   java.lang.Thread.State: RUNNABLE
        at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:564)
        at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:451)
        at org.apache.lucene.index.FrozenBufferedUpdates.apply(FrozenBufferedUpdates.java:421)
        at org.apache.lucene.index.FrozenBufferedUpdates.forceApply(FrozenBufferedUpdates.java:249)
        at org.apache.lucene.index.FrozenBufferedUpdates.tryApply(FrozenBufferedUpdates.java:159)
        at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$3(IndexWriter.java:2592)
        at org.apache.lucene.index.IndexWriter$$Lambda$4640/0x0000000801952440.process(Unknown Source)
        at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:5116)
        at org.apache.lucene.index.IndexWriter.getReader(IndexWriter.java:507)
        - locked <0x00000004b91838d8> (a java.lang.Object)
        at org.apache.lucene.index.StandardDirectoryReader.doOpenFromWriter(StandardDirectoryReader.java:297)
        at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:272)
        at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:262)
        at org.apache.lucene.index.FilterDirectoryReader.doOpenIfChanged(FilterDirectoryReader.java:112)
        at org.apache.lucene.index.DirectoryReader.openIfChanged(DirectoryReader.java:165)
        at org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:66)
        at org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:40)
        at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
        at org.apache.lucene.search.ReferenceManager.maybeRefreshBlocking(ReferenceManager.java:253)
        at org.elasticsearch.index.engine.InternalEngine$ExternalReaderManager.refreshIfNeeded(InternalEngine.java:339)
        at org.elasticsearch.index.engine.InternalEngine$ExternalReaderManager.refreshIfNeeded(InternalEngine.java:321)
        at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
        at org.apache.lucene.search.ReferenceManager.maybeRefresh(ReferenceManager.java:225)
        at org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:1606)
        at org.elasticsearch.index.engine.InternalEngine.maybeRefresh(InternalEngine.java:1585)
        at org.elasticsearch.index.shard.IndexShard.scheduledRefresh(IndexShard.java:3241)
        at org.elasticsearch.index.IndexService.maybeRefreshEngine(IndexService.java:791)
        at org.elasticsearch.index.IndexService.access$200(IndexService.java:104)
        at org.elasticsearch.index.IndexService$AsyncRefreshTask.runInternal(IndexService.java:925)
        at org.elasticsearch.common.util.concurrent.AbstractAsyncTask.run(AbstractAsyncTask.java:144)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)

The stack traces show that refresh is stuck in FrozenBufferedUpdates.applyDocValuesUpdates while holding the lock 0x00000004b91838d8, and flush is blocked waiting for that same lock.

  • CPU flame graph: the flame graph also shows that org.apache.lucene.index.FrozenBufferedUpdates#applyDocValuesUpdates dominates, accounting for 98% of CPU time; see Figure 1-4.
Figure 1-4: Flame graph of an ES node
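
For reference, here is a minimal sketch of the kind of API calls behind the checks above; these are generic APIs, not commands copied from the incident cluster, so adjust thread pool names and parameters as needed.

// long-running tasks and per-node thread-pool backlog
GET _cat/tasks?v&detailed
GET _cat/thread_pool/write,refresh,flush?v&h=node_name,name,active,queue,rejected

// hot threads on the nodes (a lighter-weight alternative to jstack)
GET _nodes/hot_threads?threads=5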

2 Root cause

Q1: Why is refresh slow, or even hung?

A1: ES introduced the soft_deletes feature in 6.5.0, and starting with 7.0.0 it is enabled by default. However, it has a bug (Elasticsearch issue #52146, Lucene issue LUCENE-9228): if an index receives a large number of updates, refresh becomes extremely slow or even appears to hang. The bug was fixed in 7.7.0. See references 【1】【2】【4】 for details.
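
To check whether an index is affected, the effective value of the setting can be read back through the index settings API; a minimal sketch, with test as a placeholder index name:

GET test/_settings/index.soft_deletes.enabled?include_defaults=true&flat_settings=true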

Q2: Why did a single translog file grow beyond 13 GB?

A2: Because refresh was slow and never released the lock, flush was blocked waiting for it and could not run. The translog was therefore never handled properly (flush is what triggers translog rollover), so the translog files kept growing.
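
Translog growth and recovery progress can be watched with the stats and cat APIs; a minimal sketch, again with test as a placeholder index name:

// per-shard translog size and operation counts
GET test/_stats/translog?level=shards

// progress of ongoing shard recoveries, including translog replay (translog_ops* columns)
GET _cat/recovery/test?v&active_only=true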

Q3: Why did writes time out, the write queue back up, and writes get rejected?

A3: Indexing a document also ends up calling FrozenBufferedUpdates.applyDocValuesUpdates, which consumes a large amount of time, so writes slow down as well; see the stack trace below.

"elasticsearch[node-3][write][T#5]" #232 daemon prio=5 os_prio=0 cpu=5315909.50ms elapsed=31254.63s tid=0x00007f99d8104800 nid=0x6f4d runnable  [0x00007f992dfdc000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:568)
	at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:451)
	at org.apache.lucene.index.FrozenBufferedUpdates.apply(FrozenBufferedUpdates.java:421)
	at org.apache.lucene.index.FrozenBufferedUpdates.forceApply(FrozenBufferedUpdates.java:249)
	at org.apache.lucene.index.FrozenBufferedUpdates.tryApply(FrozenBufferedUpdates.java:159)
	at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$3(IndexWriter.java:2592)
	at org.apache.lucene.index.IndexWriter$$Lambda$4484/0x0000000801927040.process(Unknown Source)
	at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:5116)
	at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1597)
	at org.apache.lucene.index.IndexWriter.softUpdateDocument(IndexWriter.java:1654)
	at org.elasticsearch.index.engine.InternalEngine.updateDocs(InternalEngine.java:1254)
	at org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1085)
	at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:929)
	at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:811)
	at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:783)
	at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:740)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:258)
	at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:161)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:193)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:118)
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:79)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:917)
	at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:108)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:394)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:316)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$$Lambda$4128/0x0000000801886c40.accept(Unknown Source)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.index.shard.IndexShard.lambda$wrapPrimaryOperationPermitListener$22(IndexShard.java:2796)
	at org.elasticsearch.index.shard.IndexShard$$Lambda$4111/0x000000080187d840.accept(Unknown Source)
	at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:113)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:285)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:237)
	at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2770)
	at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:858)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:312)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.action.support.replication.TransportReplicationAction.handlePrimaryRequest(TransportReplicationAction.java:275)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$$Lambda$2741/0x00000008014e2c40.messageReceived(Unknown Source)
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:63)
	at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:752)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)

3 Temporary workarounds

The permanent fix is to upgrade ES to 7.7.0 or later.

3.1 Handling dirty metadata

If some of your nodes end up with dirty metadata, i.e. a replica shard on a node believes it should be the primary because it holds the newest data, while the primary of that shard has already been allocated on another node, the symptom is that the node repeatedly joins and leaves the cluster. To handle this: stop the affected nodes (their stale shard data prevents them from applying the cluster metadata), set the number of replicas of the affected indices to 0, and then restart those nodes.
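
A minimal sketch of dropping the replicas of an affected index (test is a placeholder name); the replica count can be restored once the cluster is stable again:

PUT test/_settings
{
  "index.number_of_replicas": 0
}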

3.2 Translog cleanup

When recovery is stuck because the translog is too large and the shard stays in the initializing state, the only way forward is to truncate the translog.

  • Truncate the translog by pointing the tool at the translog directory:
bin/elasticsearch-shard remove-corrupted-data   --dir /es_home/storage/data/nodes/0/indices/UmZrtruITFyNyTngUlpvng/22/translog/ --truncate-clean-translog
  • Or truncate it by specifying the index name and shard id:
bin/elasticsearch-shard remove-corrupted-data --index test --shard-id 22 --truncate-clean-translog

The node must be stopped before the translog is truncated. To keep its shards from being reallocated to other nodes while it is down, apply the following settings first:

// Rebalancing and allocation are restricted so that shard movement and dirty data do not interfere with cluster recovery
PUT _cluster/settings
{
  "transient": {
    // disallow rebalancing
    "cluster.routing.rebalance.enable": "none",
    // only allow primaries of newly created indices to be allocated
    "cluster.routing.allocation.enable": "new_primaries"
  }
}

After the translog has been truncated, restart the node and then run the reroute command:

POST /_cluster/reroute
{
  "commands" : [
    {
      "allocate_stale_primary" : {
        "index" : "test",
        "shard" : 22,
        "node" : "wW6OXqAqT1e6w7w8kc7222",
        // must be changed to true
        "accept_data_loss" : true
      }
    }
  ]
}
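
Once recovery has finished, the temporary routing restrictions should be reverted; setting a transient value to null resets it to its default. A minimal sketch:

PUT _cluster/settings
{
  "transient": {
    "cluster.routing.rebalance.enable": null,
    "cluster.routing.allocation.enable": null
  }
}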

3.3 Workarounds for slow refresh

  • Reduce the index refresh frequency: index.refresh_interval was set to 1s (see the example after this list).
  • Disable index.soft_deletes.enabled: its default value is true; set it to false. See reference 【2】 for an explanation of soft_deletes.
  • Throttle writes: to keep refresh from getting stuck because too many documents are being updated concurrently, the user reduced the write rate.
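
index.refresh_interval is a dynamic setting, so the first item can be applied in place; a minimal sketch with test as a placeholder index name:

PUT test/_settings
{
  "index.refresh_interval": "1s"
}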

Because index.soft_deletes.enabled can only be set when an index is created, the index had to be reindexed. After the reindex completed, the original index was backed up with a snapshot and then deleted, and the original index name was added as an alias of the new index (so that the user did not have to change any code).
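
A minimal sketch of creating the destination index with soft_deletes disabled before reindexing; mappings and shard settings are omitted here and would be copied from the original index:

PUT test_v2
{
  "settings": {
    "index.soft_deletes.enabled": false
  }
}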

Reindexing 570 GB (about 3.6 billion documents) took roughly 5 hours; the reindex was sped up with slices.

POST _reindex?wait_for_completion=false&scroll=10m&slices=20
{
  "source": {
    "index": "test",
    "size": 9000
  },
  "dest": {
    "index": "test_v2"
  }
}

⚠️ Note: to prevent anyone from accidentally writing to the original index during the reindex, set it to read-only: PUT test/_settings { "index.blocks.read_only": true }
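
After the reindex and the snapshot backup, the original index is deleted and its name is attached to the new index as an alias; a minimal sketch (run the DELETE only after the snapshot has completed):

DELETE test

POST _aliases
{
  "actions": [
    { "add": { "index": "test_v2", "alias": "test" } }
  ]
}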

4 Code analysis

4.1 Call chain

As shown in Figure 4-1, refresh and flush (the light-red blocks) compete for locks, namely fullFlushLock and the IndexWriter lock.

Figure 4-1: Call chain of refresh and flush

The yellow block in Figure 4-1 is the code path that becomes extremely slow, or even appears to hang, after a large number of updates.

Let's look at the logic of the code that gets stuck, org.apache.lucene.index.FrozenBufferedUpdates#applyDocValuesUpdates:

  private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
                                            Map<String, FieldUpdatesBuffer> updates,
                                            long delGen,
                                            boolean segmentPrivateDeletes) throws IOException {

    // TODO: we can process the updates per DV field, from last to first so that
    // if multiple terms affect same document for the same field, we add an update
    // only once (that of the last term). To do that, we can keep a bitset which
    // marks which documents have already been updated. So e.g. if term T1
    // updates doc 7, and then we process term T2 and it updates doc 7 as well,
    // we don't apply the update since we know T1 came last and therefore wins
    // the update.
    // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
    // that these documents aren't even returned.
    long updateCount = 0;
    // We first write all our updates private, and only in the end publish to the ReadersAndUpdates */
    final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
    // in ES this map contains only one key: __soft_deletes
    for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
      String updateField = fieldUpdate.getKey();
      DocValuesFieldUpdates dvUpdates = null;
      FieldUpdatesBuffer value = fieldUpdate.getValue();
      boolean isNumeric = value.isNumeric();
      FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
      FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
      TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
      // The core problem: this loop does not deduplicate terms; since 7.7.0, the same term is applied only once
      while ((bufferedUpdate = iterator.next()) != null) {
        // TODO: we traverse the terms in update order (not term order) so that we
        // apply the updates in the correct order, i.e. if two terms update the
        // same document, the last one that came in wins, irrespective of the
        // terms lexical order.
        // we can apply the updates in terms order if we keep an updatesGen (and
        // increment it with every update) and attach it to each NumericUpdate. Note
        // that we cannot rely only on docIDUpto because an app may send two updates
        // which will get same docIDUpto, yet will still need to respect the order
        // those updates arrived.
        // TODO: we could at least *collate* by field?
        final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
        if (docIdSetIterator != null) {
          final int limit;
          if (delGen == segState.delGen) {
            assert segmentPrivateDeletes;
            limit = bufferedUpdate.docUpTo;
          } else {
            limit = Integer.MAX_VALUE;
          }
          final BytesRef binaryValue;
          final long longValue;
          if (bufferedUpdate.hasValue == false) {
            longValue = -1;
            binaryValue = null;
          } else {
            longValue = bufferedUpdate.numericValue;
            binaryValue = bufferedUpdate.binaryValue;
          }
           termDocsIterator.getDocs();
          if (dvUpdates == null) {
            if (isNumeric) {
              if (value.hasSingleValue()) {
                dvUpdates = new NumericDocValuesFieldUpdates
                    .SingleValueNumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc(),
                    value.getNumericValue(0));
              } else {
                dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, value.getMinNumeric(),
                    value.getMaxNumeric(), segState.reader.maxDoc());
              }
            } else {
              dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
            }
            resolvedUpdates.add(dvUpdates);
          }
          final IntConsumer docIdConsumer;
          final DocValuesFieldUpdates update = dvUpdates;
          if (bufferedUpdate.hasValue == false) {
            docIdConsumer = doc -> update.reset(doc);
          } else if (isNumeric) {
            docIdConsumer = doc -> update.add(doc, longValue);
          } else {
            docIdConsumer = doc -> update.add(doc, binaryValue);
          }
          final Bits acceptDocs = segState.rld.getLiveDocs();
          if (segState.rld.sortMap != null && segmentPrivateDeletes) {
            // This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
            int doc;
            while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
              if (acceptDocs == null || acceptDocs.get(doc)) {
                // The limit is in the pre-sorted doc space:
                if (segState.rld.sortMap.newToOld(doc) < limit) {
                  docIdConsumer.accept(doc);
                  updateCount++;
                }
              }
            }
          } else {
            int doc;
            while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
              if (doc >= limit) {
                break; // no more docs that can be updated for this term
              }
              if (acceptDocs == null || acceptDocs.get(doc)) {
                docIdConsumer.accept(doc);
                updateCount++;
              }
            }
          }
        }
      }
    }

    // now freeze & publish:
    for (DocValuesFieldUpdates update : resolvedUpdates) {
      if (update.any()) {
        update.finish();
        segState.rld.addDVUpdate(update);
      }
    }

    return updateCount;
  }

Debugging applyDocValuesUpdates with breakpoints and comparing it against the code after ES 7.7.0 shows that the problem lies in the line bufferedUpdate = iterator.next(): the terms are not deduplicated before being applied. If a document is soft-updated 100,000 times, this leads to roughly 100,000 × 100,000 iterations 【1】.

The debug test code is shown below:

package com.dirk.soft.delete;

import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Date;

/**
 * Created by
 *
 * @Author : yang
 * @create 2022/6/5 09:49
 */
public class SoftDeletesTest1 {
    private final static String indexPath = "/Users/yang/workspace/github/lucene-learn/docpath";
    private Directory dir = null;
    {
        try {
            dir = FSDirectory.open(Paths.get(indexPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // directory where the index is stored
    // declared as a field so the variable is easy to inspect while debugging
    private IndexWriter indexWriter;
    IndexWriterConfig indexWriterConfig;
    public void doIndexAndSearch() throws Exception {
        indexWriterConfig = new IndexWriterConfig(new WhitespaceAnalyzer());
        indexWriterConfig.setSoftDeletesField("__soft_deletes");
        indexWriterConfig.setUseCompoundFile(true);
        indexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
        indexWriter = new IndexWriter(dir, indexWriterConfig);
        Document doc;
        // document 0
        doc = new Document();
        doc.add(new StringField("_id", "0", Field.Store.YES));
        doc.add(new NumericDocValuesField("docValuesFiled", 2));
        doc.add(new StringField("_version", "0", Field.Store.YES));
        indexWriter.addDocument(doc);
        // document 1
        doc = new Document();
        doc.add(new StringField("_id", "1", Field.Store.YES));
        doc.add(new NumericDocValuesField("docValuesFiled", 3));
        doc.add(new StringField("_version", "1", Field.Store.YES));
        indexWriter.addDocument(doc);
        // first soft update (soft delete)
        Document newDoc = new Document();
        newDoc.add(new StringField("_id", "1", Field.Store.YES));
        newDoc.add(new StringField("_version", "2", Field.Store.YES));
        indexWriter.softUpdateDocument(new Term("_id", "1"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
        // second soft update (soft delete)
        newDoc = new Document();
        newDoc.add(new StringField("_id", "1", Field.Store.YES));
        newDoc.add(new StringField("_version", "3", Field.Store.YES));
        indexWriter.softUpdateDocument(new Term("_id", "1"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
        // third soft update (soft delete)
        newDoc = new Document();
        newDoc.add(new StringField("_id", "1", Field.Store.YES));
        newDoc.add(new StringField("_version", "4", Field.Store.YES));
        indexWriter.softUpdateDocument(new Term("_id", "2"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
        indexWriter.commit();
        DirectoryReader readerBeforeMerge = DirectoryReader.open(indexWriter);
        ScoreDoc[] scoreDocs = (new IndexSearcher(readerBeforeMerge)).search(new MatchAllDocsQuery(), 100).scoreDocs;
        for (ScoreDoc scoreDoc : scoreDocs) {
            System.out.println("time:" + new Date() + ";  docId: 文档" + scoreDoc.doc +
                    ", FieldValue of Field abc: " + readerBeforeMerge.document(scoreDoc.doc).get("_id") +
                    ",_version:" + readerBeforeMerge.document(scoreDoc.doc).get("_version"));
        }
    }

    public static void main(String[] args) throws Exception{
        SoftDeletesTest1 test = new SoftDeletesTest1();
        test.doIndexAndSearch();
    }
}

Execution output:

time:Sun Jun 05 17:22:15 CST 2022;  docId: 文档0, FieldValue of Field abc: 0,_version:0
time:Sun Jun 05 17:22:15 CST 2022;  docId: 文档3, FieldValue of Field abc: 1,_version:3
time:Sun Jun 05 17:22:15 CST 2022;  docId: 文档4, FieldValue of Field abc: 1,_version:4

The debug session of applyDocValuesUpdates is shown in Figures 4-2 and 4-3.

Figure 4-2: Debugging applyDocValuesUpdates (1)
Figure 4-3: Debugging applyDocValuesUpdates (2)

5 Local reproduction

5.1 Test code

org.elasticsearch.index.engine.EngineTestCase#testRefresh

public void testRefresh() {
    long startTime = System.currentTimeMillis();
    int i = 0;
    // update the document with id "1" 100,000 times
    while (i++ < 100000) {
        try {
            engine.index(indexForDoc(createParsedDoc("1", null)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    logger.info("updating took {} ms", (System.currentTimeMillis() - startTime));
    startTime = System.currentTimeMillis();
    engine.refresh("test", randomFrom(Engine.SearcherScope.values()), randomBoolean());
    logger.info("refresh took {} ms", (System.currentTimeMillis() - startTime));
}

5.2 Test results

We ran the test against ES 7.5.2 and ES 7.10.2:

  • 100,000 updates, refresh time: on 7.5.2, 7 out of 10 runs finished within 1 s and the other 3 took around 30 s; on 7.10.2, all 10 runs finished within 1 s.
  • 500,000 updates, refresh time: on 7.5.2, the test suite timed out (Suite timeout exceeded, >= 1200000 msec); on 7.10.2, all 10 runs finished within 3 s.

5.3 Open questions

Why do the 7.5.2 results for 100,000 and 500,000 updates vary so much between runs? According to the logic above, refresh should consistently take a long time.

6 References

【1】 A summary of several problems encountered recently

【2】 What exactly are soft-deletes in Elasticsearch 7.4

【3】 Truncating the translog

【4】 Soft-deletes can cause the write queue to back up continuously

【5】 Elasticsearch translog recovery stuck at a certain percentage, leaving the index unrecoverable

【6】 Soft deletes in Lucene
