Flink写入数据到Hudi数据湖的各种方式

2022-12-05 09:10:07 浏览数 (1)

1. 写入方式

1.1 CDC Ingestion

有两种方式同步数据到Hudi

  1. 使用Flink CDC直接将Mysql的binlog日志同步到Hudi
  2. 数据先同步到Kafka/Pulsar等消息系统,然后再使用Flink cdc-format将数据同步到Hudi

注意:

  1. 如果upstream不能保证数据的order,则需要显式指定write.precombine.field
  2. MOR类型的表,还不能处理delete,所以会导致数据不一致。可以通过changelog.enabled转换到change log模式
1.2 Bulk Insert

主要用于数据初始化导入。Bulk Insert不会进行数据去重,需要用户在数据插入前进行数据去重

Bulk Insert在batch execution mode下更高效

使用参数如下:

1.3 Index Bootstrap

用于snapshot data incremental data数据导入。snapshot data部分使用Bulk insert方式完成。incremental data进行实时导入

使用参数如下:

但是incremental data如何不丢失数据,又不重复导入数据:

  1. incremental data导入部分刚开始可以多导入一部分数据,确保数据不丢失。同时开启index bootstrap function避免数据重复。
  2. 等Flink第一次checkpoint成功,关闭index bootstrap function,从Flink的State恢复状态进行incremental data导入

详细使用步骤如下:

  1. 在flink-conf.yaml中设置一个application允许checkpoint失败的次数:execution.checkpointing.tolerable-failed-checkpoints = n
  2. 在Flink的Catalog创建Hudi表,创建Hudi表的SQL中添加参数index.bootstrap.enabled = true
  3. 启动Application将incremental data导入到Hudi表
  4. 等第一次checkpoint成功,表明index bootstrap完成
  5. 停止Flink的Application,并进行Savepoint
  6. 重新在Flink的Catalog创建Hudi表,创建Hudi表的SQL中添加参数index.bootstrap.enabled = false
  7. 重启Application,从Savepoint或checkpoint恢复状态执行

注意:

  1. index bootstrap是一个阻塞过程,因此在index bootstrap期间无法完成checkpoint
  2. index bootstrap由输入input data触发。用户需要确保每个分区中至少有一条数据
  3. index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition和Load record form file观察index bootstrap的进度

2. 写入模式

2.1 Changelog Mode

使用参数如下:

保留消息的all changes(I / -U / U / D),Hudi MOR类型的表将all changes append到file log中,但是compaction会对all changes进行merge。如果想消费all changes,需要调整compaction参数:compaction.delta_commitscompaction.delta_seconds

Snapshot读取,永远读取merge后的结果数据

2.2 Append Mode

使用参数如下:

3. write写入速率限制

场景:使用Flink消费历史数据 实时增量数据,然后写入到Hudi。会造成写入吞吐量巨大 写入分区乱序严重,影响集群和application的稳定性。所以需要限制速率

使用参数如下:

4. 读取方式

4.1 Streaming Query

默认是Batch query,查询最新的Snapshot

Streaming Query需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所以数据,设置值为earliest

使用参数如下:

注意:如果开启read.streaming.skip_compaction,但stream reader的速度比clean.retain_commits慢,可能会造成数据丢失

4.2 Incremental Query

有3种使用场景

  • Streaming query: 设置read.start-commit
  • Batch query: 同时设置read.start-commitread.end-commit,start commit和end commit都包含
  • TimeTravel: 设置read.end-commit为大于当前的一个instant time,read.start-commit默认为latest

使用参数如下:

0 人点赞