1. 介绍
Hudi
将记录写入数据 parquet
文件或日志 log
文件,而这些文件在内存中是如何进行管理呢?如之前的文章中提到过的 HoodieFileGroup
、 FileSlice
等与数据文件和日志文件是什么对应关系?本篇详细分析 Hudi
的文件管理。
2. 分析
2.1 文件组
HoodieFileGroup
表示文件组,其包含的字段信息如下
public class HoodieFileGroup implements Serializable {
// 文件组ID
private final HoodieFileGroupId fileGroupId;
// FileSlices,按照提交时间大小排序
private final TreeMap<String, FileSlice> fileSlices;
// 时间轴
private final HoodieTimeline timeline;
// 上一次完成的Instant,充当水位基准.
private final Option<HoodieInstant> lastInstant;
}
其中, HoodieFileGroup
由 HoodieFileGroupId
唯一标识;每个 HoodieFileGroup
中会包含一个 TreeMap<CommitTime,FileSlice>
,按照 CommitTime
从大到小排序;为方便操作会保存一个 Timeline
,以及最后完成的 Instant
。
2.2 文件组ID
HoodieFileGroupId
表示文件组ID,其包含字段信息如下
public class HoodieFileGroupId implements Serializable {
// 分区路径
private final String partitionPath;
// 文件ID
private final String fileId;
}
每个文件组ID由分区路径和文件ID唯一标识,不同的分区或不同的文件ID均属于不同的 HoodieFileGroup
。
2.3 文件片
FileSlice
表示文件片,其包含字段信息如下
public class FileSlice implements Serializable {
// 文件组ID
private HoodieFileGroupId fileGroupId;
// Instant的时间
private String baseInstantTime;
// 数据文件
private HoodieDataFile dataFile;
// 日志文件列表,按照更低版本排序,在MOR时存在,COW时为空
private final TreeSet<HoodieLogFile> logFiles;
}
一个 FileSlice
对应一个数据文件和日志文件列表,并且其包含一个基准时间(数据文件和日志文件都有相同的时间基准)。
2.4 数据文件
HoodieDataFile
表示数据文件,其包含字段信息如下
public class HoodieDataFile implements Serializable {
// 文件状态
private transient FileStatus fileStatus;
// 文件全路径
private final String fullPath;
// 文件大小
private long fileLen;
}
每个数据文件包含了一个文件状态,文件的全路径以及文件的长度。
2.5 日志文件
HoodieLogFile
包含的字段信息如下
public class HoodieLogFile implements Serializable {
// 日志文件扩展名
public static final String DELTA_EXTENSION = ".log";
// 日志文件基准版本
public static final Integer LOGFILE_BASE_VERSION = 1;
// 文件状态
private transient FileStatus fileStatus;
// 文件路径
private final String pathStr;
// 文件大小
private long fileLen;
}
日志文件与数据文件包含信息类似,日志文件的初始化版本为1。
2.6 生成文件组
下面以 AbstractTableFileSystemView#buildFileGroups
为例,分析 HoodieFileGroup
的生成逻辑,其核心代码如下
protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> dataFileStream,
Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
// 获取所有数据文件对应的分区路径、文件ID(相同的分区路径、文件ID会对应数据文件列表)
Map<Pair<String, String>, List<HoodieDataFile>> dataFiles =
dataFileStream.collect(Collectors.groupingBy((dataFile) -> {
String partitionPathStr = getPartitionPathFromFilePath(dataFile.getPath());
return Pair.of(partitionPathStr, dataFile.getFileId());
}));
// 获取所有日志文件对应的分区路径、文件ID(相同的分区路径、文件ID会对应日志文件列表)
Map<Pair<String, String>, List<HoodieLogFile>> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> {
String partitionPathStr =
FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent());
return Pair.of(partitionPathStr, logFile.getFileId());
}));
// 初始化所有的数据文件和日志文件(过滤掉相同的<Partition, FileID>)
Set<Pair<String, String>> fileIdSet = new HashSet<>(dataFiles.keySet());
fileIdSet.addAll(logFiles.keySet());
List<HoodieFileGroup> fileGroups = new ArrayList<>();
fileIdSet.forEach(pair -> {
// 获取文件ID
String fileId = pair.getValue();
// 生成新的文件组
HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
if (dataFiles.containsKey(pair)) { // 包含在数据文件集合中
// 添加该数据文件
dataFiles.get(pair).forEach(group::addDataFile);
}
if (logFiles.containsKey(pair)) { // 包含在日志文件集合中
// 添加该日志文件
logFiles.get(pair).forEach(group::addLogFile);
}
if (addPendingCompactionFileSlice) { // 添加pending的compaction的FileSlice
Option<Pair<String, CompactionOperation>> pendingCompaction =
getPendingCompactionOperationWithInstant(group.getFileGroupId());
if (pendingCompaction.isPresent()) { // 存在pending的compaction
// 添加至文件组
group.addNewFileSliceAtInstant(pendingCompaction.get().getKey());
}
}
fileGroups.add(group);
});
return fileGroups;
}
可以看到,对于文件组的构建,首先会对指定分区的所有数据文件和日志文件进行一次排序(按照分区路径和文件ID),然后对每个 <分区路径,文件ID>
生成一个文件组,并将具有相同 <分支路径,文件ID>
的日志文件和数据文件放入该文件组。
下面简要介绍数据文件和日志文件的文件名的生成。
2.7 数据文件名生成
代码语言:javascript复制public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
}
使用文件ID、writeToken、提交时间组成完整的文件名,其中writeToken的生成方法如下
代码语言:javascript复制public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
}
2.8 日志文件名生成
代码语言:javascript复制public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
String writeToken) {
String suffix =
(writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
: String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
return LOG_FILE_PREFIX suffix;
}
使用文件ID、提交时间、日志文件扩展名、版本号、writeToken生成完整的文件名。
而对于文件ID、提交时间等可直接按照 _
进行分割或正则表达式来获取。
3. 总结
Hudi
中对文件的管理的核心是 HoodieFileGroup
,由 <分区路径,文件ID>
唯一标识,并且会保存不同的 FileSlice
,每个 FileSlice
包含最多一个数据文件和一个日志文件列表,对于有相同文件ID但不同提交时间的数据文件会保存在同一个 HoodieFileGroup
,而不同文件ID会保存在不同 HoodieFileGroup
中;而对于有相同文件ID和提交时间的数据文件和日志文件会被放入同一个 FileSlice
,对于具有相同文件ID,但不同提交时间的日志文件和数据文件会被放入不同的 FileSlice
。