lucene添加文档源码解析(第一篇)

2020-02-10 17:14:35 浏览数 (1)

在lucene中可以对一篇文档进行添加、修改、删除操作,在这篇文章中我们详细介绍lucene添加文档的流程,对添加文档的源码进行解析。

在lucene中添加文档是通过IndexWriter.addDocument方法,我们先给出添加文档的示例代码

代码语言:javascript复制
        IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
        config.setUseCompoundFile(false);
        config.setMaxBufferedDocs(2);
        IndexWriter writer = new IndexWriter(dir, config);
        //
        FieldType type = new FieldType();
        type.setStored(true);
        type.setTokenized(true);
        type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
        type.setStoreTermVectors(true);
        type.setStoreTermVectorPositions(true);
        type.setStoreTermVectorOffsets(true);
        type.freeze();
        //
        Document doc = new Document();
        doc.add(new Field("content", "one", type));
        writer.addDocument(doc);

单文档添加的流程图

lucene添加文档流程图lucene添加文档流程图

我们首先介绍添加文档前的处理流程,在lucene中添加文档前的处理源码如下。

代码语言:javascript复制
  private boolean preUpdate() throws IOException, AbortingException {
    ensureOpen();
    boolean hasEvents = false;
    // 判断是否有需要待flush的DWPT
    if (flushControl.anyStalledThreads() || (flushControl.numQueuedFlushes() > 0 && config.checkPendingFlushOnUpdate)) {
      // Help out flushing any queued DWPTs so we can un-stall:
      do {
        // Try pick up pending threads here if possible
        DocumentsWriterPerThread flushingDWPT;
        // 取出每一个DWPT进行flush
        while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
          // Don't push the delete here since the update could fail!
          // 对该flushingDWPT进行flush
          hasEvents |= doFlush(flushingDWPT);
        }
        
        flushControl.waitIfStalled(); // block if stalled
      } while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
    }
    return hasEvents;
  }

1. DocumentsWriterFlushControl.anyStalledThreads

DocumentsWriterFlushControl通过DocumentsWriterStallControl成员变量中的stalled标志来判断当前是否处于停顿状态。持有相同IndexWriter对象的多个线程并发的添加、更新文档时,每一个线程都会获取一个ThreadState,该ThreadState持有的DWPT在处理完文档后,如果该ThreadState持有的DWPT达到flush条件,处理文档占用的内存会累加到flushBytes中,否则累加到activeBytes中。

多线程并发添加、更新文档会降低内存的健康度,flush一个段会提高内存的健康度,如果添加、更新文档的速度快于flush的速度,如果不控制添加、更新文档的线程,很容易造成内存OOM。lucene内部通过下面的条件来停顿添加、更新文档的线程。

代码语言:javascript复制
    /*
     * we block indexing threads if net byte grows due to slow flushes
     * yet, for small ram buffers and large documents we can easily
     * reach the limit without any ongoing flushes. we need to ensure
     * that we don't stall/block if an ongoing or pending flush can
     * not free up enough memory to release the stall lock.
     */
     stall = (activeBytes   flushBytes) > limit && activeBytes < limit && !closed;

2. IndexWriterConfig.checkPendingFlushOnUpdate

该标志表示在添加、更新文档时是否检查有需要flush的段,如果该值设置为true,则处理待flush的段。

代码语言:javascript复制
/** 
  if an indexing thread should check for pending flushes on update 
  in order to help out on a full flush
  */
protected volatile boolean checkPendingFlushOnUpdate = true;

2. DocumentsWriterFlushControl.nextPendingFlush解析

2.1 nextPendingFlush是取出一个待flush的段的DWPT

代码语言:javascript复制
  DocumentsWriterPerThread nextPendingFlush() {
    int numPending;
    boolean fullFlush;
    synchronized (this) {
      final DocumentsWriterPerThread poll;
      // 取出添加到flushQueue中的DWPT
      if ((poll = flushQueue.poll()) != null) {
        // 更新线程停滞标志
        updateStallState();
        return poll;
      }
      fullFlush = this.fullFlush;
      numPending = this.numPending;
    }
    // 如果标记为flushPending状态的ThreadState的个数大于0,并且当前没有处于full flush状态(主动执行flush)
    if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
      // 处理每一个ThreadState,如果该ThreadState被标记为flushPending状态,尝试取出该ThreadState
      // 关联的的DWPT,返回该DWPT进行flush
      final int limit = perThreadPool.getActiveThreadStateCount();
      for (int i = 0; i < limit && numPending > 0; i  ) {
        final ThreadState next = perThreadPool.getThreadState(i);
        if (next.flushPending) {
          final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
          if (dwpt != null) {
            return dwpt;
          }
        }
      }
    }
    return null;
  }

2.2 下面我们详细介绍tryCheckoutForFlush方法的源码

代码语言:javascript复制
synchronized DocumentsWriterPerThread tryCheckoutForFlush(ThreadState perThread) {
   // 如果该ThreadState处于flushPending状态,尝试取出该ThreadState关联的DWPT
   return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
}
代码语言:javascript复制
  private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
    assert Thread.holdsLock(this);
    assert perThread.flushPending;
    try {
      // We are pending so all memory is already moved to flushBytes
      // 由于添加、更新文档是并发操作,可能其他的线程获得了该ThreadState的锁,
      // 如果无法获取该ThreadState的锁,返回空
      if (perThread.tryLock()) {
        try {
          // 如果该ThreadState中的DWPT已经初始化
          if (perThread.isInitialized()) {
            assert perThread.isHeldByCurrentThread();
            final DocumentsWriterPerThread dwpt;
            final long bytes = perThread.bytesUsed; // do that before
                                                         // replace!
            // 重置该ThreadState
            dwpt = perThreadPool.reset(perThread);
            assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
            // Record the flushing DWPT to reduce flushBytes in doAfterFlush
            // 记录该ThreadState关联的DWPT占用的内存,以便在doAfterFlush进行处理
            flushingWriters.put(dwpt, Long.valueOf(bytes));
            // 待flush的DWPT的个数递减
            numPending--; // write access synced
            return dwpt;
          }
        } finally {
          perThread.unlock();
        }
      }
      return null;
    } finally {
      updateStallState();
    }
  }

以上介绍了lucene添加、更新文档前的流程。下一篇文章 https://cloud.tencent.com/developer/article/1579685

中我们将介绍处理文档的详细逻辑。

0 人点赞