Flink 实践教程:进阶11-SQL 关联:Regular Join

2022-04-07 09:36:27 浏览数 (2)

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

Flink SQL 提供了 Regular Joins、Interval Joins、Temporal Joins、Lookup Join、Array 展平和 Table Function 六种方式实现数据关联。本文将为您介绍如何使用 Regualr Joins 实现数据关联。Regualr Joins 在使用时有一定的限制条件,比如只能在 Equi-Join 条件下使用。下面将以 Kafka 作为源表的左右表为例,将商品订单 order-source 中商品 ID 与 product-info 中商品 ID 进行左关联得到商品名称,最终将结果数据到 Logger Sink 中去。

Flink 实践教程:进阶11-SQL 关联:Regular Join

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 Kafka Topic

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,order-sourceproduct-info

流计算 Oceanus 作业

1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传 Logger Sink [4] JAR 包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。

代码语言:sql复制
CREATE TABLE `order_source` (
    `id` INT,
    `user_id` INT,
    `product_id` INT,
    `create_time` TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-source',  
  'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
  'properties.bootstrap.servers' = 'x.x.x.x:9092',  -- 替换为您的 Kafka 连接地址
  'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。
  'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。
);

CREATE TABLE `product_info` (
    `product_id` INT,
    `product_name` STRING,
    `update_time` TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'product-info',  -- 替换为您要消费的 Topic
  'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
  'properties.bootstrap.servers' = 'x.x.x.x:9092',  -- 替换为您的 Kafka 连接地址
  'properties.group.id' = 'testGroup0', -- 必选参数, 一定要指定 Group ID
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。
  'json.ignore-parse-errors' = 'true'    -- 如果设置为 true,则忽略任何解析报错。
);

CREATE TABLE logger_sink_table ( 
    `id` INT PRIMARY KEY NOT ENFORCED,
    `user_id` INT,
    `product_id` INT,
    `product_name` STRING,
    `create_time` TIMESTAMP(3)
) WITH ( 
    'connector' = 'logger',
    'print-identifier' = 'DebugData'
);

INSERT INTO logger_sink_table
SELECT order_source.id, order_source.user_id, order_source.product_id, product_info.product_name, order_source.create_time
FROM order_source left join product_info 
on order_source.product_id = product_info.product_id;
3. 运行作业

点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

4. 模拟数据

通过 Kafka Client 发送数据到关联的左表 order-source 和右表 product-info。 发送消息命令:

代码语言:shell复制
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic order-source

Topic order-source 模拟数据示例:

代码语言:json复制
{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}
{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}
{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}

发送消息命令:

代码语言:shell复制
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic product-info

Topic product-info 模拟数据示例:

代码语言:json复制
{"id":1,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:47:00"}
{"id":2,"user_id":10,"product_id":1001,"create_time":"2022-03-17 16:48:00"}
{"id":3,"user_id":10,"product_id":1002,"create_time":"2022-03-17 16:49:00"}

更多接入方式请参考 CKafka 收发消息 [5]

5. 查看运行结果

在【日志】面板的 TaskManager 中查看收到的数据,可以看到已经关联到了 product_id 为1001的商品名称。

总结

Regular Joins 比较适合批量加载数据的场景,而当关联的右表为时常更新的维表时会出现关联不到的情况。此外,从上述运行结果可以看出:Regular Joins关联的记录为 Retract Stream(回撤流)下游需为 Upsert 类型 Sink。有一个特例:当 Regular Joins 的左右表均为 CDC Connector 时,比如左右表都是使用的 flink-connector-mysql-cdc 连接器时,由于 CDC(Change Data Capture) 的特性使得 Regular Joins 也可以正常关联到。

更多 SQL Join 详情请参考开源 Flink官方文章 SQL Join 章节 [5]。 更多 Flink 实践教程详见 流计算 Oceanus 教程 [6]

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] Kafka 控制台:https://console.cloud.tencent.com/ckafka

[4] Logger Sink 下载地址:https://cloud.tencent.com/document/product/849/58713

[5] Flink SQL Join:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/joins

[6] 流计算 Oceanus 教程:https://cloud.tencent.com/developer/tag/10509

0 人点赞