apache hudi 0.13.0版本重磅发布

2023-03-07 21:03:36 浏览数 (2)

Apache Hudi 0.13.0引入了一系列新特性,包括Metaserver, Change Data Capture, new Record Merge API, new sources for Deltastreamer等。虽然此版本不需要表版本升级,但希望用户在使用 0.13.0 版本之前按照下面的迁移指南采取相关重大更改和行为更改的操作。

迁移指南概览

此版本与 0.12.0 版本保持相同的表版本 (5),如果您从 0.12.0 升级,则无需升级表版本。 如下所述,存在一些重大更改和行为更改,希望用户在使用 0.13.0 版本之前采取相应的措施。

迁移指南:重大更改

Bundle Updates

Spark bundle Support

从现在开始,hudi-spark3.2-bundle 可与 Apache Spark 3.2.1 和 Spark 3.2.x 的更新版本一起使用。 由于 HiveClientImpl 的 getHive 方法的 Spark 实现更改在 Spark 版本 3.2.0 和 3.2.1 之间不兼容,因此放弃了对带有 hudi-spark3.2-bundle 的 Spark 3.2.0 的支持。

Utilities Bundle Change

AWS 和 GCP bundle jar 与 hudi-utilities-bundle 是分开的。 用户在使用云服务时需要使用 hudi-aws-bundle 或 hudi-gcp-bundle 以及 hudi-utilities-bundle。

New Flink Bundle

Hudi 现在通过新的 hudi-flink1.16-bundle 在 Flink 1.16.x 上得到支持。

Spark 中的惰性文件索引

Hudi 在 Spark 中的文件索引默认切换为惰性列出:这意味着它只会列出查询请求的分区(即,在分区修剪之后),而不是在此版本之前总是列出整个表。 这有望为大型表带来相当大的性能提升。

如果用户想要更改列表行为,则会添加一个新的配置属性:hoodie.datasource.read.file.index.listing.mode(现在默认为惰性)。 您可以设置两个可能的值:

  • eager:这会在初始化期间急切地列出所有分区路径和其中相应的文件切片。 这是 0.13.0 之前的默认行为。如果一个Hudi表有1000个分区,eager模式在构建文件索引时会列出所有分区下的文件。
  • lazy:其中的分区和文件切片将被延迟列出,允许分区修剪谓词被适当地向下推,因此只列出已经被修剪的分区。初始化文件索引时,文件未列在分区下。 在查询中使用谓词(例如,datestr=2023-02-19)进行分区修剪后,文件仅列在目标分区下。

要保留 0.13.0 之前的行为,用户需要设置 hoodie.datasource.read.file.index.listing.mode=eager。 重大更改:只有当表同时具有以下两种情况时才会发生重大更改:多个分区列和分区值包含未进行 URL 编码的斜杠。

例如,假设我们要从分区路径 2022/01/03 解析两个分区列 – 月 (2022/01) 和日 (03)。 由于分区列的数量(此处为 2 – 月和日)与分区路径中由 / 分隔的组件数量(在本例中为 3 – 月、年和日)不匹配,因此会导致歧义。 在这种情况下,不可能恢复每个分区列对应的分区值。

有两种方法可以避免重大更改:

  • 第一个选项是更改分区值的构造方式。 用户可以切换月份列的分区值,避免任何分区列值出现斜杠,比如202201,那么解析分区路径(202201/03)就没有问题了。
  • 第二个选项是将列表模式切换为 eager。 文件索引将“优雅地回归”以假定表未分区并仅牺牲分区修剪,但将能够像表未分区一样处理查询(因此可能导致性能损失),而不是失败 查询。

Spark Structured Streaming 中的检查点管理

如果您使用 Spark streaming 摄取到 Hudi,Hudi 会在内部自行管理检查点。 我们现在正在添加对多个编写器的支持,每个编写器都通过流式摄取摄取到同一个 Hudi 表中。 在旧版本的 hudi 中,您不能将多个流式摄取编写器摄取到同一个 hudi 表中(一个具有并发 Spark 数据源编写器的流式摄取编写器与锁提供程序一起工作;但是,不支持两个 Spark 流式摄取编写器)。 在 0.13.0 中,我们添加了对同一个表进行多个流式摄取的支持。 如果是单个流摄取,用户无需执行任何操作; 旧管道无需任何额外更改即可工作。 但是,如果您有多个流式写入器到同一个 Hudi 表,则每个表都必须为配置 hoodie.datasource.write.streaming.checkpoint.identifier 设置一个唯一的值。 此外,用户应该设置通常的多写入器配置。 更多详情可在这找到。

