从hudi持久化文件理解其核心概念

2023-02-28 14:56:28 浏览数 (1)

【概述】


这是hudi系列的第一篇文章,先从核心概念,存储的文件格式加深对概念的理解,后续再逐步对使用(spark/flink入hudi,hudi同步hive等)、原理(压缩机制,索引,聚族等)展开分享~

【什么是数据湖】


简单来说,数据湖技术是计算引擎和底层存储格式之间的一种数据组织格式,用来定义数据、元数据的组织方式,并实现以下的功能:

  • 支持事务(ACID)
  • 支持流批一体
  • 支持schema演化和schema结束
  • 支持多种底层数据存储HDFS、OSS、S3

从实现上来说,基于分布式文件系统之上,以传统关系型数据库的方式对外提供使用。

开源的数据湖实现有Hudi、IceBerg、Delta。

【hudi介绍】


Apache hudi代表Hadoop Upserts Deletes Incrementals。能够使HDFS数据集在分钟级的延时内支持变更,也支持下游系统对这个数据集的增量处理。

hudi数据集通过自定义的InputFormat兼容当前hadoop生态系统,包括Hive、Presto、Trino、Spark、Flink,使得终端用户可以无缝的对接。

Hudi会维护一个时间轴(这个是hudi的核心),在每次执行操作时(如写入、删除、压缩等),均会带有一个时间戳。通过时间轴,可以实现在仅查询某个时间点之后成功提交的数据,或是仅查询某个时间点之前的数据。这样可以避免扫描更大的时间范围,并非常高效地只消费更改过的文件。

上面是一些理论上的介绍,简单的使用,官网也有对应的例子,这里就不再啰嗦,下面我们介绍下hudi的一些核心概念,hudi的持久化文件及文件格式。

【相关概念】


1. 表类型

hudi中表有两种类型

  • MOR(Merge on Read)

在读取时进行合并处理的表。通常而言,写入时其数据以日志形式写入到行式存储到文件中,然后通过压缩将行式存储文件转为列式存储文件。读取时,则可能需要将存储在日志文件中的数据和存储在列式文件中的数据进行合并处理,得到用户期望查询的结果。

  • COW(Copy on Write)

在写入的时候进行拷贝合并处理的表。每次写入时,就完成数据的合并处理,并以列式存储格式存储,即没有增量的日志文件。

两者的一些对比

权衡

COW

MOR

数据延迟

更高

更低

更新代价(I/O)

更高

更低

parquet文件大小

更小

更大

写放大

更高

更低(取决于压缩策略)

2. 时间轴

hudi维护了在不同时间点中(instant time)在表上的所有(instant)操作的时间轴,这有助于提供表的即时视图,同时还能有效的提供顺序检索数据。

instant由以下组件组成:

  • instant action:对数据集(表)的操作类型(动作)。
  • instant time:通常是一个时间戳,它按照操作开始时间的顺序单调递增。
  • state:当前的状态

关键的操作类型包括:

  • commit 原子的将一批数据写入数据集(表)中
  • cleans 清除数据集(表)中不再需要的老版本文件
  • delta_commit 增量提交,表示将一批记录原子的写入MOR类型的表中,其中一些/所有数据可能仅被写入增量日志文件中
  • compaction 通常而言,是将基于行式的日志文件移动更新到列式文件中。
  • rollback 表明提交或增量提交不成功后的回滚,此时会删除写过程中产生的任意分区文件。
  • savepoint 将某些文件组标识为"已保存",这样在清理时不会进行删除。在灾备或数据恢复的场景中,有助于恢复到时间轴上的某个点。

任意给定的instant只能处于下面的其中一个状态:

  • REQUESTED:指明一个动作已经被调度,但还未进行执行
  • INFLIGHT:指明一个动作正在被执行
  • COMPLETED:指明时间轴上的一个已完成的动作

状态由requested->inflight->complete进行转换。

3. 视图

hudi支持三种类型的视图:

  • 读优化视图(Read Optimized Queries)

该视图仅将最新文件切片中的基本/列文件暴露给查询,并保证与非hudi列式数据集相比,具有相同的列式查询性能。简单而言,对于MOR表来说,仅读取提交或压缩后的列式存储文件,而不读取增量提交的日志文件。

  • 增量视图(Incremental Queries)

对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据。

  • 实时视图(Snapshot Queries)

在此视图上的查询将某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件来提供近实时的数据集。

视图类型和表的关系为:

COW

MOR

实时视图

