Hudi社区最近发生了一些有趣的变化,Hudi集成Flink的方案也已经发布,我个人在官网根据文档试验了一把,整体感觉还不错。我们目前并没有在生产环境中使用,但是随着社区发展和功能越来越完善,相信会有更多的业务开始尝试使用Hudi。本文在此做一个Flink和Hudi集成的分享,作者明喆sama。
一、组件下载
1.1、Flink1.12.2编译包下载:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz
1.2、Hudi编译:https://github.com/apache/hudi
代码语言:javascript复制git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests
注意:默认是用scala-2.11编译的
如果我们用的是flink1.12.2-2.12版本,可以自己编译成scala-2.12版本的
mvn clean package -DskipTests -Dscala-2.12
包的路径在packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.12-*.*.*-SNAPSHOT.jar
1.3、其他实时步骤可以参考官网的步骤了:https://hudi.apache.org/docs/flink-quick-start-guide.html
但是官网有个坑就是使用的是flink-1.11.x版本,但是我自己测试了是不行的,会报下面的错误:
1.4、后面从flink社区同学建议用flink1.12.2 hudi0.9.0(master),亲测可以。
二、Batch模式具体实施步骤:
2.1、 启动flink-sql客户端,可以提前把hudi-flink-bundle_2.12-0.9.0-SNAPSHOT.jar(我用的flink是scala2.12版本,如果是scala2.11版本需要编译成hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar)拷贝到 $FLINK_HOME/lib目录下
代码语言:javascript复制export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded
2.2、 创建表结构
代码语言:javascript复制CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://localhost:9000/hudi/t1',
'table.type' = 'MERGE_ON_READ'
);
2.3、 插入数据
代码语言:javascript复制INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
2.4、 效果图
2.5、 查询表数据,设置一下查询模式为tableau
代码语言:javascript复制-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;
2.6、 根据主键更新数据
代码语言:javascript复制INSERT INTO t1 VALUES ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1');
id1的数据age由23变为了24
三、支持stream读模式:
3.1、 创建表
代码语言:javascript复制CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://localhost:9000/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210401134557' ,
'read.streaming.check-interval' = '4'
);
这里将 table option read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据;
opiton read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;
option table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读。
3.2、 查询流模式的表t1,这里的数据就是刚刚批模式写入的数据
3.3、 从批模式写入一条数据
代码语言:javascript复制insert into t1 values ('id9','test',27,TIMESTAMP '1970-01-01 00:00:01','par5');
3.4、 隔几秒后在流模式可以读取到一条新增的数据
参考
1、https://hudi.apache.org/docs/flink-quick-start-guide.html
2、https://github.com/MyLanPangzi/flink-demo/blob/main/docs/增量型数仓探索:Flink Hudi.md