ApacheHudi Archive(归档)实现分析

2021-04-13 11:02:09 浏览数 (1)

1. 介绍

Timline(时间轴)由很多 instant构成,按照时间由小到大排列。当不断写入Hudi数据集时,Timeline上的 Instant会不断增加,为减小 Timeline的操作压力,会在 commit时按照配置对 instant进行归档,并从 Timeline上将已归档的 instant删除。

2. 分析

在每次 commit时会调用 HoodieCommitArchiveLog#archiveIfRequired来判断是否需要进行归档,其核心代码如下

代码语言:javascript复制
public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException {
    try {
      List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());

      boolean success = true;
      if (!instantsToArchive.isEmpty()) {
        this.writer = openWriter();
        archive(instantsToArchive);
        success = deleteArchivedInstants(instantsToArchive);
      } 
      return success;
    } finally {
      close();
    }
  }

其中需要通过 getInstantsToArchive获取需要归档的 Instant,然后在进行归档,接着再将 Instant删除。

2.1 获取Instant

通过 getInstantsToArchive来获取待归档的所有 Instant,其核心代码如下

代码语言:javascript复制
private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
    // 根据配置获取最大/小保留数
    int maxCommitsToKeep = config.getMaxCommitsToKeep();
    int minCommitsToKeep = config.getMinCommitsToKeep();

    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

    // 获取完成的CLEAN类型的timeline
    HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
        .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
    // 按照action进行一次排序,并根据最大保留数过滤出需要处理的instant
    Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
        .collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> {
          if (i.getValue().size() > maxCommitsToKeep) {
            // 待处理的instant
            return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
          } else {
            return new ArrayList<HoodieInstant>();
          }
        }).flatMap(i -> i.stream());

    // 获取所有已完成的instant的timeline
    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
    // 最早的处于pending的compaction
    Option<HoodieInstant> oldestPendingCompactionInstant =
        table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();

    // 获取第一个(最早)的savepoint
    Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
    // 已完成的timeline不为空,并且instant大于最大保留数,进一步处理
    if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) {
      // 将已完成clean类型的instant与其他instant连接在一起
      instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
        // 如果savepoint不存在,则不用过滤,若存在,那么需要过滤出小于savepoint的所有instant
        return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(),
            s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
      }).filter(s -> {
        // 再进行一次过滤,过滤出小于最早的pending的compaction,即大于的instant会被保留,不会被archive
        return oldestPendingCompactionInstant.map(instant -> {
          return HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER);
        }).orElse(true);
      }).limit(commitTimeline.countInstants() - minCommitsToKeep));
    }

    // 所有instant组成的timeline(包括各种中间状态的instant)
    HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
    // 按照时间和action进行分类
    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants()
        .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
            HoodieInstant.getComparableAction(i.getAction()))));
    // 所有待处理的instant
    return instants.flatMap(hoodieInstant ->
        groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
            HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
  }

在获取待处理的 instant时,会先根据配置项读取最大/最小需要保留的 commit,接着单独处理 clean类型的 instant,之后根据最早的 savepoint和处于 pendingcompaction来过滤出需要被处理的 instant(时间小于最早的 savepoint并且也小于最早的 compaction),然后与 clean类型的 instant合并,最后返回待处理的所有 instant

2.2 归档instant

在获取所有待归档的 Instant后,便调用 HoodieCommitArchiveLog#archive开始归档,其核心代码如下

代码语言:javascript复制
public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
    try {
      // 获取完成的instant的timeline
      HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
      Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
      List<IndexedRecord> records = new ArrayList<>();
      for (HoodieInstant hoodieInstant : instants) {
        try {
          // 将instant转化为IndexedRecord
          records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
          if (records.size() >= this.config.getCommitArchivalBatchSize()) { 
            // 写入文件
            writeToFile(wrapperSchema, records);
          }
        } catch (Exception e) {
          if (this.config.isFailOnTimelineArchivingEnabled()) {
            // 抛出异常
            throw e;
          }
        }
      }
      // 写入剩余的records
      writeToFile(wrapperSchema, records);
    } catch (Exception e) {
      throw new HoodieCommitException("Failed to archive commits", e);
    }
  }

可以看到,首先会调用 convertToAvroRecordinstant根据不同类型转化为 IndexedRecord并放入集合中,然后调用 writeToFile将集合的记录写入文件,其中 writeToFile方法核心代码如下

代码语言:javascript复制
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
    if (records.size() > 0) 
      // Block头信息
      Map<HeaderMetadataType, String> header = Maps.newHashMap();
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
      // 生成Avro Block块
      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
      // 添加block块写入文件
      this.writer = writer.appendBlock(block);
      // 清空集合
      records.clear();
    }
  }

可以看到,写入文件流程非常简单,即将集合放入 Block块中,然后再写入文档文件中(与写入日志文件类似)。

2.3 删除Instant

在归档完后,需要调用 HoodieCommitArchiveLog#deleteArchivedInstants方法来删除Timeline中的已归档的 instant,其核心代码如下

代码语言:javascript复制
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
    boolean success = true;
    for (HoodieInstant archivedInstant : archivedInstants) {
      Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
      try {
        if (metaClient.getFs().exists(commitFile)) { // 文件存在
          // 删除
          success &= metaClient.getFs().delete(commitFile, false);
        }
      } catch (IOException e) {
        throw new HoodieIOException("Failed to delete archived instant "   archivedInstant, e);
      }
    }

    // 获取最后一个instant
    Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> {
      return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
          || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)));
    }).max(Comparator.comparing(HoodieInstant::getTimestamp)));
    if (latestCommitted.isPresent()) { // 存在
      // 删除aux目录下的文件
      success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
    }
    return success;
  }

可以看到删除 instant的主要逻辑就是删除其对应的文件(包括元数据目录和aux目录)。

3. 总结

对于 archive而言,在每次 commit时都会判断是否需要进行 archive。Hudi分为三个步骤处理 archive,即找出待归档的instant(根据配置找出,必须小于最早的 savepointpending状态的 compaction)、归档 instant(将instant对应的文件的内容按照日志文件的写入方式写入归档文件)、删除已归档 instant(删除对应的文件)。

欢迎Star&Fork. https://github.com/apache/incubator-hudi

0 人点赞