Streaming regular inner join across Hudi tables to build a wide table: a hands-on walkthrough

2022-06-19 13:04:19

Environment

  • MySQL 5.7
  • Hadoop 3.2.2
  • Flink 1.14.4
  • Hudi 0.11.0
  • flink-cdc-mysql 2.2

Procedure

  1. Use Flink CDC to sync the data of two MySQL tables into Hudi tables
  2. Incrementally read the Hudi tables and incrementally join the data of the two tables
  3. Write the joined data into a wide table

Implementation

Create the tables in MySQL

create database hudi_test;
use hudi_test;

create table orders(id int primary key not null, num int);
create table product(id int primary key not null, name varchar(50));
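The mysql-cdc connector reads MySQL's binlog, so the server must have binlog enabled in ROW format. A quick sanity check, runnable in the same MySQL session:

```sql
-- Binlog must be enabled and row-based for the mysql-cdc connector
show variables like 'log_bin';        -- expect: ON
show variables like 'binlog_format';  -- expect: ROW
```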

Insert data into both tables

insert into orders values(1, 2);
insert into product values(1, "phone");

Start the Flink SQL client.

Create a catalog in Flink SQL

create catalog hudi with ('type'='hudi','catalog.path'='hdfs://bigdata:9000/user/hive/warehouse');
create database hudi.huditest;
use hudi.huditest;

Create the Hudi tables in Flink SQL

-- orders table
CREATE TABLE hudi.huditest.orders_hudi(
  id INT,
  num INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);

-- product table
CREATE TABLE hudi.huditest.product_hudi(
  id INT,
  name STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);

-- wide table
CREATE TABLE hudi.huditest.orders_product_hudi(
  id INT,
  name STRING,
  num INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);
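One tuning note: with 'read.streaming.enabled' = 'true', the source polls the Hudi timeline for new commits every 60 seconds by default. For faster feedback in a demo, Hudi's Flink read option `read.streaming.check-interval` (in seconds) can be added to each WITH clause above, e.g.:

```sql
'read.streaming.check-interval' = '4'  -- poll the timeline every 4 s instead of the default 60 s
```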

Create the MySQL source tables in Flink SQL

CREATE TABLE orders_mysql (
  id INT,
  num INT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'hudi_test',
  'table-name' = 'orders');

CREATE TABLE product_mysql (
  id INT,
  name STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'hudi_test',
  'table-name' = 'product');

Write the data

insert into hudi.huditest.orders_hudi select * from orders_mysql;

insert into hudi.huditest.product_hudi select * from product_mysql;

The two insert jobs appear in the Flink web UI as shown in the figure.

Join the two Hudi tables

Use an inner join:

insert into hudi.huditest.orders_product_hudi
select
  o.id as id,
  p.name as name,
  o.num as num
from hudi.huditest.orders_hudi as o
inner join hudi.huditest.product_hudi as p on o.id = p.id;

Query the data

select * from hudi.huditest.orders_product_hudi;

This returns the single joined row: id=1, name=phone, num=2.

Insert another row into the orders table in MySQL:

insert into orders values(2, 13);

In this case the inner join waits for the matching row to arrive on the product stream before emitting anything downstream. Now write the matching row to the product table:

insert into product values(2, "door");

Querying again gives the result shown below.
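This waiting behavior is inherent to a streaming inner join: a row is emitted only after both sides have a match. If unmatched orders should instead flow downstream immediately (with a NULL name that Flink later retracts and corrects once the product row arrives), a LEFT JOIN could be used; a sketch:

```sql
-- Sketch: a left join emits each order immediately; name stays NULL until
-- the matching product row arrives, then the row is retracted and re-emitted
insert into hudi.huditest.orders_product_hudi
select o.id, p.name, o.num
from hudi.huditest.orders_hudi as o
left join hudi.huditest.product_hudi as p on o.id = p.id;
```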

Out-of-order streams

Write two rows into the orders table one after another. Because the Flink checkpoint interval is set to 3 minutes, and a checkpoint is when Hudi table data is actually committed to storage, we do the following to make the orders and product stream data arrive out of order.

Insert data into the orders stream.

Insert data into the product stream.

Once the join has received that data, insert a row with id 3 into the product stream:
insert into product values(3, "screen");

Query the data again.

As shown, the out-of-order data is still joined correctly.
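For reference, the 3-minute checkpoint interval mentioned above is set in the Flink SQL client via a SET statement (the key is Flink's standard checkpointing option):

```sql
-- Hudi's Flink writer commits data on each checkpoint, so this interval
-- controls when writes become visible to streaming readers
SET 'execution.checkpointing.interval' = '3min';
```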

This article is an original post by "xiaozhch5" on the blog "From Big Data to AI", licensed under CC 4.0 BY-SA; please include a link to the original and this notice when reposting.

Original link: https://lrting.top/backend/bigdata/hudi/hudi-advanced/6479/
