Lucene's segment merge policies (MergePolicy)

2019-12-20 10:33:56

This article looks at Lucene's segment merge policies. Several IndexWriter operations trigger the segment-merge flow, for example commit, flush, and NRT (near-real-time) reader opens. Lucene ships several merge policies, such as LogMergePolicy and TieredMergePolicy.

TieredMergePolicy has been the default merge policy since Lucene 4.0; before that, the default was LogMergePolicy.

The biggest difference between the two policies:

LogMergePolicy always merges adjacent segments: from the segment set supplied by IndexWriter, it picks a contiguous range of segments to form a OneMerge.

TieredMergePolicy first sorts the segment set supplied by IndexWriter, then picks some of the sorted segments (possibly non-contiguous in the original order) to form a OneMerge.
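Swapping between the two is a one-liner on IndexWriterConfig. A minimal sketch (assuming a 4.x+ style API; LogMergePolicy itself is abstract, so its byte-size subclass stands in for the old default):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;

IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
config.setMergePolicy(new TieredMergePolicy());         // the default since Lucene 4.0
// config.setMergePolicy(new LogByteSizeMergePolicy()); // contiguous, pre-4.0 behavior
```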

Source walkthrough of TieredMergePolicy.findMerges

Function signature:

```java
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException
```

Walking through the body:

```java
    // If there are no segments at all, return null right away
    if (infos.size() == 0) {
      return null;
    }
    // The segments IndexWriter is already merging
    final Set<SegmentCommitInfo> merging = writer.getMergingSegments();
    final Set<SegmentCommitInfo> toBeMerged = new HashSet<>();

    final List<SegmentCommitInfo> infosSorted = new ArrayList<>(infos.asList());

    // The size can change concurrently while we are running here, because deletes
    // are now applied concurrently, and this can piss off TimSort!  So we
    // call size() once per segment and sort by that:
    // Get each segment's byte size with deleted documents excluded;
    // getSegmentSizes computes byteSize * (1.0 - delCount / maxDoc) per segment
    Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, infos.asList());
    // Sort the segments in descending order of the size computed above
    infosSorted.sort(new SegmentByteSizeDescending(sizeInBytes));
```
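The size used for sorting discounts deleted documents. A sketch of the per-segment computation behind getSegmentSizes (delCount and maxDoc stand for writer.numDeletedDocs(info) and info.info.maxDoc(); the helper is internal, so this approximates its shape rather than quoting it verbatim):

```java
// Effective segment size with deletions discounted (sketch)
double delRatio = maxDoc <= 0 ? 0.0 : (double) delCount / (double) maxDoc;
long effectiveSize = (long) (info.sizeInBytes() * (1.0 - delRatio));
```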
```java
    // Compute total index bytes & print details about the index
    long totIndexBytes = 0;                // total size of all segments
    long minSegmentBytes = Long.MAX_VALUE; // size of the smallest segment
    // Walk every segment, accumulating the total size and the minimum size
    for(SegmentCommitInfo info : infosSorted) {
      final long segBytes = sizeInBytes.get(info);
      if (verbose(writer)) {
        String extra = merging.contains(info) ? " [merging]" : "";
        if (segBytes >= maxMergedSegmentBytes/2.0) {
          extra += " [skip: too large]";
        } else if (segBytes < floorSegmentBytes) {
          extra += " [floored]";
        }
        message("  seg=" + writer.segString(info) + " size=" + String.format(Locale.ROOT, "%.3f", segBytes/1024/1024.) + " MB" + extra, writer);
      }

      minSegmentBytes = Math.min(segBytes, minSegmentBytes);
      // Accum total byte size
      totIndexBytes += segBytes;
    }
```
```java
    // If we have too-large segments, grace them out
    // of the maxSegmentCount:
    // The list was sorted descending above, so walk it from the front and
    // drop every segment whose size is >= maxMergedSegmentBytes/2.0;
    // afterwards tooBigCount points at the first segment smaller than
    // maxMergedSegmentBytes/2.0, and totIndexBytes excludes the dropped
    // too-large segments
    int tooBigCount = 0;
    while (tooBigCount < infosSorted.size()) {
      long segBytes = sizeInBytes.get(infosSorted.get(tooBigCount));
      if (segBytes < maxMergedSegmentBytes/2.0) {
        break;
      }
      totIndexBytes -= segBytes;
      tooBigCount++;
    }
    // Raise minSegmentBytes to the floor, i.e. Math.max(floorSegmentBytes, minSegmentBytes);
    // floorSegmentBytes defaults to 2 MB and is explained in detail below
    minSegmentBytes = floorSize(minSegmentBytes);
```

The floorSegmentBytes parameter

floorSegmentBytes defaults to 2 MB. Any segment whose segmentSize is below floorSegmentBytes is treated as if its segmentSize were exactly floorSegmentBytes; the value also feeds into the scoring of a OneMerge. The Lucene javadoc puts it this way: "Segments smaller than this are 'rounded up' to this size, ie treated as equal (floor) size for merge selection. This is to prevent frequent flushing of tiny segments from allowing a long tail in the index. Default is 2 MB."

An ill-chosen floorSegmentBytes hurts in both directions:

Set too small, it can make allowedSegCount very large, especially when the smallest segment is a tiny fraction of the total index size. The index may then carry a large number of small segments for a long stretch, because no merging happens while the segment count stays at or below allowedSegCount.

Set too large, it makes allowedSegCount too small, so merges involving bigger segments run more often; and the bigger the segments, the higher the merge cost and the longer the merge threads are tied up.
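One step of findMerges is not shown above: between floorSize(minSegmentBytes) and the selection loop, Lucene derives allowedSegCount (used below as allowedSegCountInt) from totIndexBytes, minSegmentBytes, segsPerTier and maxMergeAtOnce. Reconstructed from the Lucene source of this era, the omitted code looks roughly like this (treat it as a sketch, not a verbatim quote):

```java
    // Compute the max allowed segment count: starting from the floored
    // minimum segment size, each tier may hold segsPerTier segments, and
    // each next tier's segments are maxMergeAtOnce times larger.
    long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = 0;
    while (true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    final int allowedSegCountInt = (int) allowedSegCount;
```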

Selecting segments to build a OneMerge

```java
    MergeSpecification spec = null;

    // Cycle to possibly select more than one merge:
    while(true) {
      // Total size of the segments already being merged
      long mergingBytes = 0;

      // Gather eligible segments for merging, ie segments
      // not already being merged and not already picked (by
      // prior iteration of this loop) for merging:
      final List<SegmentCommitInfo> eligible = new ArrayList<>();
      // Walk the segments smaller than maxMergedSegmentBytes/2.0: if a segment
      // is already being merged, add its size to mergingBytes; otherwise, if it
      // is not in toBeMerged, add it to eligible
      for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
        final SegmentCommitInfo info = infosSorted.get(idx);
        if (merging.contains(info)) {
          mergingBytes += sizeInBytes.get(info);
        } else if (!toBeMerged.contains(info)) {
          eligible.add(info);
        }
      }
      // If mergingBytes exceeds maxMergedSegmentBytes, set maxMergeIsRunning:
      // the segments currently being merged already add up to more than
      // maxMergedSegmentBytes
      final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

      if (verbose(writer)) {
        message("  allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
      }
      // If eligible is empty, return spec and stop
      if (eligible.size() == 0) {
        return spec;
      }
      // A OneMerge is only selected when eligible holds more than
      // allowedSegCountInt segments
      if (eligible.size() > allowedSegCountInt) {

        // OK we are over budget -- find best merge!
        MergeScore bestScore = null;
        List<SegmentCommitInfo> best = null;
        boolean bestTooLarge = false;
        long bestMergeBytes = 0;

        // Consider all merge starts:
        // Slide over eligible, building one candidate per start index
        for(int startIdx = 0;startIdx <= eligible.size()-maxMergeAtOnce; startIdx++) {

          long totAfterMergeBytes = 0;

          final List<SegmentCommitInfo> candidate = new ArrayList<>();
          boolean hitTooLarge = false;
          for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
            final SegmentCommitInfo info = eligible.get(idx);
            final long segBytes = sizeInBytes.get(info);
            // If adding this segment would push the candidate's total size
            // past maxMergedSegmentBytes, skip the segment
            if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
              hitTooLarge = true;
              // NOTE: we continue, so that we can try
              // "packing" smaller segments into this merge
              // to see if we can get closer to the max
              // size; this in general is not perfect since
              // this is really "bin packing" and we'd have
              // to try different permutations.
              continue;
            }
            candidate.add(info);
            totAfterMergeBytes += segBytes;
          }

          // We should never see an empty candidate: we iterated over maxMergeAtOnce
          // segments, and already pre-excluded the too-large segments:
          assert candidate.size() > 0;
          // Score the candidate
          final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer, sizeInBytes);
          if (verbose(writer)) {
            message("  maybe=" + writer.segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes/1024./1024.), writer);
          }

          // If we are already running a max sized merge
          // (maxMergeIsRunning), don't allow another max
          // sized merge to kick off:
          // If bestScore is null, or this candidate's score beats (is lower
          // than) bestScore, and at least one of hitTooLarge / maxMergeIsRunning
          // is false, update best, bestScore, bestTooLarge and bestMergeBytes
          if ((bestScore == null || score.getScore() < bestScore.getScore()) && (!hitTooLarge || !maxMergeIsRunning)) {
            best = candidate;
            bestScore = score;
            bestTooLarge = hitTooLarge;
            bestMergeBytes = totAfterMergeBytes;
          }
        }

        if (best != null) {
          if (spec == null) {
            spec = new MergeSpecification();
          }
          // Build a OneMerge from the best candidate and add it to the
          // MergeSpecification
          final OneMerge merge = new OneMerge(best);
          spec.add(merge);
          toBeMerged.addAll(merge.segments);

          if (verbose(writer)) {
            message("  add merge=" + writer.segString(merge.segments) + " size=" + String.format(Locale.ROOT, "%.3f MB", bestMergeBytes/1024./1024.) + " score=" + String.format(Locale.ROOT, "%.3f", bestScore.getScore()) + " " + bestScore.getExplanation() + (bestTooLarge ? " [max merge]" : ""), writer);
          }
        } else {
          return spec;
        }
      } else {
        return spec;
      }
    }
```

Scoring a candidate

```java
/** Expert: scores one merge; subclasses can override. */
  protected MergeScore score(List<SegmentCommitInfo> candidate, boolean hitTooLarge, long mergingBytes, IndexWriter writer, Map<SegmentCommitInfo, Long> sizeInBytes) throws IOException {
    long totBeforeMergeBytes = 0;
    long totAfterMergeBytes = 0;
    long totAfterMergeBytesFloored = 0;
    for(SegmentCommitInfo info : candidate) {
      final long segBytes = sizeInBytes.get(info);
      totAfterMergeBytes += segBytes;
      totAfterMergeBytesFloored += floorSize(segBytes);
      totBeforeMergeBytes += info.sizeInBytes();
    }

    // Roughly measure "skew" of the merge, i.e. how
    // "balanced" the merge is (whether the segments are
    // about the same size), which can range from
    // 1.0/numSegsBeingMerged (good) to 1.0 (poor). Heavily
    // lopsided merges (skew near 1.0) is no good; it means
    // O(N^2) merge cost over time:
    final double skew;
    if (hitTooLarge) {
      // Pretend the merge has perfect skew; skew doesn't
      // matter in this case because this merge will not
      // "cascade" and so it cannot lead to N^2 merge cost
      // over time:
      skew = 1.0/maxMergeAtOnce;
    } else {
      skew = ((double) floorSize(sizeInBytes.get(candidate.get(0))))/totAfterMergeBytesFloored;
    }

    // Strongly favor merges with less skew (smaller
    // mergeScore is better):
    double mergeScore = skew;

    // Gently favor smaller merges over bigger ones.  We
    // don't want to make this exponent too large else we
    // can end up doing poor merges of small segments in
    // order to avoid the large merges:
    mergeScore *= Math.pow(totAfterMergeBytes, 0.05);

    // Strongly favor merges that reclaim deletes:
    final double nonDelRatio = ((double) totAfterMergeBytes)/totBeforeMergeBytes;
    mergeScore *= Math.pow(nonDelRatio, reclaimDeletesWeight);

    final double finalMergeScore = mergeScore;

    return new MergeScore() {

      @Override
      public double getScore() {
        return finalMergeScore;
      }

      @Override
      public String getExplanation() {
        return "skew=" + String.format(Locale.ROOT, "%.3f", skew) + " nonDelRatio=" + String.format(Locale.ROOT, "%.3f", nonDelRatio);
      }
    };
  }
```

The score formula

Reading it off the code above, a candidate's score is:

mergeScore = skew * totAfterMergeBytes^0.05 * nonDelRatio^reclaimDeletesWeight

where skew = floorSize(size of the candidate's largest segment) / totAfterMergeBytesFloored (forced to 1/maxMergeAtOnce when hitTooLarge is set), and nonDelRatio = totAfterMergeBytes / totBeforeMergeBytes. Lower scores are better.
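For a feel of the numbers, a worked example: merging ten 2 MB segments (exactly at the floor) with no deletions gives skew = 2 MB / 20 MB = 0.1, totAfterMergeBytes^0.05 = 20,971,520^0.05 ≈ 2.32 and nonDelRatio = 1.0, so mergeScore ≈ 0.1 × 2.32 ≈ 0.23. A more lopsided candidate scores higher (worse), while one whose source segments carry many deletions has a smaller nonDelRatio and scores lower (better).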

TieredMergePolicy configuration parameters

1. segsPerTier

Defaults to 10.0: a tier must accumulate at least segsPerTier segments before those segments are considered for merging.

2. maxMergeAtOnce

Defaults to 10: the maximum number of segments a single merge may contain during normal (non-forced) merging.

3. maxMergedSegmentBytes

Defaults to 5 GB: caps the total size of the segments selected into one merge, keeping merged segments from growing without bound. Half of this value also serves as a cutoff: a segment whose size (excluding deleted documents) exceeds maxMergedSegmentBytes/2 does not participate in merging at all.
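All of these are exposed as setters on TieredMergePolicy. The sketch below simply restates the defaults (the MB-based setters are the actual API in the Lucene versions this article covers; reclaimDeletesWeight is the exponent used in the scoring above):

```java
import org.apache.lucene.index.TieredMergePolicy;

TieredMergePolicy mergePolicy = new TieredMergePolicy();
mergePolicy.setSegmentsPerTier(10.0);        // segsPerTier, default 10.0
mergePolicy.setMaxMergeAtOnce(10);           // maxMergeAtOnce, default 10
mergePolicy.setMaxMergedSegmentMB(5 * 1024); // maxMergedSegmentBytes, default 5 GB
mergePolicy.setFloorSegmentMB(2.0);          // floorSegmentBytes, default 2 MB
mergePolicy.setReclaimDeletesWeight(2.0);    // scoring weight for reclaiming deletes
```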
