1. 写入方式
1.1 CDC Ingestion
有两种方式同步数据到Hudi
- 使用Flink CDC直接将Mysql的binlog日志同步到Hudi
- 数据先同步到Kafka/Pulsar等消息系统,然后再使用Flink cdc-format将数据同步到Hudi
注意:
- 如果upstream不能保证数据的order,则需要显式指定
write.precombine.field
- MOR类型的表,还不能处理delete,所以会导致数据不一致。可以通过
changelog.enabled
转换到change log模式
1.2 Bulk Insert
主要用于数据初始化导入。Bulk Insert不会进行数据去重,需要用户在数据插入前进行数据去重
Bulk Insert在batch execution mode下更高效
使用参数如下:
1.3 Index Bootstrap
用于snapshot data incremental data数据导入。snapshot data部分使用Bulk insert方式完成。incremental data进行实时导入
使用参数如下:
但是incremental data如何不丢失数据,又不重复导入数据:
- incremental data导入部分刚开始可以多导入一部分数据,确保数据不丢失。同时开启index bootstrap function避免数据重复。
- 等Flink第一次checkpoint成功,关闭index bootstrap function,从Flink的State恢复状态进行incremental data导入
详细使用步骤如下:
- 在flink-conf.yaml中设置一个application允许checkpoint失败的次数:
execution.checkpointing.tolerable-failed-checkpoints = n
- 在Flink的Catalog创建Hudi表,创建Hudi表的SQL中添加参数
index.bootstrap.enabled = true
- 启动Application将incremental data导入到Hudi表
- 等第一次checkpoint成功,表明index bootstrap完成
- 停止Flink的Application,并进行Savepoint
- 重新在Flink的Catalog创建Hudi表,创建Hudi表的SQL中添加参数
index.bootstrap.enabled = false
- 重启Application,从Savepoint或checkpoint恢复状态执行
注意:
- index bootstrap是一个阻塞过程,因此在index bootstrap期间无法完成checkpoint
- index bootstrap由输入input data触发。用户需要确保每个分区中至少有一条数据
- index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition和Load record form file观察index bootstrap的进度
2. 写入模式
2.1 Changelog Mode
使用参数如下:
保留消息的all changes(I / -U / U / D),Hudi MOR类型的表将all changes append到file log中,但是compaction会对all changes进行merge。如果想消费all changes,需要调整compaction参数:compaction.delta_commits
和 compaction.delta_seconds
Snapshot读取,永远读取merge后的结果数据
2.2 Append Mode
使用参数如下:
3. write写入速率限制
场景:使用Flink消费历史数据 实时增量数据,然后写入到Hudi。会造成写入吞吐量巨大 写入分区乱序严重,影响集群和application的稳定性。所以需要限制速率
使用参数如下:
4. 读取方式
4.1 Streaming Query
默认是Batch query,查询最新的Snapshot
Streaming Query需要设置read.streaming.enabled = true
。再设置read.start-commit
,如果想消费所以数据,设置值为earliest
使用参数如下:
注意:如果开启read.streaming.skip_compaction
,但stream reader的速度比clean.retain_commits
慢,可能会造成数据丢失
4.2 Incremental Query
有3种使用场景
- Streaming query: 设置
read.start-commit
- Batch query: 同时设置
read.start-commit
和read.end-commit
,start commit和end commit都包含 - TimeTravel: 设置
read.end-commit
为大于当前的一个instant time,read.start-commit
默认为latest
使用参数如下: