In Lucene, documents are added with IndexWriter.addDocument. The following example adds a single document:
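```java
IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
config.setUseCompoundFile(false); // write individual index files, not a compound file
config.setMaxBufferedDocs(2);     // flush a new segment once 2 documents are buffered
// dir: a Directory opened elsewhere (not shown in this example)
IndexWriter writer = new IndexWriter(dir, config);
// Field type: stored, tokenized, indexed with docs/freqs/positions/offsets,
// and with term vectors that include positions and offsets
FieldType type = new FieldType();
type.setStored(true);
type.setTokenized(true);
type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
type.setStoreTermVectors(true);
type.setStoreTermVectorPositions(true);
type.setStoreTermVectorOffsets(true);
type.freeze(); // make the FieldType immutable
// Build a one-field document and hand it to the IndexWriter
Document doc = new Document();
doc.add(new Field("content", "one", type));
writer.addDocument(doc);
writer.close(); // commit pending changes and close the writer
```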
The previous article described what Lucene does before adding or updating a document. This article covers how the document itself is processed.
1. Source analysis: obtaining a ThreadState
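Before it can process a document, Lucene must obtain a ThreadState. This happens in obtainAndLock, which in turn calls DocumentsWriterPerThreadPool.getAndLock: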
```java
ThreadState obtainAndLock() {
  // Get a ThreadState from the DocumentsWriterPerThreadPool (returned already locked)
  final ThreadState perThread = perThreadPool.getAndLock(Thread
      .currentThread(), documentsWriter);
  boolean success = false;
  try {
    // If this ThreadState already has an initialized DWPT, but that DWPT's deleteQueue
    // differs from the DocumentsWriter's current deleteQueue
    if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
      // There is a flush-all in process and this DWPT is
      // now stale -- enroll it for flush and try for
      // another DWPT:
      // A full flush (IndexWriter.flush) is in progress. If the DWPT tied to this
      // ThreadState has indexed more than 0 documents, add the ThreadState to
      // fullFlushBuffer; otherwise reset the DWPT
      addFlushableState(perThread);
    }
    success = true;
    // simply return the ThreadState even in a flush all case since we already hold the lock
    return perThread;
  } finally {
    if (!success) { // make sure we unlock if this fails
      perThreadPool.release(perThread);
    }
  }
}
```
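The `success` flag combined with `finally` in obtainAndLock guarantees that the ThreadState's lock is released if anything throws between acquiring it and handing it back. The block below is a minimal standalone sketch of that idiom, using a plain `java.util.concurrent.locks.ReentrantLock` as a stand-in for ThreadState; `LockedAcquire` and the `PostLockStep` callback (standing in for the stale-deleteQueue check and addFlushableState) are hypothetical names, not Lucene APIs.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the obtainAndLock idiom; a ReentrantLock plays the ThreadState.
class LockedAcquire {
    // Hypothetical hook for the work done after locking (e.g. the stale-DWPT check).
    interface PostLockStep {
        void run(ReentrantLock state);
    }

    static ReentrantLock obtainAndLock(ReentrantLock state, PostLockStep step) {
        state.lock(); // like getAndLock: the state comes back already locked
        boolean success = false;
        try {
            step.run(state); // anything here may throw
            success = true;
            return state;    // the caller now holds the lock and must release it later
        } finally {
            if (!success) {
                state.unlock(); // like perThreadPool.release(perThread): never leak the lock
            }
        }
    }
}
```

When the step throws, the exception propagates but the lock is already unlocked; on a normal return the caller owns a locked state and is responsible for releasing it.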
```java
/** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
// Obtain a ThreadState
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
  ThreadState threadState = null;
  synchronized (this) {
    // If freeList holds no ThreadState, create a new one and add it to threadStates
    if (freeList.isEmpty()) {
      // ThreadState is already locked before return by this method:
      return newThreadState();
    } else {
      // Important that we are LIFO here! This way if number of concurrent indexing
      // threads was once high, but has now reduced, we only use a
      // limited number of thread states:
      // LIFO (last in, first out): take the last ThreadState
      threadState = freeList.remove(freeList.size()-1);
      // If the last ThreadState has no DWPT
      if (threadState.dwpt == null) {
        // This thread-state is not initialized, e.g. it
        // was just flushed. See if we can instead find
        // another free thread state that already has docs
        // indexed. This way if incoming thread concurrency
        // has decreased, we don't leave docs
        // indefinitely buffered, tying up RAM. This
        // will instead get those thread states flushed,
        // freeing up RAM for larger segment flushes:
        // Scan freeList from the front for a ThreadState with a non-null DWPT
        // and swap it with the uninitialized one just removed from the end
        for (int i = 0; i < freeList.size(); i++) {
          ThreadState ts = freeList.get(i);
          if (ts.dwpt != null) {
            // Use this one instead, and swap it with
            // the un-initialized one:
            freeList.set(i, threadState);
            threadState = ts;
            break;
          }
        }
      }
    }
  }
  // This could take time, e.g. if the threadState is [briefly] checked for flushing:
  // Lock the selected ThreadState
  threadState.lock();
  return threadState;
}
```
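The selection policy in getAndLock can be studied separately from the locking. The sketch below is a simplified model that assumes a hypothetical `State` class recording only whether a DWPT is attached; it reproduces the LIFO pop and the swap that prefers a state already buffering documents, and omits locking and the threadStates bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of getAndLock's selection policy; not Lucene's DocumentsWriterPerThreadPool.
class FreeListSketch {
    // Stand-in for ThreadState: only tracks whether a DWPT is attached.
    static final class State {
        final String name;
        final boolean hasDwpt;
        State(String name, boolean hasDwpt) { this.name = name; this.hasDwpt = hasDwpt; }
    }

    private final List<State> freeList = new ArrayList<>();
    private int created = 0;

    // Released states are appended to the tail of the free list.
    void release(State s) { freeList.add(s); }

    State pick() {
        if (freeList.isEmpty()) {
            return new State("new-" + (created++), false); // like newThreadState()
        }
        // LIFO: take the most recently released state
        State chosen = freeList.remove(freeList.size() - 1);
        if (!chosen.hasDwpt) {
            // Prefer a state that already buffers documents, so its RAM gets flushed eventually
            for (int i = 0; i < freeList.size(); i++) {
                State ts = freeList.get(i);
                if (ts.hasDwpt) {
                    freeList.set(i, chosen); // put the uninitialized state in its slot
                    chosen = ts;
                    break;
                }
            }
        }
        return chosen;
    }

    int freeCount() { return freeList.size(); }
}
```

For example, after releasing an initialized `A` and then an uninitialized `B`, `pick()` returns `A` rather than `B` and leaves `B` in the free list; once the list is empty, a new state is created.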
2. Source analysis: DocumentsWriterPerThread.updateDocument
```java
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
  testPoint("DocumentsWriterPerThread addDocument start");
  assert deleteQueue != null;
  // Increment the pendingNumDocs counter
  reserveOneDoc();
  docState.doc = doc;
  docState.analyzer = analyzer;
  docState.docID = numDocsInRAM;
  if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
    infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
  }
  // Even on exception, the document is still added (but marked
  // deleted), so we don't need to un-reserve at that point.
  // Aborting exceptions will actually "lose" more than one
  // document, so the counter will be "wrong" in that case, but
  // it's very hard to fix (we can't easily distinguish aborting
  // vs non-aborting exceptions):
  boolean success = false;
  try {
    try {
      // Process the document
      consumer.processDocument();
    } finally {
      // Clear the DocState
      docState.clear();
    }
    success = true;
  } finally {
    if (!success) {
      // Processing failed: add this docID to pendingUpdates so it is handled at flush time
      // mark document as deleted
      deleteDocID(docState.docID);
      // Increment numDocsInRAM so it becomes the next document's docID
      numDocsInRAM++;
    }
  }
  // The document has been processed: update the deleteSlice
  return finishDocument(delTerm);
}
```
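The docID bookkeeping in updateDocument is easy to miss: docState.docID is taken from numDocsInRAM before processing, and a failed document still consumes its docID because it is marked deleted and the counter is advanced anyway. The hypothetical `DocIdSketch` below models just that counter. On the success path the original delegates to finishDocument, whose body is not shown in this article, so the sketch simply assumes the counter is advanced there as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of docID assignment in updateDocument; not Lucene's DWPT.
class DocIdSketch {
    // Hypothetical stand-in for consumer.processDocument()
    interface Processor {
        void process(String doc, int docId);
    }

    private int numDocsInRAM = 0;
    private final List<Integer> deletedDocIds = new ArrayList<>();

    int add(String doc, Processor consumer) {
        int docId = numDocsInRAM; // like docState.docID = numDocsInRAM
        boolean success = false;
        try {
            consumer.process(doc, docId);
            success = true;
        } finally {
            if (!success) {
                deletedDocIds.add(docId); // like deleteDocID: keep the slot but mark it deleted
                numDocsInRAM++;           // the failed document still consumes its docID
            }
        }
        numDocsInRAM++; // assumption: the success path advances the counter too
        return docId;
    }

    int numDocs() { return numDocsInRAM; }

    List<Integer> deletedDocIds() { return deletedDocIds; }
}
```

Adding a good document, a failing one, and another good one yields docIDs 0 and 2 for the successful documents, with docID 1 recorded as deleted.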
3. Source analysis: DocConsumer.processDocument
This method processes every field of the document, producing its stored fields, inverted index, DocValues and Point values:
```java
public void processDocument() throws IOException, AbortingException {
  // How many indexed field names we've seen (collapses
  // multiple field instances by the same name):
  int fieldCount = 0;
  long fieldGen = nextFieldGen++;
  // NOTE: we need two passes here, in case there are
  // multi-valued fields, because we must process all
  // instances of a given field at once, since the
  // analyzer is free to reuse TokenStream across fields
  // (i.e., we cannot have more than one TokenStream
  // running "at once"):
  // Prepare the inverted-index chain for a new document
  termsHash.startDocument();
  // Start the stored fields for this document
  startStoredFields(docState.docID);
  boolean aborting = false;
  try {
    // Process the fields one by one
    for (IndexableField field : docState.doc) {
      // Per-field work: inverted index, term vectors, stored fields, DocValues and Points
      fieldCount = processField(field, fieldGen, fieldCount);
    }
  } catch (AbortingException ae) {
    aborting = true;
    throw ae;
  } finally {
    if (aborting == false) {
      // Finish each indexed field name seen in the document:
      // call finish() on each distinct field
      for (int i = 0; i < fieldCount; i++) {
        fields[i].finish();
      }
      // Finish the stored fields
      finishStoredFields();
    }
  }
  try {
    // Finish the inverted index for this document
    termsHash.finishDocument();
  } catch (Throwable th) {
    // Must abort, on the possibility that on-disk term
    // vectors are now corrupt:
    throw AbortingException.wrap(th);
  }
}
```
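The "two passes" comment and the fieldCount counter concern multi-valued fields: several IndexableField instances can share a name, but finish() must run once per distinct name. The sketch below models this with a per-document generation number in the spirit of fieldGen. `FieldGenSketch` and `PerField` are hypothetical simplifications that count instances instead of building postings, and the sketch assumes that processField adds a field to fields[] only the first time its name appears in the current document, which is what the fieldCount return value suggests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of processDocument's fieldGen/fieldCount bookkeeping; not Lucene code.
class FieldGenSketch {
    static final class PerField {
        final String name;
        long fieldGen = -1; // generation of the last document that used this field name
        int instances;      // instances of this name in the current document
        PerField(String name) { this.name = name; }
    }

    private final Map<String, PerField> perFields = new HashMap<>();
    private long nextFieldGen = 0;
    final List<String> finished = new ArrayList<>(); // records each simulated finish() call

    void processDocument(List<String> fieldNames) {
        long fieldGen = nextFieldGen++; // a fresh generation for every document
        PerField[] fields = new PerField[fieldNames.size()];
        int fieldCount = 0;
        // Pass 1: visit every field instance, registering each name only once
        for (String name : fieldNames) {
            PerField pf = perFields.computeIfAbsent(name, PerField::new);
            if (pf.fieldGen != fieldGen) { // first time this name appears in this document
                pf.fieldGen = fieldGen;
                pf.instances = 0;
                fields[fieldCount++] = pf;
            }
            pf.instances++;
        }
        // Pass 2: finish each distinct field name exactly once
        for (int i = 0; i < fieldCount; i++) {
            finished.add(fields[i].name + " x" + fields[i].instances);
        }
    }
}
```

A document with fields `title`, `body`, `title` finishes `title` once (two instances) and `body` once. The next document gets a new generation, so `title` is registered again without clearing any per-field state up front.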
4. Source analysis: DocumentsWriterFlushControl.doAfterDocument
Once the document has been processed, DocumentsWriterFlushControl.doAfterDocument performs the post-processing.
```java
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
  try {
    // Update the ThreadState's RAM usage: if the ThreadState is flushPending,
    // add the change to flushBytes, otherwise to activeBytes
    commitPerThreadBytes(perThread);
    // If the ThreadState is not flushPending
    if (!perThread.flushPending) {
      // For an update call FlushPolicy.onUpdate, otherwise FlushPolicy.onInsert
      if (isUpdate) {
        flushPolicy.onUpdate(this, perThread);
      } else {
        flushPolicy.onInsert(this, perThread);
      }
      // If the ThreadState is still not flushPending but uses more RAM than
      // hardMaxBytesPerDWPT, mark it flushPending
      if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
        // Safety check to prevent a single DWPT exceeding its RAM limit. This
        // is super important since we can not address more than 2048 MB per DWPT
        setFlushPending(perThread);
      }
    }
    // Check out this ThreadState (checkout is described below)
    return checkout(perThread, false);
  } finally {
    boolean stalled = updateStallState();
    assert assertNumDocsSinceStalled(stalled) && assertMemory();
  }
}
```
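The RAM accounting in doAfterDocument can be modeled with two global counters. In this hypothetical sketch, `liveBytes` stands in for the DWPT's current usage and `bytesUsed` for the amount already committed; commitPerThreadBytes adds only the growth since the last commit, to flushBytes or activeBytes depending on flushPending. What setFlushPending does here (moving the thread's committed bytes from activeBytes to flushBytes) is an assumption, since its body is not shown in this article, and the FlushPolicy callbacks are left out.

```java
// Hypothetical model of the RAM bookkeeping in doAfterDocument; not Lucene's FlushControl.
class FlushAccountingSketch {
    static final class PerThread {
        long liveBytes;       // stand-in for the DWPT's current RAM usage
        long bytesUsed;       // RAM already committed to the global counters
        boolean flushPending;
    }

    long activeBytes;
    long flushBytes;
    final long hardMaxBytesPerDWPT;

    FlushAccountingSketch(long hardMaxBytesPerDWPT) {
        this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT;
    }

    void commitPerThreadBytes(PerThread t) {
        long delta = t.liveBytes - t.bytesUsed; // only the growth since the last commit
        t.bytesUsed += delta;
        if (t.flushPending) {
            flushBytes += delta;
        } else {
            activeBytes += delta;
        }
    }

    // Assumption: marking a thread pending moves its committed bytes from active to flush.
    void setFlushPending(PerThread t) {
        t.flushPending = true;
        activeBytes -= t.bytesUsed;
        flushBytes += t.bytesUsed;
    }

    void doAfterDocument(PerThread t) {
        commitPerThreadBytes(t);
        if (!t.flushPending && t.bytesUsed > hardMaxBytesPerDWPT) {
            setFlushPending(t); // safety valve: one DWPT must not grow past the hard limit
        }
    }
}
```

With a 100-byte hard limit, growing a thread from 60 to 130 bytes moves all 130 bytes into flushBytes and marks it pending; any later growth is then charged directly to flushBytes.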
```java
private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
  if (fullFlush) {
    // A full flush is in progress: if this ThreadState is flushPending, add it to
    // blockedFlushes and take the next DWPT waiting to be flushed; otherwise return null
    if (perThread.flushPending) {
      checkoutAndBlock(perThread);
      return nextPendingFlush();
    } else {
      return null;
    }
  } else {
    // If the caller asks for this ThreadState to be marked flushPending
    if (markPending) {
      assert perThread.isFlushPending() == false;
      setFlushPending(perThread);
    }
    // Try to check out the DWPT associated with this ThreadState for flushing
    return tryCheckoutForFlush(perThread);
  }
}
```
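checkout decides which DWPT, if any, the indexing thread should flush right now. The sketch below is a simplified model under stated assumptions: DWPTs are represented by strings, `blockedFlushes` and `flushQueue` are plain deques standing in for Lucene's internal queues, and tryCheckoutForFlush is reduced to "return this thread's DWPT if it is flushPending, else null", glossing over whatever locking and state reset the real method does.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of checkout(): which DWPT (if any) the indexing thread should flush now.
class CheckoutSketch {
    static final class PerThread {
        final String dwpt;
        boolean flushPending;
        PerThread(String dwpt) { this.dwpt = dwpt; }
    }

    boolean fullFlush;
    final Deque<String> blockedFlushes = new ArrayDeque<>();
    final Deque<String> flushQueue = new ArrayDeque<>();

    String checkout(PerThread t, boolean markPending) {
        if (fullFlush) {
            if (t.flushPending) {
                blockedFlushes.add(t.dwpt); // like checkoutAndBlock: park it for the full flush
                return flushQueue.poll();   // like nextPendingFlush: null if nothing is queued
            }
            return null;
        }
        if (markPending) {
            t.flushPending = true;          // like setFlushPending
        }
        return t.flushPending ? t.dwpt : null; // simplified tryCheckoutForFlush
    }
}
```

Outside a full flush, a thread only gets its own DWPT back once it is marked pending. During a full flush, a pending thread's DWPT is parked in blockedFlushes and the thread receives the next queued DWPT instead, or null if none is queued.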
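This article walked through how a document is processed, without going into the per-field processing logic; later articles will cover the file formats produced by the different field types. The next article, https://cloud.tencent.com/developer/article/1579684, covers what happens after a document has been processed.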