1. 介绍
Timline
(时间轴)由很多 instant
构成,按照时间由小到大排列。当不断写入Hudi数据集时,Timeline上的 Instant
会不断增加,为减小 Timeline
的操作压力,会在 commit
时按照配置对 instant
进行归档,并从 Timeline
上将已归档的 instant
删除。
2. 分析
在每次 commit
时会调用 HoodieCommitArchiveLog#archiveIfRequired
来判断是否需要进行归档,其核心代码如下
public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
archive(instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
}
return success;
} finally {
close();
}
}
其中需要通过 getInstantsToArchive
获取需要归档的 Instant
,然后在进行归档,接着再将 Instant
删除。
2.1 获取Instant
通过 getInstantsToArchive
来获取待归档的所有 Instant
,其核心代码如下
private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
// 根据配置获取最大/小保留数
int maxCommitsToKeep = config.getMaxCommitsToKeep();
int minCommitsToKeep = config.getMinCommitsToKeep();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// 获取完成的CLEAN类型的timeline
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
// 按照action进行一次排序,并根据最大保留数过滤出需要处理的instant
Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
.collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> {
if (i.getValue().size() > maxCommitsToKeep) {
// 待处理的instant
return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
} else {
return new ArrayList<HoodieInstant>();
}
}).flatMap(i -> i.stream());
// 获取所有已完成的instant的timeline
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// 最早的处于pending的compaction
Option<HoodieInstant> oldestPendingCompactionInstant =
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
// 获取第一个(最早)的savepoint
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
// 已完成的timeline不为空,并且instant大于最大保留数,进一步处理
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) {
// 将已完成clean类型的instant与其他instant连接在一起
instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
// 如果savepoint不存在,则不用过滤,若存在,那么需要过滤出小于savepoint的所有instant
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(),
s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}).filter(s -> {
// 再进行一次过滤,过滤出小于最早的pending的compaction,即大于的instant会被保留,不会被archive
return oldestPendingCompactionInstant.map(instant -> {
return HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER);
}).orElse(true);
}).limit(commitTimeline.countInstants() - minCommitsToKeep));
}
// 所有instant组成的timeline(包括各种中间状态的instant)
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
// 按照时间和action进行分类
Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants()
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));
// 所有待处理的instant
return instants.flatMap(hoodieInstant ->
groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
}
在获取待处理的 instant
时,会先根据配置项读取最大/最小需要保留的 commit
,接着单独处理 clean
类型的 instant
,之后根据最早的 savepoint
和处于 pending
的 compaction
来过滤出需要被处理的 instant
(时间小于最早的 savepoint
并且也小于最早的 compaction
),然后与 clean
类型的 instant
合并,最后返回待处理的所有 instant
。
2.2 归档instant
在获取所有待归档的 Instant
后,便调用 HoodieCommitArchiveLog#archive
开始归档,其核心代码如下
public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
try {
// 获取完成的instant的timeline
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
try {
// 将instant转化为IndexedRecord
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
// 写入文件
writeToFile(wrapperSchema, records);
}
} catch (Exception e) {
if (this.config.isFailOnTimelineArchivingEnabled()) {
// 抛出异常
throw e;
}
}
}
// 写入剩余的records
writeToFile(wrapperSchema, records);
} catch (Exception e) {
throw new HoodieCommitException("Failed to archive commits", e);
}
}
可以看到,首先会调用 convertToAvroRecord
将 instant
根据不同类型转化为 IndexedRecord
并放入集合中,然后调用 writeToFile
将集合的记录写入文件,其中 writeToFile
方法核心代码如下
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
if (records.size() > 0)
// Block头信息
Map<HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
// 生成Avro Block块
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
// 添加block块写入文件
this.writer = writer.appendBlock(block);
// 清空集合
records.clear();
}
}
可以看到,写入文件流程非常简单,即将集合放入 Block
块中,然后再写入文档文件中(与写入日志文件类似)。
2.3 删除Instant
在归档完后,需要调用 HoodieCommitArchiveLog#deleteArchivedInstants
方法来删除Timeline中的已归档的 instant
,其核心代码如下
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
boolean success = true;
for (HoodieInstant archivedInstant : archivedInstants) {
Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
try {
if (metaClient.getFs().exists(commitFile)) { // 文件存在
// 删除
success &= metaClient.getFs().delete(commitFile, false);
}
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " archivedInstant, e);
}
}
// 获取最后一个instant
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> {
return i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)));
}).max(Comparator.comparing(HoodieInstant::getTimestamp)));
if (latestCommitted.isPresent()) { // 存在
// 删除aux目录下的文件
success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
}
return success;
}
可以看到删除 instant
的主要逻辑就是删除其对应的文件(包括元数据目录和aux目录)。
3. 总结
对于 archive
而言,在每次 commit
时都会判断是否需要进行 archive
。Hudi分为三个步骤处理 archive
,即找出待归档的instant(根据配置找出,必须小于最早的 savepoint
和 pending
状态的 compaction
)、归档 instant
(将instant对应的文件的内容按照日志文件的写入方式写入归档文件)、删除已归档 instant
(删除对应的文件)。
欢迎Star&Fork. https://github.com/apache/incubator-hudi