Apache Hudi压缩Compaction源码解析

2022-12-05 09:10:54 浏览数 (1)

一、基础概念

了解过hudi的新手或者专家都知道,hudi不管是COW还是MOR表,其文件还是存储在hdfs上。因为下来介绍我在学习hudi压缩的一些东西,所以下方就以MOR表文件做下介绍。下方会由浅入深尝试说明压缩计划和压缩策略等等之间的关系。

对文件排列方面,例如FileGroup、FileSlice等概念还未学习的同学可以先从了解这个图后再继续。

可以理解为如果数据文件(若干log和parquet文件)的instantTime一样,那么他们都属于一个fileSlice。fileSlice所以也有FileGroup来标记自己属于哪个分区和自己的ID。

首先是数据文件,分为log文件和data文件。前者是avro格式的,保存较新的数据,而后者是parquet文件,内部数据则是更早或是和log合并后的。下方以一个非压缩非clustering的表文件为例子:

basePath就是/user/ocdp/test/hudi/mor_noComNoClu。表名字就是mor_noComNoClu

而后面time_15min则表示hudi表各个分区目录,当前是以系统时间,每15分钟一个分区。

接下来以一个分区为例子:

上方的是log文件,下方是parquet文件。根据代码中的命名含义,parquet文件的名由4部分组成:

5eaef46b-f581-41a2-8012-c66f608b3070-0_8-514-118306_20220329133607800.parquet

fileId_writeToken_instantTime.parquet

fileId就是FileGroup中的ID(FileGroup的另一个组成部分为Partition也就是分区名)

writeToken暂时理解为一个随机数,他主要在执行压缩计划阶段会用。

instantTime就是时间戳,时间戳可以理解为稍后要说的元数据文件(202204xxx.deltacommit.requested)的时间戳。

parquet表示了文件里的格式,也可能在有的环境上用的是.orc 或.hfile等。

下方是该表./hoodie下的文件样例:

只有MOR表才有这种deltaCommit(COW表在这元数据目录下只有commit),xxx.request表示待执行的commit,如果xx这个instanTime出现了对应的inflight状态文件,就表示该commit正在处理。一定时间后若是出现该instanTime的.deltacommit文件,那就表示这个commit已经完成。注意这里一个deltacommit,其instantTime实际对应了若干个数据文件、例如20220419110525544.celtacommit 他对应了很多log和parquet文件。

可见每个文件都比较小,不利于集群维护以及hudi查找时的效率。所以我们无论是以同步(intime)还是异步(offline)压缩,都是为了把这些打把的小文件合并成少数的大文件。

hudi自己提供了很多压缩的配置来应对使用者的需求,包括指定不同的压缩策略,压缩触发条件等等。

二、代码解读

以手动调用compactor生成一个压缩计划的方式为例。也是在yarn上生成一个spark作业。提交该作业时有很多参数,其中压缩策略、并行度、内存都是可选参数,而目标instantTime必须要给出,以及-sc表示“scheduleCompaction”也要开启(反之就是执行压缩计划而不是生成压缩计划)。

跳过若干调用过程,从下方开始重点介绍

ScheduleCompactionActionExecutor.java

这里就是对上方生成压缩计划时,给定的instantTime进行“合法性验证”。长话短说就是要求目标instantTime要同时满足下方两个条件。

  1. 目标instantTime必须大于所有activecommit时间且小于最小的inflight状态的非compaction的instantTime。这样是为了确保压缩时还有没有正常写入、清除的动作正在进行,从而影响了压缩时间范围。
  2. 目标instantTime必须大于所有commit、deltacommit的完成时instantTime,和现存(老的)compaction的instantTime。和上方目的类似,保证了压缩时间轴范围上的连贯性。

紧接着使用HoodieCompactionPlan plan = scheduleCompaction();去获取压缩计划,其逻辑如下:

拿到一个存放“所有现有的压缩计划的operation 对应的HoodieFileGroupId”的set,叫做fgInPendingCompactionAndClustering,来继续参与下方generateCompactionPlan方法逻辑

