Instant
在表格上执行的所有操作都表示为 Hudi 时间轴中的Instant(瞬间)。可以在表基本路径下找到一个名为“.hoodie”的目录,其中维护这些Instant。Hudi instant由以下组件组成:
- • Instant操作:在表上执行的操作类型。
- • Instant时间:毫秒格式的时间戳,被视为时间线上操作的标识符。
- • 状态:当前Instant状态。有 3 种不同的状态:Requested(请求)、Inflight(执行)和 Completed(完成)。给定instant将处于任何时间点的状态之一。每个操作都从"requested"状态开始,然后移至"inflight",最后进入 "completed" 状态,在这种情况下,整个操作被视为已完成。在操作进入 "completed" 状态之前,其被视为待处理,并且不允许读取查询从任何此类操作中读取任何数据。
Hudi保证在Timeline时间轴上执行的操作是原子的并且基于Instant时间的时间轴一致。
Action
我们在 Apache Hudi 表上发生了很多不同的操作,每个操作都有不同的目的,例如由常规写入者摄取数据、压缩和聚簇、清理和归档等表服务。
对于博客的大部分内容,我们将假设单写入模型,因为重点是说明时间线事件。但如果有必要的话,也会讨论一些多写入端的场景。
Commit
Commit(提交) 操作代表写入 COW 表。每当新批次被摄取到表中时,就会生成新的 CommtTime 并且操作进入请求状态。可以在 Hudi 时间轴中找到"tN.commit.requested"。例如,20230705155904980.commit.requested(其中"20230705155904980"是该操作的提交时间,请求标志着规划阶段的完成。对于常规写入,准备阶段没有太多事情要做,执行阶段从添加"inflight"开始,一旦执行完成,在时间轴中看到“已完成”的提交文件。
代码语言:javascript复制| — 20230705155904980.commit.requested
| — 20230705155904980.commit.inflight
| — 20230705155904980.commit
因此在我们看到 20230705155904980.commit 之前,所有查询都不会读取此提交部分写入的任何数据。一旦通过将 20230705155904980.commit 添加到时间线来标记完成,任何命中表的新读取都将读取此感兴趣的提交提交的数据。
Delta Commit
Delta Commit(增量提交)表示对 MOR 表的写入。这可能会产生日志文件或基本Parquet文件。但"增量提交"是指定期写入 MOR 表。该序列类似于我们上面看到的"提交"。
代码语言:javascript复制| — 20230707081934362.deltacommit.requested
| — 20230707081934362.deltacommit.inflight
| — 20230707081934362.deltacommit
提交和增量提交都只会导致添加新文件。完成的文件将列出有关添加的文件的所有元信息,以及写入的字节数、写入的记录、更新的记录等统计信息。
Clean
Hudi 在对现有文件组的任何更新中添加名为FileSlice(文件切片)的新版本文件。旧版本的文件切片由Cleaner(清理器)根据清理器配置清理(或删除)。与常规写入(提交和增量提交)不同,Cleaner 还将经历一个计划阶段,最终将导致 tX.clean.requested 包含清理计划。它将跟踪需要在清理过程中删除的所有文件。
将计划序列化到请求文件中的主要原因是为了确保幂等性。为了在清理过程中能够防止中途崩溃,我们希望确保清理计划一旦完成就能够顺利完成而不会失败。此外完成的清理准确显示了哪些文件作为清理提交的一部分被删除,而不仅仅是部分文件列表,无论重新尝试清理多少次。同样的原理也适用于聚簇计划、压缩计划和恢复计划。
代码语言:javascript复制| — 20230708091954360.clean.requested
| — 20230708091954360.clean.inflight
| — 20230708091954360.clean
可以在完成的"20230708091954360.clean"文件中找到有关清理器删除的所有文件的信息。让我们通过一个简单的示例来了解 Cleaner 的作用。
t1.commit:
- • 插入新数据
- • 添加新文件fg1_fs1(fg指文件组,fs指文件切片)
t2.commit:
- • 更新同一组数据。
- • 将新文件片 fg1_fs2 添加到现有文件组 fg1。
t3.commit:
- • 更新同一组数据。
- • 将新文件片 fg1_fs3 添加到现有文件组 fg1。
现在Cleaner被触发,Cleaner配置设置为“2”,以保留要保留的提交数。因此任何早于最近 2 次提交创建的文件切片都会被清理。因此 Cleaner 会将 fg1_fs1 添加到 clean 计划中,然后在执行过程中将其删除。因此存储中仅留下 fg1_fs2 和 fg1_fs3。 t4.clean
- • 清理fg1_fs1
这个循环将会重复。例如 t5.commit 将添加 fg1_fs4,t6.clean 将删除 fg1_fs2 等等。可以在此处阅读有关Cleaner的更多信息。
Replace Commit
与提交和增量提交不同,某些操作可能会导致替换某些数据文件。例如,对于Clustering(聚簇),insert_overwrite 操作会添加新的数据文件,但也会替换某些数据文件。其中大多数都是异步的,因为替换的文件不会同步删除,而只是标记为替换。在稍后的某个时间点,由清理器负责删除文件。
代码语言:javascript复制| — 20230707081954360.replacecommit.requested
| — 20230707081954360.replacecommit.inflight
| — 20230707081954360.replacecommit
比方说,使用 4 个提交 t1.commit(file1)、t2.commit(file2)、t3.commit(file3) 和 t4.commit(file4) 将 4 个数据文件写入表中。这里的每个文件代表Hudi中的一个不同的文件组。假设我们触发将小文件批处理为大文件。
t1.commit:
- • 插入新数据 fg1_fs1
t2.commit:
- • 插入新数据 fg2_fs1
t3.commit:
- • 插入新数据 fg3_fs1
t4.commit:
- • 插入新数据 fg4_fs1
t5.replacecommit 将创建一个新文件,file5 替换先前提交创建的 4 个文件。t5.replacecommit
- • 通过替换文件组(1 至 4)创建新文件组 fg5_fs1
在将 t5.replacecommit(已完成的时间线文件)添加到时间线之前,读取查询将从 4 个文件中读取数据,一旦将完成的 t5.replacecommit 添加到时间线,任何新的读取查询将仅读取 file5 并忽略 file1 到 file4。完成的 t5.replacecommit 将包含有关添加哪些文件和替换哪些文件的所有信息。
此外,Commit和Replace Commit之间的另一个区别是,常规提交的规划阶段没有太多涉及。但在Replace Commit情况下,规划涉及遍历现有文件组,并根据聚簇计划策略和配置,Hudi 将确定要考虑聚簇的文件组以及如何将它们打包到不同的聚簇操作中。因此对于非常大的表,即使是计划也可能需要一些不小的时间。此外在规划阶段结束时,有可能不会生成任何聚簇计划,因此我们可能看不到任何".replacecommit.requested"文件。这意味着此时没有任何东西可以聚簇,并且聚簇计划将在稍后的某个时间再次重新尝试。可以在此处阅读有关聚簇的更多信息。
聚簇就是这样的一个例子。但还有其他操作会导致Replace Commit操作,其中包括insert_overwrite、insert_overwrite_table 和 delete_partition 操作。
Compaction Commit
Compaction(压缩)是指将 MOR 表中的基础文件和关联日志文件压缩为新的基础文件的过程。可以在此处阅读有关压缩的更多信息。与聚簇类似,这也将经历一个规划阶段,并基于压缩策略,可选地生成一个压缩计划,跟踪日志文件列表和要压缩的基本文件。如果生成了计划,它将在时间线中生成一个compaction.requested 文件。这标志着规划阶段的结束。然后在执行阶段,将创建一个inflight文件,最终一旦压缩完成,一个完成的文件将被添加到时间线中以标记感兴趣的压缩的完成。
代码语言:javascript复制| — 20230707091954370.compaction.requested
| — 20230707091954370.compaction.inflight
| — 20230707091954370.commit
同样与 Clean 和 Clustering 类似,计划一旦序列化(换句话说,一旦requested文件写出),Hudi 就可以适应任意数量的崩溃和重新尝试,最终 Hudi 一定会完成它,确保所有部分失败的尝试都得到正确清理,并且只有最终成功尝试的数据文件完好无损。当操作一个非常大的表并且必须压缩大量文件组时,这一点非常关键。此外假设计划的压缩最终完成,表中的其他操作也将继续进行。因此我们永远无法恢复计划的压缩。如果表中有更多写入端,则必须不惜一切代价完成它,这是Hudi支持异步压缩的关键设计之一。如果看到具有以下序列的时间线,则它是有效的事件序列。
代码语言:javascript复制| — t100.compaction.requested
| — t110.deltacommit.requested
| — t110.deltacommit.inflight
| — t100.compaction.inflight
| — t110.deltacommit
| — t100.commit
如果在连续模式下使用 Deltastreamer,这是通常看到的时间线事件序列。
Rollback
使用Rollback(回滚)操作回滚任何部分失败的写入。在单写入端模式下,回滚是急切的,即每当开始新的提交时,Hudi 都会检查任何待处理的提交并触发回滚。在Hudi支持的所有不同操作中,只有Clean、Rollback和Restore会删除文件,其他操作都不会删除任何数据文件,Replace Commit可以将某些文件标记为已替换,但不会删除它们。
回滚计划阶段包括查找作为部分失败提交的一部分添加的所有文件并将其添加到回滚计划中。正如我们之前所看到的,计划被序列化到 rollback.requested 文件中。执行首先在时间线中创建一个运行中的文件,最终当回滚完成时,完成的回滚文件将被添加到时间线中。
假设这是崩溃之前的时间线。
代码语言:javascript复制| — t10.commit.requestet
| — t10.commit.inflight
| — t10.commit
| — t20.commit.requested
| — t20.commit.inflight
就在这之后,进程崩溃了。因此用户重新启动管道并将触发回滚,因为 t20 被推断为待处理。
代码语言:javascript复制| — t10.commit.requested
| — t10.commit.inflight
| — t10.commit
| — t20.commit.requested
| — t20.commit.inflight
| — t25.rollback.requested
回滚结束时,Hudi 会删除正在回滚的提交的提交元文件。在这种情况下,与提交 t20 相关的所有时间线文件都将被删除。因此回滚完成后的时间线可能如下所示。
代码语言:javascript复制| — t10.commit.requested
| — t10.commit.inflight
| — t10.commit
| — t25.rollback.requested。
| — t25.rollback.inflight
| — t25.rollback
对于多写入端,Hudi 还引入了延迟回滚,即它使用基于心跳的回滚机制,我们会在未来的博客中更深入地了解回滚算法。
与聚簇、压缩类似,回滚也被设计成幂等的。我们在请求文件中序列化计划,因此即使回滚中途崩溃,我们也可以重新尝试,不会出现任何问题。Hudi 确保重复使用相同的回滚即时时间来回滚给定的提交。完成的回滚文件将列出在回滚过程中删除的所有文件。COW中的回滚将删除部分写入的文件,但在MOR的情况下,如果部分失败的提交添加了一个日志文件,则回滚将添加另一个带有回滚块的日志文件,并且不会删除原始日志文件。这是 MOR 表的关键设计之一,以将任何写入保留为追加。我们还可以在以后的一些博客中查看日志文件设计。
Savepoint
为了在灾难和恢复场景中提供帮助,Hudi 引入了两种操作,称为Savepoint(保存点)和Restore(恢复)。将保存点添加到提交可确保清理和归档不会触及与保存点提交相关的任何内容。这意味着用户可以根据需要将表恢复到感兴趣的保存点提交。仅当保存点尚未清理时才允许将其添加到提交中。
Savepoint 只有两种状态:正在运行和已完成。由于没有计划阶段,因此没有保存点请求。在执行阶段,Hudi 会查找截至感兴趣的提交时间提供读取查询所需的所有文件。这些文件将添加到 tX.savepoint.inflight 文件中。并立即将完整的保存点文件添加到时间线中。
代码语言:javascript复制| — t10.commit.requested
| — t10.commit.inflight
| — t10.commit
| — t10.savepoint.inflight
| — t10.savepoint
也可以在稍后阶段添加保存点,只是清理程序不应该清理文件。例如表可能有从 t10 到 200 的提交(每 10 秒一次)。因此在时间 t210,如果 Cleaner 清理 t30 之前的数据文件,则允许为t50添加保存点。
Restore
Restore(恢复)用于将整个表恢复到某个较旧的时间点。万一表中出现了一些坏数据,或者数据损坏或其他正当原因,如果用户希望将表恢复到 10 小时前的状态,恢复操作就会派上用场。用户可以将保存点添加到 10 小时前的提交之一并触发恢复。从技术上讲,恢复意味着按时间倒序回滚 N 个提交。例如如果表有提交 t10、t20、t30、t40、t50、t60、t70、t80、t90 和 t100。用户更愿意将表恢复到 t40。Hudi 将回滚 t100,然后回滚 t90,然后回滚 t80,依此类推。直到 t50 回滚开始。
Hudi 将像其他表服务一样经历类似的状态转换。将生成请求的计划来跟踪需要回滚的所有提交,然后在执行过程中,将创建一个运行中的文件,最终完成后,完整的恢复文件将添加到时间线中。
代码语言:javascript复制| — t10.commit.requested
| — t10.commit.inflight
| — t10.commit
| — t10.savepoint.inflight
| — t10.savepoint
| — t20.commit.requested
| — t20.commit.inflight
| — t20.commit
| - ..
| - ..
| — t100.commit.requested
| — t100.commit.inflight
| — t100.commit
恢复后时间线可能如下所示
代码语言:javascript复制| — t10.commit.requested
| — t10.commit.inflight
| — t10.commit
| — t10.savepoint.inflight
| — t10.savepoint
| — t120.restore.requested
| — t120.restore.inflight|
| — t120.restore
Index
Hudi支持添加各种索引来辅助读写延迟,此类分区包括列统计分区和布隆过滤器分区,要首次为大型表初始化这些索引,我们不能阻止摄取写入器,因为它可能会占用大量时间。因此 Hudi 引入了 AsyncIndexer 来协助异步初始化这些分区。
代码语言:javascript复制| — t200.indexing.requested
| — t200.indexing.inflight
| — t200.indexing
与任何其他操作一样,这会经历典型的状态转换,我们将在单独的博客中详细介绍异步索引。
Active/Archive Timeline
Hudi 将整个时间线剖析为Active Timeline(活动时间线)和Archive Timeline(存档时间线)。在".hoodie"目录下看到的任何Instant均指活动时间线,而存档的那些Instant将进入".hoodie/archived"目录。可以在此处阅读有关存档时间表的更多信息。区分Ative/Archive Timeline背后的基本原理是确保我们对元数据(时间线)有最大限制,防止随着时间线越来越长,读取出现延迟增加的情况。
Hudi CLI
Hudi CLI 有查看表时间线的命令。我们将在其他一些博客中通过示例详细介绍它们,如果想尝试一下,命令是"timeline"。
结论
时间线在提供符合 ACID 的正确数据方面发挥着非常重要的作用。了解不同的时间线事件对于管理任何组织中的 Apache Hudi 表都非常有益,并且还有助于根据需要进行问题排查。
推荐阅读
万字长文 | 泰康人寿基于 Apache Hudi 构建湖仓一体平台的应用实践
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
数据湖在快手的生产实践
图加速数据湖分析-GeaFlow和Apache Hudi集成
加速LakeHouse ACID Upsert的新写时复制方案