Y

Y

增量视图

Y

Y

读优化视图

N

Y

【持久化文件】


如果上面的概念还有些抽象,那么来看看写入hudi的数据是如何在hdfs上存储的,再来理解前面提到的概念。

根据官网的示例,写入表中的数据,其在hdfs上存储的文件,大概是这样的:

代码语言:javascript复制
[root@localhost ~] hdfs dfs -ls -R /user/hncscwc/hudiedemo
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux/.bootstrap
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux/.bootstrap/.fileids
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.aux/.bootstrap/.partitions
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/.temp
-rw-r--r-- 3 root supergroup 2017 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/20211130143947.deltacommit
-rw-r--r-- 3 root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/20211130143947.deltacommit.inflight
-rw-r--r-- 3 root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/20211130143947.deltacommit.requested
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/archived
-rw-r--r-- 3 root supergroup 388 2021-11-30 14:39 /user/hncscwc/hudidemo/.hoodie/hoodie.properties
drwxr-xr-x - root supergroup 0 2021-11-30 14:39 /user/hncscwc/hudidemo/par1
-rw-r--r-- 3 root supergroup 960 2021-11-30 14:39 /user/hncscwc/hudidemo/par1/.f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0
-rw-r--r-- 3 root supergroup 93 2021-11-30 14:39 /user/hncscwc/hudidemo/par1/.hoodie_partition_metadata

从hdfs的存储文件中可以看出几点:

  • 表的数据都存储在指定配置目录中(这里为/user/hncscwc)
  • 数据大概分为多个目录存储,其中.hoodie目录下存储元数据相关的信息,本质上也就是时间轴对应的相关数据,以分区命名(这里为par1)的目录中则存放数据表在该分区中的具体数据。

先来看看.hoodie目录下元数据相关的持久化文件:这里包括:

  • yyyyMMddHHmmss.deltacommit

记录MOR表一次事务的执行结果,包括该事务对哪些分区的哪些数据(日志)文件进行了操作,对(日志)文件操作的类型(插入或更新),写入的长度,表的元数据信息等内容。文件内容以json格式存储。

文件中几个比较重要的字段有:

  • partitionToWrtieStats

以分区为key,记录每个分区的实际操作信息,包括本次事务写入的分区的ID、路径、写入/删除/更新的记录数、实际写入的字节长度等。

  • compacted

标记本次提交操作是否是压缩操作触发进行的

  • extraMetadata

最重要的是schema字段,记录了表的schema信息。

另外需要注意:文件名中yyyyMMddHHmmss为本次事务提交的时间戳,其后缀为deltacommit,并且对应文件内容非空,即表示该事务已经完成,相关的文件还有yyyyMMddHHmmss.deltacommit.inflight 和 yyyyMMddHHmmss.deltacommit.requested。恰好对应前面概念中提到的instant对应的三种状态。也就是说,通过将内容写入到不同后缀的文件中,来表示某个操作的当前状态。

一个简单示例为:

