1. 介绍
前面分析了Hudi默认的索引实现HoodieBloomIndex,其是基于分区记录所在文件,即分区路径 recordKey唯一即可,Hudi还提供了HoodieGlobalBloomIndex的实现,即全局索引实现,只需要recordKey唯一即可,下面分析其实现。
2. 分析
HoodieGlobalBloomIndex是HoodieBloomIndex的子类,其主要重写了父类的如下几个方法
代码语言:javascript复制// 加载分区下所有最新的文件
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc, final HoodieTable hoodieTable)
// 查找记录对应的文件
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo, JavaPairRDD<String, String> partitionRecordKeyPairRDD)
// 将位置信息推回至原始记录
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD)
对于加载分区下所有最新文件而言, loadInvolvedFiles
核心代码如下
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
// 获取所有分区
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
config.shouldAssumeDatePartitioning());
// 调用父类方法加载所有分区下最新数据文件
return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
} catch (IOException e) {
throw new HoodieIOException("Failed to load all partitions", e);
}
}
首先会获取所有的分区路径,然后调用父类方法获取分区下最新数据文件。
对于查找记录对应的文件而言, explodeRecordRDDWithFileComparisons
核心代码如下
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
// 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化
IndexFileFilter indexFileFilter =
config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
// 获取匹配的文件和partition
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
可以看到和 HoodieBloomIndex#explodeRecordRDDWithFileComparisons
处理逻辑类似,在使用索引过滤器获取所有匹配的文件和分区路径时,此时比较的是所有分区下的文件,不再是指定的分区路径。
对于将位置信息推回至原始记录而言, tagLocationBacktoRecords
核心代码如下
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
// 对原始记录进行转化
JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
// 对带有位置信息的记录也进行一次转化
JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
// 以recordKey进行一次左外连接,确定位置信息
return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
final HoodieRecord<T> hoodieRecord = record._1;
final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
if (recordLocationHoodieKeyPair.isPresent()) {
// Record key matched to file
return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
} else {
return getTaggedRecord(hoodieRecord, Option.empty());
}
});
}
其处理逻辑与父类处理逻辑相同,也是使用一次左外连接将位置信息推回至原始记录。
3. 总结
对于 HoodieGlobalBloomIndex
而言,其是全局的索引,即会在所有分区内查找指定的recordKey,而非像 HoodieBloomIndex
只在指定的分区内查找,同时在加载分区下所有最新文件时,其会首先获取所有分区,然后再获取所有分区下的最新文件,而非使用从原始记录中解析出来的分区路径。