partitionPaths为所有分区的路径名,并进一步根据压缩计划所给出(或默认)的压缩策略,筛选出需要参与压缩的分区。而从下方“获取opeartion”就是更进一步探索压缩计划中的operation为何

大致过程就是把上方筛选后的partitionPaths,经过flatmap里的方法处理每一个分区目录。处理逻辑就是每个分区目录先转换为其分区区下所有fileSlice,该分区所有flieSlice在过滤掉其中对应的HoodieFileGroupId已经出现在现存压缩计划operation中的(不在上方fgInPendingCompactionAndClustering里的),可以理解为“先前压缩计划就已经包含了的数据,本次生成压缩计划就不再包含这些”。

在经过map继续处理每一个fileSlice,把每个fileSlice的logfile和baseFile也就是所有log和parquet文件获取到,再构造成一个个新的CompactionOperation。所有新Operation需要再过滤掉其deltaFileNames为空的。

到此

List<HoodieCompactionOperation> operations已经获取到,再经过下方步骤就能产生新的压缩计划

也就是把新生成的operations以及用户给定的writeConfig和现存的老的压缩计划,进行处理生成一个新的压缩计划。继续回到上方这里

HoodieCompactionPlan plan = scheduleCompaction();

此时构建一个新的元数据文件 :InstantTime.compaction.request(在.hoodie下),再用

table.getActiveTimeline().saveToCompactionRequested(compactionInstant, TimelineMetadataUtils.serializeCompactionPlan(plan));

把压缩继续写入到该文件里,到此压缩计划就已生成。

补充:

我们压缩计划生成时,例如给定了一个压缩策略为

BoundedPartitionAwareCompactionStrategy,那么它会具体实现filterPartitionPaths方法逻辑来筛选出要参与压缩的数据分区

他的筛选逻辑大概就是根据给定的参数hoodie.compaction.daybased.target.partitions(default =10),然后进一步就能获取到一个过去的时间点作为阈值,然后把所有分区对应的时间拿出来跟这个标杆对比,留下只比其小的时间(更早时间)的分区作为本次压缩计划涉及范围。

压缩计划生成后,被保存在basePath/.hoodie下的instanttime.compaction.request文件里。现在可以继续从执行压缩计划的角度进行。同样使用compactor类来提交spark作业,参数里可以带压缩计划对应的instantTime,也可以不带,不带的话则是找到时间最早对应的压缩计划。本文还是以MOR表,经手动异步压缩作为开始。

执行压缩计划部分,需要提前了解下该过程涉及的封装类,例如

RunCompactionActionExecutor、IndexedRecord、各种handle类等等

下方为执行异步压缩计划的提交命令,并给出了目标instantTime:

通常这个spark会跑2-10分钟就自己停了。该作业跑完并不是表示真正完成了压缩,而是把压缩作为一个特殊的commit,继续按照顺序处理各种commit。以0.10.0版本来看,这里触发的压缩过程和正常写入hudi是独占的,即同一时间只能进行写入或压缩。

省掉前面一部分调用,代码从SparkRDDWriteClient.java 的compact方法开始。

HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);

pendingCompactionTimeline.containsInstant(inflightInstant)

就是确保给的instantTime,对应其inflight的instant不能也出现在pending状态的hoodieInstant,否则就要回滚这个inflight的instant。简单来说就是不允许该时间的compaction类型的hoodieInstant,又是pending又是inflight,避免逻辑上矛盾。

这步同时也获得了table。于是继续table.compact(context, compactionInstantTime);

即用HoodieEngineContext、HoodieWriteConfig、table、compactionInstantTime、HoodieSparkMergeOnReadTableCompactor、HoodieSparkCopyOnWriteTable这些变量构造了一个RunCompactionActionExecutor对象。

HoodieEngineContext由HoodieSparkEngineContext实现,其包括

JavaSparkContextSQLContext,最后转为spark作业dag才会用到;

HoodieWriteConfig就是用户给定的各种参数,包括压缩策略名等等配置。