代码语言:javascript复制
{
  "partitionToWriteStats" : {
    "par1" : [ {
      "fileId" : "f9037b56-d84c-4b9a-87db-7cae41ab2505",
      "path" : "par1/.f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0",
      "prevCommit" : "20211130143947",
      "numWrites" : 1,
      "numDeletes" : 0,
      "numUpdateWrites" : 0,
      "numInserts" : 1,
      "totalWriteBytes" : 960,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "par1",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 960,
      "minEventTime" : null,
      "maxEventTime" : null,
      "logVersion" : 1,
      "logOffset" : 0,
      "baseFile" : "",
      "logFiles" : [ ".f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0" ]
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : "{"type":"record","name":"record","fields":[{"name":"uuid","type":["null","string"],"default":null},{"name":"name","type":["null","string"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"partition","type":["null","string"],"default":null}]}"
  },
  "operationType" : null,
  "totalCreateTime" : 0,
  "totalUpsertTime" : 6446,
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "fileIdAndRelativePaths" : {
    "f9037b56-d84c-4b9a-87db-7cae41ab2505" : "par1/.f9037b56-d84c-4b9a-87db-7cae41ab2505_20211130143947.log.1_2-4-0"
  },
  "writePartitionPaths" : [ "par1" ],
  "totalScanTime" : 0,
  "totalCompactedRecordsUpdated" : 0,
  "totalLogFilesCompacted" : 0,
  "totalLogFilesSize" : 0,
  "minAndMaxEventTime" : {
    "Optional.empty" : {
      "val" : null,
      "present" : false
    }
  }
}
  • yyyyMMddHHmmss.commit

与deltacommit类似,不过通常是COW表一次事务的执行结果,或者是压缩的执行结果。但文件内容和deltacommit基本相同,文件内容同样采用json格式存储。

  • hoodie.properties

该文件记录表的相关属性,例如:

代码语言:javascript复制
#Properties saved on Tue Nov 30 14:39:29 CST 2021
#Tue Nov 30 14:39:29 CST 2021
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
hoodie.table.precombine.field=ts
hoodie.table.name=t1
hoodie.archivelog.folder=archived
hoodie.table.type=MERGE_ON_READ
hoodie.table.version=2
hoodie.table.partition.fields=partition
hoodie.timeline.layout.version=1

hoodie.compaction.payload.class:数据插入/更新时对payload的处理类

hoodie.table.precombine.field:写入之前进行预合并处理的字段(数据)

hoodie.table.name:表的名称

hoodie.archivelog.folder:表的归档路径

hoodie.table.type:表的类型(MOR或COW)

hoodie.table.version:表的版本号(默认为2)

hoodie.table.partition.fields:表的分区字段,每个分区按照分区字段的值作为对应的目录名称,其数据就存储分区的目录中

hoodie.timeline.layout.version:时间轴布局的版本(默认为1)

以上几个文件应该是最常见的,除此之外,你可能还会看到如下文件:

  • yyyyMMddHHmmss.compaction.requested/inflight

压缩操作的具体内容,包括压缩操作的时间戳,以及对哪些分区下的哪些文件进行压缩合并。

压缩操作的文件内容是按一个标准avro格式存储的,可以通过avro-tool工具将文件内容转换为json来查看。例如:

代码语言:javascript复制
{
  "operations":{
    "array":[
      {
        "baseInstantTime":{"string":"20220106084115"},
        "deltaFilePaths":{
          "array":[".97ae031c-189b-4ade-9044-781e840c7e01_20220106084115.log.1_2-4-0"]},
        "dataFilePath":null,
        "fileId":{"string":"97ae031c-189b-4ade-9044-781e840c7e01"},
        "partitionPath":{"string":"par1"},
        "metrics":{
          "map":{
            "TOTAL_LOG_FILES":1.0,
            "TOTAL_IO_READ_MB":0.0,
            "TOTAL_LOG_FILES_SIZE":3000.0,
            "TOTAL_IO_WRITE_MB":120.0,
            "TOTAL_IO_MB":120.0}
        },
        "bootstrapFilePath":null
      }]},
  "extraMetadata":null,
  "version":{"int":2}
}
  • yyyyMMddHHmmss.clean.requested/inflight

清理操作的内容,包括清理操作的时间戳,以及对哪些分区下的哪些文件进行清理。

和压缩操作的文件一样,文件内容也是按标准的avro格式存储的,也可以通过工具转换成json来查看。

  • yyyyMMddHHmmss.rollback.requested/inflight

记录回滚操作的内容,同样也是以标准avro格式进行存储,例如:

代码语言:javascript复制
{
  "startRollbackTime":"20220112151350",
  "timeTakenInMillis":331,
  "totalFilesDeleted":0,
  "commitsRollback":["20220112151328"],
  "partitionMetadata":{
    "par1":{
      "partitionPath":"par1",
      "successDeleteFiles":[],
      "failedDeleteFiles":[],
      "rollbackLogFiles":{"map":{}},
      "writtenLogFiles":{"map":{}}
    }
  },
  "version":{"int":1},
  "instantsRollback":[{"commitTime":"20220112151328","action":"deltacommit"}]
}

小结一下:对表的每个操作,都记录在以带时间戳加不同的后缀的文件中,其操作又按照状态分别存储在不同的文件中,所有这些就对应了时间轴的实现。

再来看看表分区中的持久化文件,这里主要包含几种类型的文件:

  • .hoodie_partition_metadata

记录分区的元数据信息,在写入时,先写.hoodie_partition_metadata_$partitionID,然后再进行重命名。文件中的内容如下所示:

代码语言:javascript复制
#partition metadata
#Mon Dec 13 09:13:56 2021
commitTime=20211213091354
partitionDepth=1

其中commitTime为写入操作对应的提交时间,partitionDepth为相对表的根目录(上面提到的/user/hncscwc/hudidemo)的层级深度。在进行增量视图、快照视图查询时,通常会直接传递分区目录对应的路径,因此需要从分区路径中读取该文件,拿到层级深度,进而定位表的根目录,从而得到表的元数据信息。

  • xx.log.xx

MOR表操作的日志数据,类似于mysql的binlog文件。

文件命名有两种形式:

不带writeToken:文件名为 FileID_Instant.log.

带writeToken:文件名为 FileID_Instant.log.Version_WriteToken

其中

FileID为36字节的UUID,Instant为操作提交的时间戳,

WriteToken也有固定格式,为Partition_StageID_AttemptID。

注意:文件前会有个".",即以隐藏文件的方式存储,另外,带token是允许多进程并发写入,防止写同一个文件引起错乱。

文件的具体格式为:由一个或多个提交记录组成,每个记录都是一个类avro的行式存储格式的数据。

一个记录表示一次事务对表的写操作记录(包括插入、删除、更新),多个事务会append追加写入同一个文件中,以减少不必要的文件创建造成海量小文件问题。

文件格式如下图所示:

另外,每个事务中的多条写入记录,最终保存在content中,同时在原有数据的基础上,新增了下面5个字段:

"_hoodie_commit_time"

"_hoodie_commit_seqno"

"_hoodie_record_key"

"_hoodie_partition_path"

"_hoodie_file_name"

文件示例如下(本身无法直接查看,通过代码解析后打印的相关内容):

代码语言:javascript复制
magic:#HUDI#
block size:1061
log format version:1
block type:AVRO_DATA_BLOCK
block header:{INSTANT_TIME=20211230090953, SCHEMA={"type":"record","name":"record","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"uuid","type":["null","string"],"default":null},{"name":"name","type":["null","string"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"partition","type":["null","string"],"default":null}]}}
content length:235
content:
log block version:1
total records:2
content:
[
  {
    "_hoodie_commit_time": "20211230090953", 
  "_hoodie_commit_seqno": "20211230090953_1_1", 
  "_hoodie_record_key": "id1", 
  "_hoodie_partition_path": "par1", 
  "_hoodie_file_name": "c6b44d5e-749d-4053-94bf-92b39828e065", 
  "uuid": "id1", 
  "name": "Danny", 
  "age": 27, 
  "ts": 661000, 
  "partition": "par1"
  }, 
  {
    "_hoodie_commit_time": "20211230090953", 
  "_hoodie_commit_seqno": "20211230090953_1_2", 
  "_hoodie_record_key": "id2", 
  "_hoodie_partition_path": "par1", 
  "_hoodie_file_name": "c6b44d5e-749d-4053-94bf-92b39828e065", 
  "uuid": "id2", "name": 
  "Stephen", "age": 33, 
  "ts": 2000, 
  "partition": "par1"
  }
]
footer:{}
log block length:1067

magic:#HUDI#
block size:947
log format version:1
block type:AVRO_DATA_BLOCK
block header:{INSTANT_TIME=20211230092036, SCHEMA={"type":"record","name":"record","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"uuid","type":["null","string"],"default":null},{"name":"name","type":["null","string"],"default":null},{"name":"age","type":["null","int"],"default":null},{"name":"ts","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"partition","type":["null","string"],"default":null}]}}
content length:121
content:
        log block version:1
        total records:1
        content:[{"_hoodie_commit_time": "20211230092036", "_hoodie_commit_seqno": "20211230092036_1_1", "_hoodie_record_key": "id4", "_hoodie_partition_path": "par1", "_hoodie_file_name": "c6b44d5e-749d-4053-94bf-92b39828e065", "uuid": "id4", "name": "Fabian", "age": 31, "ts": 4000, "partition": "par1"}]
footer:{}
log block length:953
  • xxx.parquet(orc/hfile)

通常是COW表一次事务提交后,或者压缩操作后,将上面提到的log文件中的数据压缩合并写入后的文件。这就是一个标准的parquet文件格式,当然还支持orc和hfile格式。

注:spark对MOR表类型进行操作时,对于新增的数据,会直接写入列式(parquet)文件中,而对于更新操作则记录在增量的日志文件中(xx.log.xx),这个和spark/flink默认使用的索引类型有关。


好了,这就是本文的全部内容,简单回顾一下,先介绍了一下hudi的核心概念,然后对hudi的各个类型的持久化文件,以及具体的格式进行了说明,通过持久化文件可以反过来加深对hudi核心概念的理解。下篇文章,我们再来聊聊hudi的其他内容。

0 人点赞