TimeLimitingCollector源码解析

2019-12-16 18:24:15 浏览数 (1)

在solr的查询请求中添加timeAllowed参数,可以限定solr查询的请求时间,在solr内部,是通过TimeLimitingCollector类来实现该功能的

在org.apache.solr.handler.component.QueryComponent类中的process方法中处理该参数

代码语言:javascript复制
// -1 as flag if not set.
// 从请求中获取timeAllowed参数,默认值是-1
long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
if (null != rb.getCursorMark() && 0 < timeAllowed) {
    // fundamentally incompatible
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both "  
                              CursorMarkParams.CURSOR_MARK_PARAM   " and "   CommonParams.TIME_ALLOWED);
}
QueryCommand cmd = rb.createQueryCommand();
// 设置solr的query请求的超时时间
cmd.setTimeAllowed(timeAllowed);

在org.apache.solr.search.SolrIndexSearcher类中getDocListNC方法中,通过调用buildAndRunCollectorChain方法,来生成TimeLimitingCollector类

代码语言:javascript复制
// 获取请求中query的请求超时时间
final long timeAllowed = cmd.getTimeAllowed();
// 如果请求中设置的超时时间大于0, 生成TimeLimitingCollector
if (timeAllowed > 0) {
    collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeAllowed);
}

最终通过调用org.apache.lucene.search.IndexSearcher类中的public void search(Query query, Collector results)方法来起作用

在org.apache.lucene.search.IndexSearcher类中,lucene请求的源码如下

代码语言:javascript复制
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
      throws IOException {

    // TODO: should we make this
    // threaded...?  the Collector could be sync'd?
    // always use single thread:
    for (LeafReaderContext ctx : leaves) { // search each subreader
      final LeafCollector leafCollector;
      try {
        // 如果是第一个segment的reader, 则通过调用setBaseline()方法来设置请求的最终执行结束时间
        leafCollector = collector.getLeafCollector(ctx);
      } catch (CollectionTerminatedException e) {
        // there is no doc of interest in this reader context
        // continue with the following leaf
        continue;
      }
      // 生成BulkScorer对象
      BulkScorer scorer = weight.bulkScorer(ctx);
      if (scorer != null) {
        try {
          // 对query查询相关的文档进行打分
          scorer.score(leafCollector, ctx.reader().getLiveDocs());
        } catch (CollectionTerminatedException e) {
          // collection was terminated prematurely
          // continue with the following leaf
        }
      }
    }
  }

org.apache.lucene.search.Weight.DefaultBulkScorer中对每一个找到的docId进行评分

代码语言:javascript复制
    /** Specialized method to bulk-score all hits; we
     *  separate this from {@link #scoreRange} to help out
     *  hotspot.
     *  See <a href="https://issues.apache.org/jira/browse/LUCENE-5487">LUCENE-5487</a> */
    static void scoreAll(LeafCollector collector, DocIdSetIterator iterator, TwoPhaseIterator twoPhase, Bits acceptDocs) throws IOException {
      if (twoPhase == null) {
        // 遍历每一个找到的docId
        for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
          // 判断找到的docId是不是满足条件,是否被删除
          if (acceptDocs == null || acceptDocs.get(doc)) {
            // 调用collector的collect方法对每一个docId进行评分,该collector会在调用实际的
            // collector之前判断请求是否超时,如果超时,则抛出
            collector.collect(doc);
          }
        }
      } else {
        // The scorer has an approximation, so run the approximation first, then check acceptDocs, then confirm
        final DocIdSetIterator approximation = twoPhase.approximation();
        for (int doc = approximation.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = approximation.nextDoc()) {
          if ((acceptDocs == null || acceptDocs.get(doc)) && twoPhase.matches()) {
            collector.collect(doc);
          }
        }
      }
    }

TimeLimitingCollector中collect方法对找到的每一个docId进行判断是否已经超过结束时间

代码语言:javascript复制
  @Override
  public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
    this.docBase = context.docBase;
    if (Long.MIN_VALUE == t0) {
      setBaseline();
    }
    final long time = clock.get();
    if (time - timeout > 0L) {
      throw new TimeExceededException(timeout - t0, time - t0, -1);
    }
    return new FilterLeafCollector(collector.getLeafCollector(context)) {
      
      @Override
      public void collect(int doc) throws IOException {
        final long time = clock.get();
        // 判断是否已经超过结束时间
        if (time - timeout > 0L) {
          if (greedy) {
            //System.out.println(this "  greedy: before failing, collecting doc: " (docBase   doc) "  " (time-t0));
            in.collect(doc);
          }
          //System.out.println(this "  failing on:  " (docBase   doc) "  " (time-t0));
          throw new TimeExceededException( timeout-t0, time-t0, docBase   doc );
        }
        //System.out.println(this "  collecting: " (docBase   doc) "  " (time-t0));
        in.collect(doc);
      }
      
    };
  }

lucene中使用org.apache.lucene.search.TimeLimitingCollector.TimerThread类进行模拟时钟计时,可以参考这篇文章 http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html 来了解lucene这样设计的原因。

0 人点赞