揭秘ApacheHudi数据湖的文件管理

2021-04-13 10:39:21 浏览数 (1)

1. 介绍

Hudi将记录写入数据 parquet文件或日志 log文件,而这些文件在内存中是如何进行管理呢?如之前的文章中提到过的 HoodieFileGroupFileSlice等与数据文件和日志文件是什么对应关系?本篇详细分析 Hudi的文件管理。

2. 分析

2.1 文件组

HoodieFileGroup表示文件组,其包含的字段信息如下

代码语言:javascript复制
public class HoodieFileGroup implements Serializable {
  // 文件组ID
  private final HoodieFileGroupId fileGroupId;

  // FileSlices,按照提交时间大小排序
  private final TreeMap<String, FileSlice> fileSlices;

  // 时间轴
  private final HoodieTimeline timeline;

  // 上一次完成的Instant,充当水位基准.
  private final Option<HoodieInstant> lastInstant;
}

其中, HoodieFileGroupHoodieFileGroupId唯一标识;每个 HoodieFileGroup中会包含一个 TreeMap<CommitTime,FileSlice>,按照 CommitTime从大到小排序;为方便操作会保存一个 Timeline,以及最后完成的 Instant

2.2 文件组ID

HoodieFileGroupId表示文件组ID,其包含字段信息如下

代码语言:javascript复制
public class HoodieFileGroupId implements Serializable {
  // 分区路径
  private final String partitionPath;
  // 文件ID
  private final String fileId;
}

每个文件组ID由分区路径和文件ID唯一标识,不同的分区或不同的文件ID均属于不同的 HoodieFileGroup

2.3 文件片

FileSlice表示文件片,其包含字段信息如下

代码语言:javascript复制
public class FileSlice implements Serializable {
  // 文件组ID
  private HoodieFileGroupId fileGroupId;

  // Instant的时间
  private String baseInstantTime;

  // 数据文件
  private HoodieDataFile dataFile;

  // 日志文件列表,按照更低版本排序,在MOR时存在,COW时为空
  private final TreeSet<HoodieLogFile> logFiles;
}

一个 FileSlice对应一个数据文件和日志文件列表,并且其包含一个基准时间(数据文件和日志文件都有相同的时间基准)。

2.4 数据文件

HoodieDataFile表示数据文件,其包含字段信息如下

代码语言:javascript复制
public class HoodieDataFile implements Serializable {
  // 文件状态
  private transient FileStatus fileStatus;
  // 文件全路径
  private final String fullPath;
  // 文件大小
  private long fileLen;
}

每个数据文件包含了一个文件状态,文件的全路径以及文件的长度。

2.5 日志文件

HoodieLogFile包含的字段信息如下

代码语言:javascript复制
public class HoodieLogFile implements Serializable {
  // 日志文件扩展名
  public static final String DELTA_EXTENSION = ".log";
  // 日志文件基准版本
  public static final Integer LOGFILE_BASE_VERSION = 1;
  // 文件状态
  private transient FileStatus fileStatus;
  // 文件路径
  private final String pathStr;
  // 文件大小
  private long fileLen;
}

日志文件与数据文件包含信息类似,日志文件的初始化版本为1。

2.6 生成文件组

下面以 AbstractTableFileSystemView#buildFileGroups为例,分析 HoodieFileGroup的生成逻辑,其核心代码如下

代码语言:javascript复制
protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> dataFileStream,
      Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
    // 获取所有数据文件对应的分区路径、文件ID(相同的分区路径、文件ID会对应数据文件列表)
    Map<Pair<String, String>, List<HoodieDataFile>> dataFiles =
        dataFileStream.collect(Collectors.groupingBy((dataFile) -> {
          String partitionPathStr = getPartitionPathFromFilePath(dataFile.getPath());
          return Pair.of(partitionPathStr, dataFile.getFileId());
        }));
    // 获取所有日志文件对应的分区路径、文件ID(相同的分区路径、文件ID会对应日志文件列表)
    Map<Pair<String, String>, List<HoodieLogFile>> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> {
      String partitionPathStr =
          FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent());
      return Pair.of(partitionPathStr, logFile.getFileId());
    }));
    // 初始化所有的数据文件和日志文件(过滤掉相同的<Partition, FileID>)
    Set<Pair<String, String>> fileIdSet = new HashSet<>(dataFiles.keySet());
    fileIdSet.addAll(logFiles.keySet());

    List<HoodieFileGroup> fileGroups = new ArrayList<>();
    fileIdSet.forEach(pair -> {
      // 获取文件ID
      String fileId = pair.getValue();
      // 生成新的文件组
      HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
      if (dataFiles.containsKey(pair)) { // 包含在数据文件集合中
        // 添加该数据文件
        dataFiles.get(pair).forEach(group::addDataFile);
      }
      if (logFiles.containsKey(pair)) { // 包含在日志文件集合中
        // 添加该日志文件
        logFiles.get(pair).forEach(group::addLogFile);
      }
      if (addPendingCompactionFileSlice) { // 添加pending的compaction的FileSlice
        Option<Pair<String, CompactionOperation>> pendingCompaction =
            getPendingCompactionOperationWithInstant(group.getFileGroupId());
        if (pendingCompaction.isPresent()) { // 存在pending的compaction
          // 添加至文件组
          group.addNewFileSliceAtInstant(pendingCompaction.get().getKey());
        }
      }
      fileGroups.add(group);
    });

    return fileGroups;
  }

可以看到,对于文件组的构建,首先会对指定分区的所有数据文件和日志文件进行一次排序(按照分区路径和文件ID),然后对每个 <分区路径,文件ID>生成一个文件组,并将具有相同 <分支路径,文件ID>的日志文件和数据文件放入该文件组。

下面简要介绍数据文件和日志文件的文件名的生成。

2.7 数据文件名生成

代码语言:javascript复制
public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
    return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
  }

使用文件ID、writeToken、提交时间组成完整的文件名,其中writeToken的生成方法如下

代码语言:javascript复制
public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
    return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
  }

2.8 日志文件名生成

代码语言:javascript复制
public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
      String writeToken) {
    String suffix =
        (writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
            : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
    return LOG_FILE_PREFIX   suffix;
  }

使用文件ID、提交时间、日志文件扩展名、版本号、writeToken生成完整的文件名。

而对于文件ID、提交时间等可直接按照 _进行分割或正则表达式来获取。

3. 总结

Hudi中对文件的管理的核心是 HoodieFileGroup,由 <分区路径,文件ID>唯一标识,并且会保存不同的 FileSlice,每个 FileSlice包含最多一个数据文件和一个日志文件列表,对于有相同文件ID但不同提交时间的数据文件会保存在同一个 HoodieFileGroup,而不同文件ID会保存在不同 HoodieFileGroup中;而对于有相同文件ID和提交时间的数据文件和日志文件会被放入同一个 FileSlice,对于具有相同文件ID,但不同提交时间的日志文件和数据文件会被放入不同的 FileSlice

0 人点赞