基本环境
- mysql 5.7
- hadoop 3.2.2
- flink 1.14.4
- hudi 0.11.0
- flink-cdc-mysql 2.2
操作步骤
- 使用flink cdc将mysql中两个表的数据同步到hudi表
- 增量读取hudi表,增量关联两个表中的数据
- 将关联后的数据写入宽表中
具体实施
mysql中建表
代码语言:javascript复制create database hudi_test;
use hudi_test;
create table orders(id int primary key not null, num int);
create table product(id int primary key not null, name varchar(50));
往两个表中插入数据
代码语言:javascript复制insert into orders values(1, 2);
insert into product values(1, "phone");
启动flink sql,
flink sql中创建catalog
代码语言:javascript复制create catalog hudi with ('type'='hudi','catalog.path'='hdfs://bigdata:9000/user/hive/warehouse');
create database huditest;
use huditest;
在flink sql中建hudi表
代码语言:javascript复制--- order表
CREATE TABLE hudi.huditest.orders_hudi(
id INT,
num INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'compaction.async.enabled' = 'false'
);
--- product表
CREATE TABLE hudi.huditest.product_hudi(
id INT,
name STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'compaction.async.enabled' = 'false'
);
--- 宽表
CREATE TABLE hudi.huditest.orders_product_hudi(
id INT,
name STRING,
num INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'compaction.async.enabled' = 'false'
);
在flink sql中创建mysql源表
代码语言:javascript复制CREATE TABLE orders_mysql (
id INT,
num INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'database-name' = 'hudi_test',
'table-name' = 'orders');
CREATE TABLE product_mysql (
id INT,
name STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'Pass-123-root',
'database-name' = 'hudi_test',
'table-name' = 'product');
数据写入
代码语言:javascript复制insert into hudi.huditest.orders_hudi select * from orders_mysql;
insert into hudi.huditest.product_hudi select * from product_mysql;
flink界面如图所示
两个hudi表关联
使用inner join
代码语言:javascript复制insert into hudi.huditest.orders_product_hudi
select
hudi.huditest.orders_hudi.id as id,
hudi.huditest.product_hudi.name as name,
hudi.huditest.orders_hudi.num as num
from hudi.huditest.orders_hudi
inner join hudi.huditest.product_hudi on hudi.huditest.orders_hudi.id = hudi.huditest.product_hudi.id;
数据查询
代码语言:javascript复制select * from hudi.huditest.orders_product_hudi;
得到:
再次往orders表中插入数据
代码语言:javascript复制insert into orders values(2, 13);
这种情况下会等待product流到来并关联再往下游输出,此时再往product表写数据
代码语言:javascript复制insert into product values(2, "door");
查询数据得到
异常流操作
往orders表先后写入两条数据,由于flink checkpoint时间设置为3分钟,该时间为hudi表数据具体落盘的时间,所以为了让orders和product流数据乱序,进行如下操作
orders流插入数据
product流插入数据
- 看到join接收到数据之后,再次往product流插入id为3的数据
insert into product values(3, "screen");
再次查询数据
可见乱序数据可正常关联
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/bigdata/hudi/hudi-advanced/6479/