Lucene Add-Document Source Code Analysis (Part 2)

2020-02-10 17:14:22

In Lucene, documents are added through the IndexWriter.addDocument method. Here is sample code that adds a document:

        IndexWriterConfig config = new IndexWriterConfig(new WhitespaceAnalyzer());
        config.setUseCompoundFile(false);
        config.setMaxBufferedDocs(2);
        IndexWriter writer = new IndexWriter(dir, config);
        //
        FieldType type = new FieldType();
        type.setStored(true);
        type.setTokenized(true);
        type.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
        type.setStoreTermVectors(true);
        type.setStoreTermVectorPositions(true);
        type.setStoreTermVectorOffsets(true);
        type.freeze();
        //
        Document doc = new Document();
        doc.add(new Field("content", "one", type));
        writer.addDocument(doc);

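The previous article covered the flow that runs before Lucene adds or updates a document. This article covers how the document itself is processed.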

[Figure: Lucene's add-document flow]

1. Obtaining a ThreadState: source code analysis

Before processing a document, Lucene must obtain a ThreadState.

  ThreadState obtainAndLock() {
    // Obtain a ThreadState from the DocumentsWriterPerThreadPool
    final ThreadState perThread = perThreadPool.getAndLock(Thread
        .currentThread(), documentsWriter);
    boolean success = false;
    try {
      // If the obtained ThreadState already has an initialized DWPT, and that DWPT's
      // deleteQueue differs from the DocumentsWriter's deleteQueue
      if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
        // There is a flush-all in process and this DWPT is
        // now stale -- enroll it for flush and try for
        // another DWPT:
        // A full flush (IndexWriter.flush) is in progress: if the DWPT attached to this ThreadState
        // has indexed more than 0 documents, add the ThreadState to fullFlushBuffer; otherwise reset the DWPT
        addFlushableState(perThread);
      }
      success = true;
      // simply return the ThreadState even in a flush all case since we already hold the lock
      return perThread;
    } finally {
      if (!success) { // make sure we unlock if this fails
        perThreadPool.release(perThread);
      }
    }
  }

  /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
  // Obtain a ThreadState
  ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
    ThreadState threadState = null;
    synchronized (this) {
      // If freeList is empty, create a new ThreadState and add it to threadStates
      if (freeList.isEmpty()) {
        // ThreadState is already locked before return by this method:
        return newThreadState();
      } else {
        // Important that we are LIFO here! This way if number of concurrent indexing 
        // threads was once high, but has now reduced, we only use a
        // limited number of thread states:
        // LIFO (last in, first out): take the last ThreadState
        threadState = freeList.remove(freeList.size()-1);
        // If the DWPT of that last ThreadState is null
        if (threadState.dwpt == null) {
          // This thread-state is not initialized, e.g. it
          // was just flushed. See if we can instead find
          // another free thread state that already has docs
          // indexed. This way if incoming thread concurrency
          // has decreased, we don't leave docs
          // indefinitely buffered, tying up RAM.  This
          // will instead get those thread states flushed,
          // freeing up RAM for larger segment flushes:
          // Scan freeList from front to back for a ThreadState whose DWPT is not null,
          // and swap it with the element taken from the end of the list
          for(int i=0;i<freeList.size();i++) {
            ThreadState ts = freeList.get(i);
            if (ts.dwpt != null) {
              // Use this one instead, and swap it with
              // the un-initialized one:
              freeList.set(i, threadState);
              threadState = ts;
              break;
            }
          }
        }
      }
    }
   
    // This could take time, e.g. if the threadState is [briefly] checked for flushing:
    // Lock the selected ThreadState
    threadState.lock();

    return threadState;
  } 
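
The selection rule in getAndLock can be tried out in isolation. The following is a minimal sketch, not Lucene's code: `SketchState` and `FreeListSketch` are hypothetical stand-ins, the boolean `hasDwpt` plays the role of `dwpt != null`, and the per-state locking is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ThreadState; only records whether a DWPT is attached. */
class SketchState {
    final String name;
    final boolean hasDwpt; // plays the role of "dwpt != null"

    SketchState(String name, boolean hasDwpt) {
        this.name = name;
        this.hasDwpt = hasDwpt;
    }
}

/** Hypothetical pool that mimics the LIFO-with-swap selection described above. */
class FreeListSketch {
    private final List<SketchState> freeList = new ArrayList<>();
    private int created = 0;

    synchronized void release(SketchState state) {
        freeList.add(state);
    }

    synchronized SketchState acquire() {
        if (freeList.isEmpty()) {
            // Nothing free: create a fresh, uninitialized state
            return new SketchState("new-" + (created++), false);
        }
        // LIFO: take the most recently released state
        SketchState picked = freeList.remove(freeList.size() - 1);
        if (!picked.hasDwpt) {
            // Prefer a free state that already buffers documents, so that
            // buffered documents do not sit in RAM indefinitely
            for (int i = 0; i < freeList.size(); i++) {
                SketchState candidate = freeList.get(i);
                if (candidate.hasDwpt) {
                    freeList.set(i, picked); // swap the uninitialized state back in
                    picked = candidate;
                    break;
                }
            }
        }
        return picked;
    }

    synchronized int freeCount() {
        return freeList.size();
    }
}
```

With a free list holding an initialized state "a" followed by an uninitialized state "b", the first acquire() returns "a" and leaves "b" in the list, which is the swap behaviour the comments in getAndLock describe.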

2. DocumentsWriterPerThread.updateDocument: source code analysis

  public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
    testPoint("DocumentsWriterPerThread addDocument start");
    assert deleteQueue != null;
    // Increment the pendingNumDocs counter
    reserveOneDoc();
    docState.doc = doc;
    docState.analyzer = analyzer;
    docState.docID = numDocsInRAM;
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
    }
    // Even on exception, the document is still added (but marked
    // deleted), so we don't need to un-reserve at that point.
    // Aborting exceptions will actually "lose" more than one
    // document, so the counter will be "wrong" in that case, but
    // it's very hard to fix (we can't easily distinguish aborting
    // vs non-aborting exceptions):
    boolean success = false;
    try {
      try {
        // Process the document
        consumer.processDocument();
      } finally {
        // Clear the DocState
        docState.clear();
      }
      success = true;
    } finally {
      if (!success) {
        // If processing this document failed, add its docID to pendingUpdates;
        // that docID is handled at flush time
        // mark document as deleted
        deleteDocID(docState.docID);
        // Increment numDocsInRAM so that the next document gets a fresh docID
        numDocsInRAM++;
      }
    }
    // After the document is processed, update the deleteSlice
    return finishDocument(delTerm);
  }
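
The success-flag handling above means that a failed document still consumes a docID and is recorded as deleted. The following is a minimal sketch of that bookkeeping, not Lucene's code: `DocBufferSketch` and `Processor` are hypothetical names, and the sketch assumes that the success path increments the counter as well (in the original that would happen inside finishDocument, which this article does not show).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer that mimics the docID bookkeeping of updateDocument. */
class DocBufferSketch {
    /** Hypothetical processing step standing in for consumer.processDocument(). */
    interface Processor {
        void process(String doc);
    }

    int numDocsInRAM = 0;
    final List<Integer> deletedDocIDs = new ArrayList<>();

    void update(String doc, Processor processor) {
        int docID = numDocsInRAM;
        boolean success = false;
        try {
            processor.process(doc);
            success = true;
        } finally {
            if (!success) {
                // The failed document keeps its docID but is marked deleted
                deletedDocIDs.add(docID);
                numDocsInRAM++;
            }
        }
        // Assumption: on success the counter advances in the finishing step
        numDocsInRAM++;
    }
}
```

Adding one good document, one that throws, and another good one leaves numDocsInRAM at 3 with docID 1 in the deleted list, so later docIDs are never reused for a different document.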

3. DocConsumer.processDocument: source code analysis

This method processes every field of the document, producing stored fields, the inverted index, DocValues, and Point values.

  public void processDocument() throws IOException, AbortingException {

    // How many indexed field names we've seen (collapses
    // multiple field instances by the same name):
    int fieldCount = 0;

    long fieldGen = nextFieldGen++;

    // NOTE: we need two passes here, in case there are
    // multi-valued fields, because we must process all
    // instances of a given field at once, since the
    // analyzer is free to reuse TokenStream across fields
    // (i.e., we cannot have more than one TokenStream
    // running "at once"):
    // Preparation before inverted-index processing
    termsHash.startDocument();
    // Start processing stored fields
    startStoredFields(docState.docID);

    boolean aborting = false;
    try {
      // Process each field in turn
      for (IndexableField field : docState.doc) {
        // Process one field: inverted index, term vector generation, stored fields, DocValues, and Point values
        fieldCount = processField(field, fieldGen, fieldCount);
      }
    } catch (AbortingException ae) {
      aborting = true;
      throw ae;
    } finally {
      if (aborting == false) {
        // Finish each indexed field name seen in the document:
        // Call finish() on each field in turn
        for (int i=0;i<fieldCount;i++) {
          fields[i].finish();
        }
        // Finish processing stored fields
        finishStoredFields();
      }
    }

    try {
      // Finish processing the inverted index
      termsHash.finishDocument();
    } catch (Throwable th) {
      // Must abort, on the possibility that on-disk term
      // vectors are now corrupt:
      throw AbortingException.wrap(th);
    }
  }
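
The comment on fieldCount says that several instances of the same field name are collapsed into one entry, and fieldGen is what makes that possible. The body of processField is not shown in this article, so the following is only a sketch of one way such a generation counter can work: `FieldGenSketch` and `PerField` are hypothetical, and a HashMap lookup stands in for whatever per-field lookup the real code uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of counting distinct field names per document with a generation stamp. */
class FieldGenSketch {
    /** Hypothetical per-field state, reused across documents. */
    static final class PerField {
        final String name;
        long fieldGen = -1; // generation of the last document that used this field

        PerField(String name) {
            this.name = name;
        }
    }

    private final Map<String, PerField> byName = new HashMap<>();
    private long nextFieldGen = 0;

    /** Returns the distinct field names of one document, in first-seen order. */
    List<String> processDocument(List<String> fieldNames) {
        long fieldGen = nextFieldGen++; // one new generation per document
        List<String> seen = new ArrayList<>();
        for (String name : fieldNames) {
            PerField pf = byName.computeIfAbsent(name, PerField::new);
            if (pf.fieldGen != fieldGen) {
                // First instance of this field name in the current document
                pf.fieldGen = fieldGen;
                seen.add(name);
            }
        }
        return seen;
    }
}
```

Because the stamp changes with every document, no per-document "seen" set has to be cleared: a stale stamp simply means the field has not yet appeared in the current document.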

4. DocumentsWriterFlushControl.doAfterDocument: source code analysis

After a document has been processed, DocumentsWriterFlushControl.doAfterDocument is called for post-processing.

  synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
    try {
      // Update the memory accounted to this ThreadState: if it is flushPending, add the bytes
      // to flushBytes, otherwise add them to activeBytes
      commitPerThreadBytes(perThread);
      // If this ThreadState is not flushPending
      if (!perThread.flushPending) {
        // For an update call FlushPolicy.onUpdate, otherwise call onInsert
        if (isUpdate) {
          flushPolicy.onUpdate(this, perThread);
        } else {
          flushPolicy.onInsert(this, perThread);
        }
        // If the ThreadState is still not flushPending and its memory use exceeds hardMaxBytesPerDWPT,
        // mark the ThreadState as flushPending
        if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
          // Safety check to prevent a single DWPT exceeding its RAM limit. This
          // is super important since we can not address more than 2048 MB per DWPT
          setFlushPending(perThread);
        }
      }
      // Check out this ThreadState; the flow of the checkout method is described below
      return checkout(perThread, false);
    } finally {
      boolean stalled = updateStallState();
      assert assertNumDocsSinceStalled(stalled) && assertMemory();
    }
  }

  private DocumentsWriterPerThread checkout(ThreadState perThread, boolean markPending) {
    if (fullFlush) {
      // In full-flush state: if this ThreadState is flushPending, add it to blockedFlushes
      // and take out a DWPT that is waiting to be flushed; otherwise return null
      if (perThread.flushPending) {
        checkoutAndBlock(perThread);
        return nextPendingFlush();
      } else {
        return null;
      }
    } else {
      // If this ThreadState needs to be marked flushPending
      if (markPending) {
        assert perThread.isFlushPending() == false;
        setFlushPending(perThread);
      }
      // Take out the DWPT attached to this ThreadState
      return tryCheckoutForFlush(perThread);
    }
  }
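
The decision order in doAfterDocument (policy first, hard limit as a safety net second) can be reduced to a few lines. The following is a minimal sketch, not Lucene's code: `FlushDecisionSketch`, `State`, and `Policy` are hypothetical names, the full-flush branch and the memory accounting are omitted, and the sketch assumes that checking out for flush only yields something when the state is flushPending.

```java
/** Hypothetical sketch of the flush decision taken after each document. */
class FlushDecisionSketch {
    /** Hypothetical stand-in for ThreadState. */
    static final class State {
        long bytesUsed;
        boolean flushPending;
    }

    /** Hypothetical hook standing in for FlushPolicy.onInsert / onUpdate. */
    interface Policy {
        void afterDocument(State state);
    }

    private final long hardMaxBytes;
    private final Policy policy;

    FlushDecisionSketch(long hardMaxBytes, Policy policy) {
        this.hardMaxBytes = hardMaxBytes;
        this.policy = policy;
    }

    /** Returns true when the state should be handed over for flushing. */
    boolean doAfterDocument(State state) {
        if (!state.flushPending) {
            // Give the policy the first chance to mark the state
            policy.afterDocument(state);
            // Safety net: never let one state exceed the hard limit
            if (!state.flushPending && state.bytesUsed > hardMaxBytes) {
                state.flushPending = true;
            }
        }
        return state.flushPending;
    }
}
```

With a policy that never marks anything and a hard limit of 100 bytes, a state using 50 bytes is left alone while one using 101 bytes becomes flushPending; a policy is free to mark the state earlier, for example after a fixed number of buffered documents.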

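This article described the document-processing flow without going into the handling of individual fields; later articles will cover the file formats used for the different field types. The next article, https://cloud.tencent.com/developer/article/1579684, covers the flow after a document has been processed.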
