1 Background
During routine operations, users reported that Elasticsearch could not ingest data: writes were being rejected, the write queue was backing up, and the cluster health was red. Based on these symptoms, we examined the following:
- Tasks: as shown in Figure 1-1, the task list contained a large number of refresh and write tasks that had been running for a very long time.
- Translog: the cluster was red because shards of one index were stuck in the initializing state. In our experience, when a shard stays in the initializing state without hitting any shard allocation limit, most of the slow recovery time is spent in translog replay. Checking the translog files confirmed this: they were extremely large, as shown in Figure 1-2.
- Flush: under normal conditions a single translog file never grows this large, so we checked flush activity (flush triggers translog rollover) and found that flush was stuck and could not complete; flush latency monitoring is shown in Figure 1-3.
- Thread dumps: since refresh, write, and flush tasks were all taking abnormally long, we captured thread dumps with jstack to see where the threads were stuck. The key stack traces are as follows:
"elasticsearch[node-10][flush][T#5]" #193 daemon prio=5 os_prio=0 cpu=247.85ms elapsed=9924.36s tid=0x00007f0bd4002800 nid=0x4e9 waiting for monitor entry [0x00007f0a53af9000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.lucene.index.IndexWriter.prepareCommitInternal(IndexWriter.java:3197)
- waiting to lock <0x00000004b91838d8> (a java.lang.Object)
- locked <0x00000004b91838c0> (a java.lang.Object)
at org.apache.lucene.index.IndexWriter.commitInternal(IndexWriter.java:3445)
- locked <0x00000004b91838c0> (a java.lang.Object)
at org.apache.lucene.index.IndexWriter.commit(IndexWriter.java:3410)
at org.elasticsearch.index.engine.InternalEngine.commitIndexWriter(InternalEngine.java:2446)
at org.elasticsearch.index.engine.InternalEngine.flush(InternalEngine.java:1780)
at org.elasticsearch.index.shard.IndexShard.flush(IndexShard.java:1092)
at org.elasticsearch.index.shard.IndexShard$6.doRun(IndexShard.java:3138)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)
"elasticsearch[node-10][refresh][T#3]" #236 daemon prio=5 os_prio=0 cpu=2113167.75ms elapsed=9552.19s tid=0x00007f0bd430c000 nid=0x729 runnable [0x00007f0b413d4000]
java.lang.Thread.State: RUNNABLE
at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:564)
at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:451)
at org.apache.lucene.index.FrozenBufferedUpdates.apply(FrozenBufferedUpdates.java:421)
at org.apache.lucene.index.FrozenBufferedUpdates.forceApply(FrozenBufferedUpdates.java:249)
at org.apache.lucene.index.FrozenBufferedUpdates.tryApply(FrozenBufferedUpdates.java:159)
at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$3(IndexWriter.java:2592)
at org.apache.lucene.index.IndexWriter$$Lambda$4640/0x0000000801952440.process(Unknown Source)
at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:5116)
at org.apache.lucene.index.IndexWriter.getReader(IndexWriter.java:507)
- locked <0x00000004b91838d8> (a java.lang.Object)
at org.apache.lucene.index.StandardDirectoryReader.doOpenFromWriter(StandardDirectoryReader.java:297)
at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:272)
at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:262)
at org.apache.lucene.index.FilterDirectoryReader.doOpenIfChanged(FilterDirectoryReader.java:112)
at org.apache.lucene.index.DirectoryReader.openIfChanged(DirectoryReader.java:165)
at org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:66)
at org.elasticsearch.index.engine.ElasticsearchReaderManager.refreshIfNeeded(ElasticsearchReaderManager.java:40)
at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
at org.apache.lucene.search.ReferenceManager.maybeRefreshBlocking(ReferenceManager.java:253)
at org.elasticsearch.index.engine.InternalEngine$ExternalReaderManager.refreshIfNeeded(InternalEngine.java:339)
at org.elasticsearch.index.engine.InternalEngine$ExternalReaderManager.refreshIfNeeded(InternalEngine.java:321)
at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
at org.apache.lucene.search.ReferenceManager.maybeRefresh(ReferenceManager.java:225)
at org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:1606)
at org.elasticsearch.index.engine.InternalEngine.maybeRefresh(InternalEngine.java:1585)
at org.elasticsearch.index.shard.IndexShard.scheduledRefresh(IndexShard.java:3241)
at org.elasticsearch.index.IndexService.maybeRefreshEngine(IndexService.java:791)
at org.elasticsearch.index.IndexService.access$200(IndexService.java:104)
at org.elasticsearch.index.IndexService$AsyncRefreshTask.runInternal(IndexService.java:925)
at org.elasticsearch.common.util.concurrent.AbstractAsyncTask.run(AbstractAsyncTask.java:144)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)
The stack traces show that refresh is stuck in FrozenBufferedUpdates.applyDocValuesUpdates without releasing lock 0x00000004b91838d8, while flush is blocked waiting for that same lock.
- CPU flame graph: the flame graph likewise shows that org.apache.lucene.index.FrozenBufferedUpdates#applyDocValuesUpdates is the most expensive path, accounting for 98% of CPU execution time, as shown in Figure 1-4.
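For reference, the same symptoms (write rejections and queue depth, long-running tasks, hot threads) can also be pulled directly from the cluster APIs; a minimal sketch, with the thread pool names and columns chosen for illustration:
GET _cat/thread_pool/write,refresh,flush?v&h=node_name,name,active,queue,rejected
GET _cat/tasks?v&detailed
GET _nodes/hot_threads?threads=5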
2 Root cause
Q1: Why was refresh slow or even hung?
A1: Elasticsearch introduced the soft deletes feature in 6.5.0, and since 7.0.0 index.soft_deletes.enabled defaults to true. However, it had a bug (ES #52146, LUCENE-9228): if an index receives a large number of updates, refresh becomes extremely slow or even hangs. The bug was fixed in 7.7. See 【1】【2】【4】 in the references section for details.
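To confirm whether a given index actually has soft deletes enabled, the setting can be inspected (the index name test is illustrative); when it has not been set explicitly it appears among the defaults:
GET test/_settings?include_defaults=true&flat_settings=true
// look for "index.soft_deletes.enabled" : "true" in the response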
Q2: Why did a single translog file grow beyond 13 GB?
A2: The slow refresh never released its lock, so flush blocked waiting for it and could not run. As a result the translog was never handled properly (flush is what triggers translog rollover), and the translog files kept growing.
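For reference, a sketch of how translog size and flush stats can be checked per shard, and how translog replay progress shows up during recovery (the index name test is illustrative):
GET test/_stats/translog,flush?level=shards
GET _cat/recovery?v&h=index,shard,stage,translog_ops_percent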
Q3: Why did writes time out, the write queue back up, and writes get rejected?
A3: Indexing a document also ends up executing FrozenBufferedUpdates.applyDocValuesUpdates, which consumes a huge amount of time and slows writes down, as the following stack trace shows:
"elasticsearch[node-3][write][T#5]" #232 daemon prio=5 os_prio=0 cpu=5315909.50ms elapsed=31254.63s tid=0x00007f99d8104800 nid=0x6f4d runnable [0x00007f992dfdc000]
java.lang.Thread.State: RUNNABLE
at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:568)
at org.apache.lucene.index.FrozenBufferedUpdates.applyDocValuesUpdates(FrozenBufferedUpdates.java:451)
at org.apache.lucene.index.FrozenBufferedUpdates.apply(FrozenBufferedUpdates.java:421)
at org.apache.lucene.index.FrozenBufferedUpdates.forceApply(FrozenBufferedUpdates.java:249)
at org.apache.lucene.index.FrozenBufferedUpdates.tryApply(FrozenBufferedUpdates.java:159)
at org.apache.lucene.index.IndexWriter.lambda$publishFrozenUpdates$3(IndexWriter.java:2592)
at org.apache.lucene.index.IndexWriter$$Lambda$4484/0x0000000801927040.process(Unknown Source)
at org.apache.lucene.index.IndexWriter.processEvents(IndexWriter.java:5116)
at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1597)
at org.apache.lucene.index.IndexWriter.softUpdateDocument(IndexWriter.java:1654)
at org.elasticsearch.index.engine.InternalEngine.updateDocs(InternalEngine.java:1254)
at org.elasticsearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1085)
at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:929)
at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:811)
at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:783)
at org.elasticsearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:740)
at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:258)
at org.elasticsearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:161)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:193)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:118)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:79)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:917)
at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:108)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:394)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:316)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$$Lambda$4128/0x0000000801886c40.accept(Unknown Source)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
at org.elasticsearch.index.shard.IndexShard.lambda$wrapPrimaryOperationPermitListener$22(IndexShard.java:2796)
at org.elasticsearch.index.shard.IndexShard$$Lambda$4111/0x000000080187d840.accept(Unknown Source)
at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:113)
at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:285)
at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:237)
at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2770)
at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:858)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:312)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.support.replication.TransportReplicationAction.handlePrimaryRequest(TransportReplicationAction.java:275)
at org.elasticsearch.action.support.replication.TransportReplicationAction$$Lambda$2741/0x00000008014e2c40.messageReceived(Unknown Source)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:63)
at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:752)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13.0.1/ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13.0.1/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@13.0.1/Thread.java:830)
3 Temporary workarounds
The long-term fix is to upgrade Elasticsearch to 7.7.0 or later.
3.1 Handling dirty metadata
If some nodes in the cluster hold dirty metadata, i.e. a replica shard on a node still believes it is the primary because it holds the newest data (while the primary of that shard has already been allocated on another node), the symptom is nodes repeatedly joining and leaving the cluster. Workaround: stop the affected nodes (their dirty shard data prevents them from applying the cluster metadata), set the replica count of the affected indices to 0, and then restart those nodes.
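Setting the replica count of an affected index to 0 is an ordinary settings update; a minimal sketch (the index name problem_index is a placeholder):
PUT problem_index/_settings
{
  "index.number_of_replicas": 0
}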
3.2 Translog cleanup
When recovery is stuck because the translog is too large and the shard stays in the initializing state, the only way to recover is to truncate the translog:
- Truncate the translog by pointing at its directory:
bin/elasticsearch-shard remove-corrupted-data --dir /es_home/storage/data/nodes/0/indices/UmZrtruITFyNyTngUlpvng/22/translog/ --truncate-clean-translog
- Truncate it by index name and shard id:
bin/elasticsearch-shard remove-corrupted-data --index test --shard-id 22 --truncate-clean-translog
The node must be stopped before its translog can be truncated. While the node is down, apply the following settings so that its shards are not reallocated to other nodes:
// Rebalancing and allocation are restricted so that rebalancing and dirty data do not interfere with cluster recovery
PUT _cluster/settings
{
"transient": {
//disallow rebalancing
"cluster.routing.rebalance.enable": "none",
//only allow primaries of newly created indices to be allocated
"cluster.routing.allocation.enable": "new_primaries"
}
}
After the cleanup, restart the nodes and run the reroute command:
POST /_cluster/reroute
{
"commands" : [
{
"allocate_stale_primary" : {
"index" : "test",
"shard" : 22,
"node" : "wW6OXqAqT1e6w7w8kc7222",
//must be set to true to acknowledge possible data loss
"accept_data_loss" : true
}
}
]
}
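Once the stale primary has been allocated and the shards have recovered, the transient restrictions applied earlier would presumably be reverted; a sketch:
PUT _cluster/settings
{
  "transient": {
    "cluster.routing.rebalance.enable": null,
    "cluster.routing.allocation.enable": null
  }
}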
3.3 Workarounds for slow refresh
- Reduce the index refresh frequency: adjust index.refresh_interval (we set it to 1s).
- Disable soft deletes: index.soft_deletes.enabled defaults to true; set it to false. See 【2】 in the references for an explanation of soft deletes.
- Throttle write concurrency: to avoid refresh getting stuck because too many documents are being updated concurrently, ask users to reduce their write rate appropriately.
Because index.soft_deletes.enabled can only be set when an index is created, the only option is to reindex into a new index (see the sketch below), snapshot the original index as a backup, delete it, and then add the original index name as an alias of the new index so that users do not need to change any code.
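A minimal sketch of creating the destination index with soft deletes disabled before reindexing; the index name test_v2 matches the reindex example below, and the mappings (omitted here) would be copied from the original index:
PUT test_v2
{
  "settings": {
    "index.soft_deletes.enabled": false
  }
}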
Reindexing 570 GB (roughly 3.6 billion documents) took about 5 hours; reindex throughput can be improved with slices:
POST _reindex?wait_for_completion=false&scroll=10m&slices=20
{
"source": {
"index": "test",
"size": 9000
},
"dest": {
"index": "test_v2"
}
}
⚠️ Note: during the reindex, to prevent any job from accidentally continuing to write to the original index, set the original index to read-only: PUT test/_settings { "index.blocks.read_only": true }
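Since the reindex is submitted with wait_for_completion=false, its progress can be tracked through the tasks API; a sketch:
GET _tasks?detailed=true&actions=*reindex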
4 Code analysis
4.1 Call chain
As shown in Figure 4-1, refresh and flush (the light red blocks) contend for locks, namely fullFlushLock and the IndexWriter lock.
The yellow block in Figure 4-1 is the code path that becomes extremely time-consuming, or even hangs, after a large number of updates.
Let's look at the logic of the code that gets stuck, org.apache.lucene.index.FrozenBufferedUpdates#applyDocValuesUpdates:
private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
Map<String, FieldUpdatesBuffer> updates,
long delGen,
boolean segmentPrivateDeletes) throws IOException {
// TODO: we can process the updates per DV field, from last to first so that
// if multiple terms affect same document for the same field, we add an update
// only once (that of the last term). To do that, we can keep a bitset which
// marks which documents have already been updated. So e.g. if term T1
// updates doc 7, and then we process term T2 and it updates doc 7 as well,
// we don't apply the update since we know T1 came last and therefore wins
// the update.
// We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
// that these documents aren't even returned.
long updateCount = 0;
// We first write all our updates private, and only in the end publish to the ReadersAndUpdates */
final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
// In Elasticsearch this map contains only one key: __soft_deletes
for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
String updateField = fieldUpdate.getKey();
DocValuesFieldUpdates dvUpdates = null;
FieldUpdatesBuffer value = fieldUpdate.getValue();
boolean isNumeric = value.isNumeric();
FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
// The main problem: this loop does not deduplicate terms; since 7.7.0 the same term is applied only once
while ((bufferedUpdate = iterator.next()) != null) {
// TODO: we traverse the terms in update order (not term order) so that we
// apply the updates in the correct order, i.e. if two terms update the
// same document, the last one that came in wins, irrespective of the
// terms lexical order.
// we can apply the updates in terms order if we keep an updatesGen (and
// increment it with every update) and attach it to each NumericUpdate. Note
// that we cannot rely only on docIDUpto because an app may send two updates
// which will get same docIDUpto, yet will still need to respect the order
// those updates arrived.
// TODO: we could at least *collate* by field?
final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
if (docIdSetIterator != null) {
final int limit;
if (delGen == segState.delGen) {
assert segmentPrivateDeletes;
limit = bufferedUpdate.docUpTo;
} else {
limit = Integer.MAX_VALUE;
}
final BytesRef binaryValue;
final long longValue;
if (bufferedUpdate.hasValue == false) {
longValue = -1;
binaryValue = null;
} else {
longValue = bufferedUpdate.numericValue;
binaryValue = bufferedUpdate.binaryValue;
}
termDocsIterator.getDocs();
if (dvUpdates == null) {
if (isNumeric) {
if (value.hasSingleValue()) {
dvUpdates = new NumericDocValuesFieldUpdates
.SingleValueNumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc(),
value.getNumericValue(0));
} else {
dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, value.getMinNumeric(),
value.getMaxNumeric(), segState.reader.maxDoc());
}
} else {
dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
}
resolvedUpdates.add(dvUpdates);
}
final IntConsumer docIdConsumer;
final DocValuesFieldUpdates update = dvUpdates;
if (bufferedUpdate.hasValue == false) {
docIdConsumer = doc -> update.reset(doc);
} else if (isNumeric) {
docIdConsumer = doc -> update.add(doc, longValue);
} else {
docIdConsumer = doc -> update.add(doc, binaryValue);
}
final Bits acceptDocs = segState.rld.getLiveDocs();
if (segState.rld.sortMap != null && segmentPrivateDeletes) {
// This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (acceptDocs == null || acceptDocs.get(doc)) {
// The limit is in the pre-sorted doc space:
if (segState.rld.sortMap.newToOld(doc) < limit) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
} else {
int doc;
while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (doc >= limit) {
break; // no more docs that can be updated for this term
}
if (acceptDocs == null || acceptDocs.get(doc)) {
docIdConsumer.accept(doc);
updateCount++;
}
}
}
}
}
}
// now freeze & publish:
for (DocValuesFieldUpdates update : resolvedUpdates) {
if (update.any()) {
update.finish();
segState.rld.addDVUpdate(update);
}
}
return updateCount;
}
By stepping through applyDocValuesUpdates with a debugger and comparing it against the code in 7.7.0 and later, we can see that the problem lies in the line bufferedUpdate = iterator.next(): terms are not deduplicated before being applied. If a document is soft-updated 100,000 times, this results in roughly 100,000 × 100,000 iterations 【1】.
The debug test code is as follows:
package com.dirk.soft.delete;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Date;
/**
* Created by
*
* @Author : yang
* @create 2022/6/5 09:49
*/
public class SoftDeletesTest1 {
private final static String indexPath = "/Users/yang/workspace/github/lucene-learn/docpath";
private Directory dir = null;
{
try {
dir = FSDirectory.open(Paths.get(indexPath));
} catch (IOException e) {
e.printStackTrace();
}
}
// directory where the index is stored
// kept outside the method so the IDE highlights the variable
private IndexWriter indexWriter;
IndexWriterConfig indexWriterConfig;
public void doIndexAndSearch() throws Exception {
indexWriterConfig = new IndexWriterConfig(new WhitespaceAnalyzer());
indexWriterConfig.setSoftDeletesField("__soft_deletes");
indexWriterConfig.setUseCompoundFile(true);
indexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
indexWriter = new IndexWriter(dir, indexWriterConfig);
Document doc;
// document 0
doc = new Document();
doc.add(new StringField("_id", "0", Field.Store.YES));
doc.add(new NumericDocValuesField("docValuesFiled", 2));
doc.add(new StringField("_version", "0", Field.Store.YES));
indexWriter.addDocument(doc);
// document 1
doc = new Document();
doc.add(new StringField("_id", "1", Field.Store.YES));
doc.add(new NumericDocValuesField("docValuesFiled", 3));
doc.add(new StringField("_version", "1", Field.Store.YES));
indexWriter.addDocument(doc);
// first soft update
Document newDoc = new Document();
newDoc.add(new StringField("_id", "1", Field.Store.YES));
newDoc.add(new StringField("_version", "2", Field.Store.YES));
indexWriter.softUpdateDocument(new Term("_id", "1"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
// second soft update
newDoc = new Document();
newDoc.add(new StringField("_id", "1", Field.Store.YES));
newDoc.add(new StringField("_version", "3", Field.Store.YES));
indexWriter.softUpdateDocument(new Term("_id", "1"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
// third soft update
newDoc = new Document();
newDoc.add(new StringField("_id", "1", Field.Store.YES));
newDoc.add(new StringField("_version", "4", Field.Store.YES));
indexWriter.softUpdateDocument(new Term("_id", "2"), newDoc, new NumericDocValuesField("__soft_deletes", 3));
indexWriter.commit();
DirectoryReader readerBeforeMerge = DirectoryReader.open(indexWriter);
ScoreDoc[] scoreDocs = (new IndexSearcher(readerBeforeMerge)).search(new MatchAllDocsQuery(), 100).scoreDocs;
for (ScoreDoc scoreDoc : scoreDocs) {
System.out.println("time:" + new Date() + "; docId: doc" + scoreDoc.doc
+ ", FieldValue of Field abc: " + readerBeforeMerge.document(scoreDoc.doc).get("_id")
+ ",_version:" + readerBeforeMerge.document(scoreDoc.doc).get("_version"));
}
}
public static void main(String[] args) throws Exception{
SoftDeletesTest1 test = new SoftDeletesTest1();
test.doIndexAndSearch();
}
}
Execution results:
time:Sun Jun 05 17:22:15 CST 2022; docId: doc0, FieldValue of Field abc: 0,_version:0
time:Sun Jun 05 17:22:15 CST 2022; docId: doc3, FieldValue of Field abc: 1,_version:3
time:Sun Jun 05 17:22:15 CST 2022; docId: doc4, FieldValue of Field abc: 1,_version:4
Debugging applyDocValuesUpdates is shown in Figures 4-2 and 4-3.
5 Local reproduction
5.1 Test code
org.elasticsearch.index.engine.EngineTestCase#testRefresh
public void testRefresh(){
long startTime = System.currentTimeMillis();
int i = 0;
while (i < 100000) {
try {
// update the document with id "1" 100,000 times
engine.index(indexForDoc(createParsedDoc("1", null)));
} catch (IOException e) {
e.printStackTrace();
}
i++;
}
logger.info("updating took: {} ms", (System.currentTimeMillis() - startTime));
startTime = System.currentTimeMillis();
engine.refresh("test", randomFrom(Engine.SearcherScope.values()), randomBoolean());
logger.info("refresh took: {} ms", (System.currentTimeMillis() - startTime));
}
5.2 Test results
We ran the test against ES 7.5.2 and ES 7.10.2:
- 100,000 updates, refresh time: on ES 7.5.2, 7 out of 10 runs took less than 1 s and the remaining 3 took about 30 s; on ES 7.10.2 all 10 runs finished within 1 s.
- 500,000 updates, refresh time: ES 7.5.2 hit the suite timeout (>= 1200000 msec); on ES 7.10.2 all 10 runs finished within 3 s.
5.3 Open questions
Why do the ES 7.5.2 timings vary so much across runs for 100,000 and 500,000 updates? By the logic above, refresh should consistently take a long time.
6 References
【1】 总结近期遇到的几个问题 (A summary of several recent issues)
【2】 Elasticsearch 7.4的 soft-deletes 是个什么鬼 (What exactly are soft-deletes in Elasticsearch 7.4)
【3】 清除translog (Clearing the translog)
【4】 soft-delete 可能导致 write queue 持续积压的问题 (soft-deletes can cause a persistent write queue backlog)
【5】 elasticsearch translog恢复到一定百分比卡住(stuck),导致索引无法恢复 (Elasticsearch translog recovery gets stuck at a certain percentage, leaving the index unrecoverable)
【6】 Lucene软删除 (Lucene soft deletes)