table就是HoodieSparkMergeOnReadTable,也就是前面就已经从HoodieWriteConfig和HoodieEngineContext里得到的table

compactionInstantTime是String类型的instantTime,也就是执行压缩时用户指定的(现有最早的可压缩计划)

HoodieSparkMergeOnReadTableCompactor暂时先不讲,他负责preCompact(确保目标instantTime对应的hoodieInstant被pendingCompactionInstant包含着,也就是被先前生成的压缩计划里所包含)和maybePersist(设置RDDstorage level)。

HoodieSparkCopyOnWriteTable一个根据相同HoodieWriteConfig和HoodieEngineContext构建的cow表。

并调用该对象的execute()方法

先经过上方讲过的preCompact过程,即把现存的pending状态的compaction拿出来,然后检查本次压缩时给定的instantTime在不在其中,不在其中就会抛出异常。

然后再是CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime),目的就是从输入的instantTime对应的istantTime.compaction.requested文件里获取压缩计划。并将压缩计划和新版本号写入元数据中。

再进一步执行compactor.compact( context, compactionPlan, table, config, instantTime, compactionHandler); 注意这里的compactionHandler就是前面创建的HoodieSparkCopyOnWriteTable对象。

新的instant就是instantTime.compaction.requested,instantTime还是一开始给的那个目标时间。

然后把这个requested的instant,经过transitionState方法转为inflight的instant,即开始执行压缩计划。

下方try部分,就是从一开始给定的schema里(已经在HoodieTableMetaClient)创建新的schema,注意这里指定了不添加新的元数据字段。如果添加那schema就多5个字段,包括主键(_hoodie_record_key)、分区(_hoodie_partition_path)等字段。

这里是把上方拿到的压缩计划,取出其operations(operation包含什么信息请看上篇)转成流,就能进行map操作。map里的逻辑是把每个operation从HoodieCompactionOperation转为CompactionOperation并保存到List中(前者是avro格式,其他详细的后面再区分)。

context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier)) .flatMap(List::iterator);

context.parallelize(operations)就是把operation转为spark的JavaRDD(不准确,本身还是HoodieData)。之后对每个RDD都单独调用一次中间的compact( compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier)

到此,compactionHandler即上方的cow表、HoodieTableMetaClient、writeConfig、压缩计划operation、目标instantTime以及SparkTaskContextSupplier都参与进一步的针对每个operation,如下所示:

先给上一步拿到的schema增加提到的5个元数据字段,是否额外添加第6个字段operation,则要看用户配置的writeConfig中是否明确相关参数为true,否则还是默认的只加5个字段。

maxInstantTime是在所有active的timeline里拿出deltacommit commit rollback这三种action且timeline对应的instant为completed状态的timeline,并且取其最新(最大的时间值)。

maxMemoryPerCompaction是从writeConfig里读取到的参数,表示压缩可用最大的内存,超出这个阈值的则会被暂存到本地磁盘上。

logFiles说的就是FileSlice里哪些avro的*.log*文件。这里他从CompactionOperation的deltaFileNames里获得。那么这个daltaFileNames当初怎么初始化的值呢?其实同样也是从logFiles里折腾出来的:

回到daltaFileNames,他经过给自己加上bastPath(含表名)和分区目录后,也就是重新回到了logFile。

这里的scanner是比较麻烦的部分,包含了很多用户指定(或default)参数,以及提到的logfile、带meta字段的新schema等等。下方会用到一个scanner.getRecords()

this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);

ExternalSpillableMap类按照其类最上方的解释来看,是为了防止内存满时,以kv形式存储数据,同时也存储了Key-ValueMetadata即保存在磁盘上的位置。这一点还需要进一步确认 其参数分别是

hoodie.memory.compaction.max.size、

hoodie.memory.spillable.map.path、也就是baseFilepath(待补全)

DefaultSizeEstimator用来评估key大小、 HoodieRecordSizeEstimator(readerSchema)用来评估value大小、

diskMapType(分为BITCASK或ROCKS_DB)、

hoodie.common.diskmap.compression.enabled默认是true