Spark中的ORC支持

此版本中删除了对 Spark 2.x 的 ORC 支持,因为 Hudi 中对 orc-core:nohive 的依赖现在被 orc-core 取代,以与 Spark 3 兼容。ORC 支持现在可用于 Spark 3.x ,这在以前的版本中被破坏了。

强制记录关键字段

设置record key字段的配置hoodie.datasource.write.recordkey.field现在需要设置,没有默认值。 以前,默认值为 uuid。

迁移指南:行为更改

写路径中的模式处理

许多用户已请求将 Hudi 用于 CDC 用例,他们希望在新模式中删除现有列时能够实现模式自动演化。 从 0.13.0 版本开始,Hudi 现在具有此功能。 您可以允许模式自动演化,其中可以将现有列删除到新模式中。

由于根据源架构在目标表中删除列构成了相当大的行为更改,因此默认情况下禁用此功能并由以下配置保护:hoodie.datasource.write.schema.allow.auto.evolution.column.drop。 要启用自动删除列以及传入批次的新演变模式,请将其设置为 true。

此配置不需要通过使用例如 ALTER TABLE … Spark 中的 DROP COLUMN 手动演变模式。

删除默认Shuffle并行度

此版本更改了 Hudi 决定写入操作的shuffle并行度的方式,包括 INSERT、BULK_INSERT、UPSERT 和 DELETE (hoodie.insert|bulkinsert|upsert|delete.shuffle.parallelism),这最终会影响写入性能。

之前,如果用户不配置,Hudi 会使用 200 作为默认的 shuffle 并行度。 从 0.13.0 开始,默认情况下,Hudi 通过使用由 Spark 确定的输出 RDD 分区数(如果可用)或使用 spark.default.parallelism 值自动推导shuffle并行度。 如果上述Hudi shuffle并行度是用户明确配置的,那么用户配置的并行度仍然用于定义实际的并行度。 对于具有合理输入大小的工作负载,此类行为更改可将开箱即用的性能提高 20%。

如果输入数据文件很小,例如小于 10MB,我们建议显式配置 Hudi shuffle 并行度(hoodie.insert|bulkinsert|upsert|delete.shuffle.parallelism),这样并行度至少为 total_input_data_size/500MB,以 避免潜在的性能下降(有关更多信息,请参阅调整指南)。

默认的简单写执行器

对于插入/更新插入操作的执行,Hudi 过去使用执行器的概念,依靠内存中的队列将摄取操作(以前通常由 I/O 操作获取shuffle blocks)与写入操作分离。 从那时起,Spark 架构有了很大的发展,使得这种编写架构变得多余。 为了发展这种编写模式并利用 Spark 中的变化,在 0.13.0 中,我们引入了一个新的简化版本的执行程序,(创造性地)命名为 SimpleExecutor 并将其设置为开箱即用的默认值。

SimpleExecutor 没有任何内部缓冲(即不在内存中保存记录),它在内部实现对提供的迭代器的简单迭代(类似于默认的 Spark 行为)。 它在现代 Spark 版本 (3.x) 上提供了约 10% 的开箱即用性能改进,与 Spark 的本机 SparkRecordMerger 一起使用时甚至更多。

NONE 用于批量插入以匹配 Parquet 写入的排序模式

此版本调整了 BULK_INSERT 写入操作的 NONE 排序模式(默认排序模式)的并行度。 从现在开始,默认情况下,使用输入并行性而不是shuffle并行性 (hoodie.bulkinsert.shuffle.parallelism) 来写入数据,以匹配默认的 parquet 写入行为。 这不会更改使用 NONE 排序模式的聚类行为。

BULK_INSERT 写入操作的这种行为更改提高了开箱即用的写入性能。

如果在默认的NONE排序方式下还是发现小文件问题,我们建议在写入Hudi表之前,先根据分区路径和记录键对输入数据进行排序。 您还可以使用 GLOBAL_SORT 来确保最佳文件大小。

Deltstreamer 中的元同步失败

在早期版本中,我们使用了一种快速失败的方法,如果任何目录同步失败,则不会尝试同步到剩余的目录。 在 0.13.0 中,在任何目录同步失败的操作失败之前尝试同步到所有配置的目录。 在一个目录同步失败的情况下,其他目录的同步仍然可以成功,所以用户现在只需要重试失败的目录即可。

不覆盖内部元数据表配置

由于错误配置可能导致数据完整性问题,在 0.13.0 中,我们努力使用户的元数据表配置更加简单。 在内部,Hudi 确定这些配置的最佳选择,以实现系统的最佳性能和稳定性。

