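In Lucene a document can be added, updated, or deleted. This article walks through the flow Lucene follows when adding a document and analyzes the corresponding source code.
Documents are added through IndexWriter.addDocument. First, here is sample code that adds a document: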
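```java
import java.nio.file.Paths;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
public class AddDocumentExample {
  public static void main(String[] args) throws Exception {
    Directory dir = FSDirectory.open(Paths.get("index")); // the "index" path is only illustrative
    IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
    config.setUseCompoundFile(false); // write segments as separate files, not a compound file
    config.setMaxBufferedDocs(2);     // trigger a flush once 2 documents are buffered
    IndexWriter writer = new IndexWriter(dir, config);
    // Field type: stored, tokenized, indexed with docs/freqs/positions/offsets, plus term vectors
    FieldType type = new FieldType();
    type.setStored(true);
    type.setTokenized(true);
    type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
    type.setStoreTermVectors(true);
    type.setStoreTermVectorPositions(true);
    type.setStoreTermVectorOffsets(true);
    type.freeze();
    // Build a single-field document and add it
    Document doc = new Document();
    doc.add(new Field("content", "one", type));
    writer.addDocument(doc);
    writer.close();
  }
}
```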
Figure: flowchart of adding a single document
We start with the processing Lucene performs before a document is added. The source is as follows:
```java
private boolean preUpdate() throws IOException, AbortingException {
  ensureOpen();
  boolean hasEvents = false;
  // Check whether indexing threads are stalled or DWPTs are queued for flushing
  if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
    // Help out flushing any queued DWPTs so we can un-stall:
    do {
      // Try pick up pending threads here if possible
      DocumentsWriterPerThread flushingDWPT;
      // Take out each pending DWPT in turn
      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
        // Don't push the delete here since the update could fail!
        // Flush this DWPT on the current indexing thread
        hasEvents |= doFlush(flushingDWPT);
      }
      flushControl.waitIfStalled(); // block if stalled
    } while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
  }
  return hasEvents;
}
```
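To make the control flow of preUpdate concrete, here is a minimal single-threaded sketch. It assumes DWPTs can be modeled as plain strings in a queue. PendingFlushModel and its doFlush stand-in are hypothetical. The sketch also leaves out two things the real code does: the waitIfStalled call, and the flushPending ThreadState scan performed by nextPendingFlush.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the help-flushing loop in preUpdate.
// DWPTs are represented by String ids; nothing here is a Lucene class.
class PendingFlushModel {
    final Deque<String> flushQueue = new ArrayDeque<>();
    int flushedCount = 0;

    // Stand-in for doFlush: counts the flush and reports that an event was produced.
    boolean doFlush(String dwpt) {
        flushedCount++;
        return true;
    }

    boolean preUpdate(boolean anyStalledThreads, boolean checkPendingFlushOnUpdate) {
        boolean hasEvents = false;
        if (anyStalledThreads || (!flushQueue.isEmpty() && checkPendingFlushOnUpdate)) {
            do {
                String dwpt;
                // Drain the queue, flushing each DWPT on the calling indexing thread.
                while ((dwpt = flushQueue.poll()) != null) {
                    hasEvents |= doFlush(dwpt);
                }
                // The real loop calls flushControl.waitIfStalled() here; omitted in this sketch.
            } while (!flushQueue.isEmpty());
        }
        return hasEvents;
    }
}
```

The point the model captures is that an indexing thread pays for its own progress. Before it adds a document, it may first flush DWPTs that other threads left in the queue.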
1. DocumentsWriterFlushControl.anyStalledThreads
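DocumentsWriterFlushControl decides whether indexing is currently stalled by reading the stalled flag of its DocumentsWriterStallControl member. When multiple threads sharing the same IndexWriter add or update documents concurrently, each thread obtains a ThreadState. After the DWPT held by that ThreadState finishes processing a document, the memory used is accounted in one of two places. If the DWPT has reached the flush condition, the memory is added to flushBytes. Otherwise it is added to activeBytes.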
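The accounting rule above can be sketched as a small model. MemoryAccountingModel, PerThreadState, onDocumentProcessed and markFlushPending are hypothetical names. As a simplifying assumption, the sketch treats marking a thread state as flush-pending as moving all of its bytes from activeBytes to flushBytes in one step.

```java
// Hypothetical model of how per-thread memory is split between activeBytes and flushBytes.
class MemoryAccountingModel {
    static final class PerThreadState {
        long bytesUsed;
        boolean flushPending;
    }

    long activeBytes;
    long flushBytes;

    // Called after a thread's DWPT finished a document that grew its RAM usage by delta.
    void onDocumentProcessed(PerThreadState state, long delta) {
        state.bytesUsed += delta;
        if (state.flushPending) {
            flushBytes += delta;   // this DWPT is already waiting to be flushed
        } else {
            activeBytes += delta;  // this DWPT is still accepting documents
        }
    }

    // Assumed behavior: marking a state flush-pending moves all of its bytes to flushBytes.
    void markFlushPending(PerThreadState state) {
        if (!state.flushPending) {
            state.flushPending = true;
            activeBytes -= state.bytesUsed;
            flushBytes += state.bytesUsed;
        }
    }
}
```

Here is an example. Two threads use 100 and 50 bytes, giving activeBytes = 150. Marking the first thread as flush-pending leaves activeBytes = 50 and flushBytes = 100.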
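Concurrent adds and updates from many threads increase memory pressure, while flushing a segment relieves it. Suppose documents are added or updated faster than segments are flushed, and nothing throttles the indexing threads. In that case the JVM can easily run out of memory (OOM). Lucene stalls the indexing threads under the following condition: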
```java
/*
 * we block indexing threads if net byte grows due to slow flushes
 * yet, for small ram buffers and large documents we can easily
 * reach the limit without any ongoing flushes. we need to ensure
 * that we don't stall/block if an ongoing or pending flush can
 * not free up enough memory to release the stall lock.
 */
stall = (activeBytes + flushBytes) > limit && activeBytes < limit && !closed;
```
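A quick way to see what the three clauses do is to evaluate the condition on a few numbers. StallCheck is a hypothetical helper, and limit is just an input here. In Lucene the limit is derived from the RAM buffer settings, which this sketch does not model.

```java
// Hypothetical helper that evaluates the stall condition quoted above.
class StallCheck {
    static boolean shouldStall(long activeBytes, long flushBytes, long limit, boolean closed) {
        return (activeBytes + flushBytes) > limit && activeBytes < limit && !closed;
    }

    public static void main(String[] args) {
        long limit = 100;
        // Flushes lag behind: total memory is over the limit, active memory alone is not.
        System.out.println(shouldStall(60, 50, limit, false));  // true
        // Active memory alone is over the limit: no flush could release the stall, so don't stall.
        System.out.println(shouldStall(120, 0, limit, false));  // false
        // Total memory is under the limit: no stall.
        System.out.println(shouldStall(40, 30, limit, false));  // false
    }
}
```

The second case is the one the source comment warns about. With a small RAM buffer and a large document, activeBytes alone can exceed the limit. Stalling would not help there, because no ongoing or pending flush could bring memory back under the limit.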
2. IndexWriterConfig.checkPendingFlushOnUpdate
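This flag controls whether an indexing thread checks for segments pending flush when it adds or updates a document. If it is true, the thread helps process those pending flushes.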
```java
/**
 * if an indexing thread should check for pending flushes on update
 * in order to help out on a full flush
 */
protected volatile boolean checkPendingFlushOnUpdate = true;
```
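Combining this flag with the first check gives the entry condition at the top of preUpdate, written here as a standalone predicate. HelpFlushCondition is a hypothetical name; the sketch only restates the boolean expression from preUpdate.

```java
// Hypothetical restatement of the if-condition at the top of preUpdate.
class HelpFlushCondition {
    static boolean shouldHelpFlush(boolean anyStalledThreads, int numQueuedFlushes,
                                   boolean checkPendingFlushOnUpdate) {
        return anyStalledThreads || (numQueuedFlushes > 0 && checkPendingFlushOnUpdate);
    }

    public static void main(String[] args) {
        System.out.println(shouldHelpFlush(true, 0, false));  // true: stalled threads always help
        System.out.println(shouldHelpFlush(false, 3, true));  // true: queued flushes and the flag is on
        System.out.println(shouldHelpFlush(false, 3, false)); // false: the flag is off and nothing is stalled
    }
}
```

Setting checkPendingFlushOnUpdate to false does not stop a thread from helping while indexing is stalled. The reason is that anyStalledThreads() short-circuits the expression.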
3. Analysis of DocumentsWriterFlushControl.nextPendingFlush
3.1 nextPendingFlush takes out the DWPT of one segment that is waiting to be flushed
```java
DocumentsWriterPerThread nextPendingFlush() {
  int numPending;
  boolean fullFlush;
  synchronized (this) {
    final DocumentsWriterPerThread poll;
    // Take a DWPT that has already been added to flushQueue
    if ((poll = flushQueue.poll()) != null) {
      // Update the stall state
      updateStallState();
      return poll;
    }
    fullFlush = this.fullFlush;
    numPending = this.numPending;
  }
  // If some ThreadStates are marked flushPending and no full flush
  // (an explicitly triggered flush of all DWPTs) is in progress
  if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
    // Visit each ThreadState; if it is marked flushPending, try to check out
    // its DWPT and return that DWPT for flushing
    final int limit = perThreadPool.getActiveThreadStateCount();
    for (int i = 0; i < limit && numPending > 0; i++) {
      final ThreadState next = perThreadPool.getThreadState(i);
      if (next.flushPending) {
        final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
        if (dwpt != null) {
          return dwpt;
        }
      }
    }
  }
  return null;
}
```
3.2 Next, let's look at the source of tryCheckoutForFlush in detail
```java
synchronized DocumentsWriterPerThread tryCheckoutForFlush(ThreadState perThread) {
  // If the ThreadState is flushPending, try to check out its DWPT
  return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
}
```
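The two-stage lookup in nextPendingFlush can be modeled without threads. First it polls the flush queue. If the queue is empty and no full flush is running, it checks out the DWPT of a flush-pending slot. NextPendingFlushModel, Slot and the String DWPT ids are hypothetical stand-ins, and the sketch omits locking and stall-state updates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of nextPendingFlush.
class NextPendingFlushModel {
    static final class Slot {      // stand-in for ThreadState
        String dwpt;               // stand-in for the DWPT it holds
        boolean flushPending;
        Slot(String dwpt, boolean flushPending) {
            this.dwpt = dwpt;
            this.flushPending = flushPending;
        }
    }

    final Deque<String> flushQueue = new ArrayDeque<>();
    final List<Slot> slots = new ArrayList<>();
    boolean fullFlush;
    int numPending;

    String nextPendingFlush() {
        // Stage 1: DWPTs already queued for flushing take priority.
        String queued = flushQueue.poll();
        if (queued != null) {
            return queued;
        }
        // Stage 2: outside a full flush, check out the DWPT of a flush-pending slot.
        if (numPending > 0 && !fullFlush) {
            for (Slot slot : slots) {
                if (slot.flushPending && slot.dwpt != null) {
                    String dwpt = slot.dwpt;
                    slot.dwpt = null;          // detach it, like resetting the ThreadState
                    slot.flushPending = false;
                    numPending--;
                    return dwpt;
                }
            }
        }
        return null;
    }
}
```

The model skips stage 2 during a full flush. That matches the `!fullFlush` check in the source, where the full-flush path handles the thread states itself.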
```java
private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
  assert Thread.holdsLock(this);
  assert perThread.flushPending;
  try {
    // We are pending so all memory is already moved to flushBytes
    // Adding/updating documents is concurrent, so another thread may hold this
    // ThreadState's lock; if the lock cannot be acquired, return null
    if (perThread.tryLock()) {
      try {
        // Only proceed if the ThreadState's DWPT has been initialized
        if (perThread.isInitialized()) {
          assert perThread.isHeldByCurrentThread();
          final DocumentsWriterPerThread dwpt;
          final long bytes = perThread.bytesUsed; // do that before replace!
          // Reset the ThreadState and take out its DWPT
          dwpt = perThreadPool.reset(perThread);
          assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
          // Record the flushing DWPT to reduce flushBytes in doAfterFlush
          // (remember how much memory this DWPT uses so doAfterFlush can release it)
          flushingWriters.put(dwpt, Long.valueOf(bytes));
          // One fewer DWPT pending flush
          numPending--; // write access synced
          return dwpt;
        }
      } finally {
        perThread.unlock();
      }
    }
    return null;
  } finally {
    updateStallState();
  }
}
```
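The essential pattern in internalTryCheckOutForFlush is a non-blocking tryLock. If another indexing thread currently owns the ThreadState, the flushing thread gives up instead of waiting. Below is a minimal sketch of that pattern using java.util.concurrent.locks.ReentrantLock. CheckoutModel, Slot and Dwpt are hypothetical stand-ins for ThreadState and DWPT, and the stall-state update in the outer finally block is omitted.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of checking out a DWPT for flushing with a non-blocking lock.
class CheckoutModel {
    static final class Dwpt {
        final String name;
        Dwpt(String name) { this.name = name; }
    }

    // The slot itself is the lock, mirroring tryLock()/unlock() on ThreadState above.
    static final class Slot extends ReentrantLock {
        Dwpt dwpt;
        long bytesUsed;
        boolean flushPending;
    }

    // Bytes held by each DWPT being flushed, to be released once its flush completes.
    final Map<Dwpt, Long> flushingWriters = new IdentityHashMap<>();
    int numPending;

    synchronized Dwpt tryCheckoutForFlush(Slot slot) {
        if (!slot.flushPending) {
            return null;
        }
        // Another indexing thread may own the slot: give up instead of blocking.
        if (!slot.tryLock()) {
            return null;
        }
        try {
            if (slot.dwpt == null) {
                return null;                // nothing initialized to flush
            }
            long bytes = slot.bytesUsed;    // read before the reset below
            Dwpt dwpt = slot.dwpt;
            slot.dwpt = null;               // reset the slot for future documents
            slot.bytesUsed = 0;
            slot.flushPending = false;
            flushingWriters.put(dwpt, bytes);
            numPending--;
            return dwpt;
        } finally {
            slot.unlock();
        }
    }
}
```

Using tryLock rather than lock keeps a helper thread from blocking behind a thread that is in the middle of indexing into the same DWPT. In that case nextPendingFlush simply moves on to the next ThreadState.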
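This concludes the flow Lucene runs before adding or updating a document. The next article, https://cloud.tencent.com/developer/article/1579685, walks through the detailed logic of processing a document.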