oldDataFileOpt暂时理解为basefile,是否为空取决于是只有logfile还是logfilebasefile都存在。前者情况下oldDataFileOpt为空,后者则为basefile。(因为oldDataFileOpt也是来在option中的bootstrapFilePath,所以他跟basefile一样可能有也可能不存在)

所以这里走else的逻辑,即handleInsert()。下方继续构建HoodieCreateHandle实例,注意

taskContextSupplier就是上方构建过的,recordMap就是上面提到的map类型的数据。

write方法如下:

hoodieTable.requireSortedRecords()首先是false,因为当前basefile都是.parquet文件,只有当为.hfile时才为true。

再把上方说的map形式的recordMap,其key的集合搞Iterator,通过该iterator来循环处理所有map中的key对应的value。具体value也就是代码中的"record",HoodieRecord类型。

关于useWriterSchema,上方new这个HoodieCreateHandle对象时,已经把useWriterSchema写死了。

所以继续 write(record, record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()))。注意后续一系列方法调用中都是这两个参数,均来自value:"record"这个值。

其中,record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())

tableSchemaWithMetaFields变量就是之前写入writeConfig的schema并且是有5个元数据字段的schema。config.getProps()就是所有writeConfig的配置。

后续会走processNotMatchedRecord来处理这些消息,把byte消息转为avro,再结合schema生成GenericRecord以及后面的SqlTypedRecord,省掉getEvaluator等逻辑,最后会把这些消息的结果返回(avro的IndexedRecord类型,可以理解为avro包下实现了一种对数据的封装,可以set和get其中的值)。

再回到这里的write方法延申,注意preserveHoodieMetadata参数为false,创建HoodieCreateHandle对象时依然保持其为false。

IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());

大致是把老数据重新写入一个新schema里,依旧是IndexedRecord 。(待确认:过程中writeSchemaWithMetaFields 这个schema是怎么生成的?)

fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);

这个就是把参数里的recordWithMetadataInSchema调用org.apache.parquet.hadoop的write方法去写入。写parquet文件的同时再把对应的消息的key更新到布隆过滤器中(顺便也判断下是否再插入后修改布隆过滤器的最大最小边界为刚才的key)

record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));

给本条消息确认出他归属的位置(文件名中的instant和Fileid)并保存。

到此insert场景大致流程已经有了,下方继续update分支

回到 HoodieCompactor.java此处

新创建HoodieMergeHandle对象,其中

keyToNewRecords和insert过程一样,scanner.getRecords()返回的一个ExternalSpillableMap

oldDataFile是现存的baseFile

config.populateMetaFields()这个值默认为true,于是keyGeneratorOpt暂时为空。

requireSortedRecords还是false,因为是parquet不是hfile。

再创建对象:

new HoodieMergeHandle(config, instantTime, this, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);

注意dataFileToBeMerged就是oldDataFile

重点说下init方法

baseFileToMerge为dataFileToBeMerged 现存的basefile。

String latestValidFilePath = baseFileToMerge.getFileName();获得baseFile名且包含了完整的路径。

String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());

大致就是根据提供的instantTIme等因素,拼凑出一个.parquet文件的名字例如

/user/ocdp/test/hudi/表名/time_15min=20220329134500/5eaef46b-f581-41a2-8012-c66f608b3070-0_8-514-118306_20220329133607800.parquet

上方就先后获取了老的和新baseFile的完整名字,再进行

makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);

也就是为HoodieMergeHandle的两个path oldFilePath、newFilePath附上各自处理后的值。也就是新老各自的basePath 分区名 xxx.parquet。

到此init方法结束,继续把包括空的keyGeneratorOpt等变量都继续赋给HoodieMergeHandle对象upsertHandle。再回到handleUpdateInternal(upsertHandle, instantTime, fileId)

因为upsertHandle.getOldFilePath()是上一步刚获得的oldFilePath,正常情况下肯定不为空。

所以走分支:SparkMergeHelper.newInstance().runMerge(this, upsertHandle)

0 人点赞