以下与元数据表相关的配置是内部的; 您不能再显式配置这些配置:

代码语言:javascript复制
hoodie.metadata.clean.async
hoodie.metadata.cleaner.commits.retained
hoodie.metadata.enable.full.scan.log.files
hoodie.metadata.populate.meta.fields

Spark SQL CTAS 性能修复

以前,由于配置错误,CTAS 写入操作被错误地设置为使用 UPSERT。 在 0.13.0 版本中,我们修复了这个问题,以确保 CTAS 使用 BULK_INSERT 操作来提高第一批写入 Hudi 表的性能(没有真正需要为此使用 UPSERT,因为正在创建表)。

Flink CkpMetadata

在 0.13.0 之前,我们通过清理所有消息来引导 ckp 元数据(检查点相关元数据)。 一些极端情况没有得到正确处理。 例如:

重新启动作业时,写任务无法正确获取挂起的瞬间。

如果检查点成功并且作业突然崩溃,则瞬间没有时间提交。 数据丢失,因为最后一个挂起的瞬间被回滚; 然而,Flink 引擎仍然认为检查点/即时是成功的。

问:为什么我们要在 0.13.0 版本之前清理消息?

A:为了防止时间线和消息不一致。

问:为什么我们要保留 0.13.0 版本中的消息?

A:不一致有两种情况:

时间线即时完成但 ckp 消息正在传输(用于提交即时)。

时间线时刻处于待定状态,而 ckp 消息未启动(用于启动新时刻)。

对于case 1,不需要re-commit instant,如果write task在恢复的时候没有得到任何pending instant就可以了。

对于case 2,instant基本上是悬而未决的。 瞬间将被回滚(如预期的那样)。 因此,保持 ckp 消息原样实际上可以保持正确性。

版本亮点

Metaserver

在 0.13.0 中,我们引入了元数据集中管理服务 Metaserver。 这是我们在未来引入的首批平台服务组件之一。 Metaserver 帮助用户轻松管理数据湖平台中的大量表。

注意,这是实验性的特性

要在您的环境中设置元服务器,请使用 hudi-metaserver-server-bundle 并将其作为 java 服务器应用程序运行,例如 java -jar hudi-metaserver-server-bundle-.jar。 在客户端,添加以下选项以与元服务器集成:

代码语言:javascript复制
hoodie.metaserver.enabled=true
hoodie.metaserver.uris=thrift://<server url>:9090

Metaserver 存储 Hudi 表的元数据,如表名、数据库、所有者; 以及时间线的元数据,如提交瞬间、动作、状态等。此外,Metaserver 通过 Hudi Spark 包支持 Spark 写入器和读取器。

Change Data Capture

在 Hudi 表用作流源的情况下,我们希望了解属于单个提交的记录的所有更改。 例如,我们想知道哪些记录被插入、删除和更新。 对于更新的记录,后续管道可能希望获取更新前的旧值和更新后的新值。 0.13.0之前,增量查询不包含硬删除记录,用户需要使用软删除流删除,可能不符合GDPR要求。

Change-Data-Capture (CDC) 功能使 Hudi 能够通过生成更改来显示记录是如何更改的,从而处理 CDC 查询用例。

CDC 是一项实验性功能,支持用于带有 Spark 和 Flink 引擎的 COW 表。 CDC 查询尚不支持 MOR 表。

要使用 CDC,用户需要先在写入表时启用它以记录额外的数据,这些数据由 CDC 增量查询返回。

对于写入,设置 hoodie.table.cdc.enabled=true 并通过 hoodie.datasource.query.incremental.format 指定 CDC 日志记录模式,以控制记录的数据。 有3种模式可供选择:

  • data_before_after:这记录了更改记录的操作以及更改前后的整个记录。 这种模式在存储上产生最多的 CDC 数据,并且查询 CDC 结果的计算量最少。
  • data_before:这记录了更改记录的操作和更改前的整个记录。
  • op_key_only:这只记录更改记录的操作和键。 这种模式在存储上产生最少的 CDC 数据,并且需要最多的计算工作来查询 CDC 结果。

默认值为data_before_after

对于读,设置:

代码语言:javascript复制
hoodie.datasource.query.type=incremental
hoodie.datasource.query.incremental.format=cdc

和其他通常的增量查询选项,如开始和结束即时时间,并返回 CDC 结果。

请注意,hoodie.table.cdc.enabled 是表配置。 一旦启用,就不允许为该表关闭它。 同样,您不能更改 hoodie.table.cdc.supplemental.logging.mode,一旦它被保存为表配置。

优化记录负载处理

此版本引入了期待已久的支持,可将记录作为其引擎原生表示进行处理,从而避免将它们转换为中间形式 (Avro) 的需要。

此功能处于实验模式,目前仅支持 Spark。

通过引入新的 HoodieRecordMerger 抽象,RFC-46 使这成为可能。 HoodieRecordMerger 是未来在 Hudi 中实现任何合并语义的核心和真实来源。 在这种能力下,它取代了以前用于实现自定义合并语义的 HoodieRecordPayload 层次结构。 通过依赖 HoodieRecordMerger 形式的统一组件,我们可以在写入操作的整个生命周期内以统一的方式处理记录。 这大大减少了延迟,因为记录现在保存在引擎本机表示中,避免了不必要的复制、反序列化和转换为中间表示 (Avro)。 在我们的基准测试中,与 0.13.0 默认状态相比,upsert 性能提高了 10%,与 0.12.2 相比提高了 20%。

今天要尝试,您需要为每个 Hudi 表指定不同的配置:

对于 COW,指定 hoodie.datasource.write.record.merger.impls=org.apache.hudi.HoodieSparkRecordMerger

对于 MOR,指定 hoodie.datasource.write.record.merger.impls=org.apache.hudi.HoodieSparkRecordMerger 和 hoodie.logfile.data.block.format=parquet

请注意,当前的 HoodieSparkRecordMerger 实现仅支持与 OverwriteWithLatestAvroPayload 类等效的合并语义,这是当前用于合并记录的默认 HoodieRecordPayload 实现(设置为“hoodie.compaction.payload.class”)。 因此,如果您正在使用任何其他 HoodieRecordPayload 实现,不幸的是,您需要等到它被相应的 HoodieRecordMerger 实现替换。

Deltastreamer 中的新源支持

Deltastreamer 是一个完全托管的增量 ETL 实用程序,支持各种来源。 在此版本中,我们在其曲目中添加了三个新来源。

Proto Kafka Source

Deltastreamer 已经支持使用 JSON 和 Avro 格式从 Kafka 中一次性摄取新事件。 ProtoKafkaSource 也将此支持扩展到基于 Protobuf 类的模式。 只需一个额外的配置,就可以轻松设置此源。 查看文档以获取更多详细信息。

GCS Incremental Source

沿着 S3 事件源的路线,我们现在有一种可靠且快速的方法来通过 GcsEventsHoodieIncrSource 从 Google Cloud Storage (GCS) 中的对象中摄取。 查看有关如何设置此源的文档。

Pulsar Source

Apache Pulsar 是一个为云构建的开源分布式消息传递和流媒体平台。 PulsarSource 支持通过 Deltastreamer 从 Apache Pulsar 摄取。 查看有关如何设置此源的文档。

Partial Payload Update支持

部分更新是社区中的一个常见用例,它需要能够仅更新某些字段而不是替换整个记录。 以前,我们建议用户通过引入他们自己的自定义记录负载实现来满足此用例。 随着它的流行,在 0.13.0 版本中,我们添加了一个新的记录有效负载实现 PartialUpdateAvroPayload,以支持这种开箱即用的功能,因此用户可以使用该实现而不必编写自己的自定义实现。

一致性哈希索引

我们引入了 Consistent Hashing Index 作为您使用 Hudi 写入的另一种索引选项。 这是对 0.11.0 版本中添加的 Bucket Index 的增强。 使用Bucket索引,每个分区的Bucket/文件组是静态分配的,而使用一致性哈希索引,Bucket可以动态增长,因此用户无需担心数据倾斜。 Bucket将根据每个分区的负载因子扩展和收缩。 您可以找到此功能设计的 RFC。

如果您想尝试一下,这里是您感兴趣的配置。

代码语言:javascript复制
hoodie.index.type=bucket
hoodie.index.bucket.engine=CONSISTENT_HASHING
hoodie.bucket.index.max.num.buckets=128
hoodie.bucket.index.min.num.buckets=32
hoodie.bucket.index.num.buckets=4
## do split if the bucket size reach 1.5 * max_file_size
hoodie.bucket.index.split.threshold=1.5
## do merge if the bucket size smaller than 0.2 * max_file_size
hoodie.bucket.index.merge.threshold=0.1 

要强制缩小或扩大存储bucket,您需要使用以下配置启用clustering

代码语言:javascript复制
## check resize for every 4 commit
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=4
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy
## for supporting concurrent write & resizing
hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy

Consistent Hashing Index 仍然是一个不断发展的特性,目前从 0.13.0 开始使用它有一些限制:

  1. 只有使用 MOR 表的 Spark 引擎才支持此索引。
  2. 它不适用于启用元数据表。
  3. 要扩大或缩小buckets,用户必须使用上述配置(以某种节奏)手动触发clustering,但他们不能同时运行压缩。
  4. 因此,如果您的常规写入管道启用了压缩,请遵循以下建议:您可以选择每 12 小时触发一次缩放/收缩。 在这种情况下,每 12 小时一次,您可能需要禁用压缩、停止写入管道并启用clustering。 您应该格外小心,不要同时运行两者,因为这可能会导致冲突和管道失败。clustering完成后,您可以恢复常规写入管道,这将启用压缩。

我们正在努力实现这些自动化,并使用户更容易利用 Consistent Hashing Index。 您可以在此处关注 Consistent Hashing Index 正在进行的工作。

多个writer写入的早期冲突检查

Hudi提供乐观并发控制(OCC),允许多个写入者在没有重叠数据文件写入的情况下,并发写入并原子提交到Hudi表,保证数据的一致性、完整性和正确性。 在0.13.0版本之前,这种重叠数据文件的冲突检测是在提交元数据之前和数据写入完成之后进行的。 如果在最后阶段检测到任何冲突,则可能会浪费计算资源,因为数据写入已经完成。

为了提高并发控制,0.13.0版本引入了OCC早期冲突检测的新特性,利用Hudi的标记机制,在数据写入阶段检测到冲突,一旦检测到冲突就提前中止写入。 Hudi 现在可以更早地停止冲突写入器,因为它可以及早检测冲突并释放集群所需的计算资源,从而提高资源利用率。

OCC 中的早期冲突检测在 0.13.0 版本中是实验性的。

默认情况下,此功能处于关闭状态。 要尝试这一点,用户需要在使用 OCC 进行并发控制时将 hoodie.write.concurrency.early.conflict.detection.enable 设置为 true(有关更多详细信息,请参阅并发控制页面)。

写入数据中的无锁消息队列

在以前的版本中,Hudi 使用生产者-消费者模型通过有界内存队列将传入数据写入表中。 在此版本中,我们添加了一种新型队列,利用 Disruptor,它是无锁的。 当数据量很大时,这会增加写入吞吐量。 将 1 亿条记录写入云存储上的 Hudi 表中的 1000 个分区的基准显示,与现有的有界内存队列执行器类型相比,性能提高了 20%。

DisruptorExecutor 作为实验特性支持 Spark 插入和 Spark 批量插入操作

用户可以设置 hoodie.write.executor.type=DISRUPTOR_EXECUTOR 来启用该功能。 还有其他配置,如 hoodie.write.wait.strategy 和 hoodie.write.buffer.size 可以进一步调整性能。

Hudi CLI Bundle

我们为 Spark 3.x 引入了一个新的 Hudi CLI Bundle,hudi-cli-bundle_2.12,使 Hudi CLI 更简单易用。 用户现在可以使用这个单一的 bundle jar(发布到 Maven 存储库)和 Hudi Spark bundle 来启动脚本来启动带有 Spark 的 Hudi-CLI shell。 这为 Hudi-CLI 带来了轻松部署,因为用户不需要在本地编译 Hudi CLI 模块、上传 jar 和解决任何依赖冲突(如果有),而在此版本之前就是这种情况。 可以在 Hudi CLI 页面上找到详细说明。

Flink 1.16支持

Flink 1.16.x 集成Hudi,在编译源码时使用profile参数-Pflink1.16激活版本。 或者,使用 hudi-flink1.16-bundle。 Flink 1.15、Flink 1.14 和 Flink 1.13 将继续支持。 请查看迁移指南以获取捆绑包更新。

JSON模式转换

对于配置模式注册表的 DeltaStreamer 用户,添加了一个 JSON 模式转换器,以帮助将 JSON 模式转换为目标 Hudi 表的 AVRO。 将 hoodie.deltastreamer.schemaprovider.registry.schemaconverter 设置为 org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter 以使用此功能。 用户还可以实现此接口 org.apache.hudi.utilities.schema.SchemaRegistryProvider.SchemaConverter 以提供从原始模式到 AVRO 的自定义转换。

通过 Spark SQL Config 提供 Hudi Config

用户现在可以通过 Spark SQL conf 提供 Hudi 配置,例如,设置

代码语言:javascript复制
spark.sql("set hoodie.sql.bulk.insert.enable = true")

确保 Hudi 在执行 INSERT INTO 语句时能够使用 BULK_INSERT 操作。

0 0 投票数

文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2235518

0 人点赞