导读:本文来自用户投稿,介绍了 Dinky 如何通过 SavePoint 来恢复 FlinkSQL 作业。
社区公告:
问题反馈、代码提交、文章投稿与社区贡献请移步 Github issue。
Github issue #66 登记企业或组织生产使用,可邀请至企业支持群,获取团队技术支持与其他企业用户的经验分享。
社区团队正在构建 Dinky 1.0 全新版本,带来更多创新实用功能,欢迎参与贡献,共建共赢。
GitHub 地址
https://github.com/DataLinkDC/dinky
https://gitee.com/DataLinkDC/Dinky
欢迎大家为 Dinky 送上小星星~
一、运行环境
说明项 | 内容 |
---|---|
hadoop 版本 | hadoop-3.1.4 |
Flink 任务执行模式 | Yarn Session |
Flink 版本 | Flink-1.17.0 |
Dinky 版本 | dlink-release-0.7.4 |
Kafka 版本 | kafka_2.12-3.5.1 |
Kafka 运行模式 | kraft |
Mysql 版本 | 5.7.28 |
HDFS 集群、YARN 集群、Dinky 环境的搭建和启动,这里略过,假设已经完成。
Dinky 所需 Jar 包
在本用例中,以 Kafka 作为 source,以 MySQL 作为 sink;
把 Kafka 的依赖包放到 dlink-release-0.7.4/plugins/flink1.17 下,另外还增加:
- flink-connector-jdbc-1.17.0.jar
- flink-sql-connector-kafka-1.17.0.jar
Flink 配置中指定 Savepoint 存储路径
修改Flink家目录下 flink/conf/flink-conf.yaml 文件,指定savepoint目录位置。
代码语言:javascript复制# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.savepoints.dir: hdfs://bd171:8020/sp
二、在 Dinky 中恢复 FlinkSQL 作业
创建 Yarn Session 集群
在 Flink 根目录下执行以下命令向 Yarn 集群申请资源,开启一个Yarn 会话,启动 Flink 集群:
代码语言:javascript复制./bin/yarn-session.sh -d -nm ww
可以在 Yarn Web UI 中看到我们新启动的 Yarn 会话:
参数说明:
- -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
- -nm(--name):配置在YARN UI界面上显示的任务名。
编写 FlinkSQL 作业
在编辑器中输入以下内容:
代码语言:javascript复制SET pipeline.operator-chaining = false;
DROP TABLE IF EXISTS employees_kafka;
CREATE TABLE IF NOT EXISTS employees_kafka (
`emp_no` INT NOT NULL,
`birth_date` DATE,
`first_name` STRING,
`last_name` STRING,
`gender` STRING,
`hire_date` DATE
) WITH (
'connector' = 'kafka',
'topic' = 'flink-cdc-kafka',
'properties.bootstrap.servers' = 'bd171:19092,bd172:19092,bd173:19092',
'properties.group.id' = 'flink-cdc-kafka-group',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE IF NOT EXISTS employees_sink (
`emp_no` INT NOT NULL,
`birth_date` DATE,
`first_name` STRING,
`last_name` STRING,
`gender` STRING,
`hire_date` DATE,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'jdbc-newtec',
'url' = 'jdbc:mysql://mysql201:3306/employees?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
'table-name' = 'employees_kafka_sink',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '****'
);
insert into
employees_sink
select
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
from
employees_kafka;
同时注意右边 SavePoint 策略,选择 “最近一次”,然后运行这个作业:
此时我们向kafka相关topic插入300条记录,随后这些数据写到了MySQL数据库的相关表里:
SavePoint 停止 FlinkSQL 作业
点击 Dinky 的运维中心菜单,在任务列表里点击上面运行的这个任务进入任务详情页面,在页面右上角点击三个点的省略号按钮,弹出框中点击 “SavePoint停止”:
在 HDFS 中可以看到相关的 SavePoint 保存记录:
点击链接查看:
在Dinky 的运维中心,任务列表,任务详情页面,作业快照sheet下面的SavePoint 这个 Sheet 下,也可以看到 SavePoint 保存的路径信息:
在 Dinky 的数据开发的作业中, 右边“保存点”栏也可以查看到 savepoint 记录:
向 Kafka 相关 topic 写入 300 条数据
FlinlSQL 作业当前是停止状态,此时,向 Kafka 相关 Topic 写入300条记录。
重启作业
在 Dinky 的运维中心,任务列表,任务详情页面,重启任务;任务重启完成后,可以看到,FlinlSQL 作业实现了从 SavePoint 中的状态恢复,找到 Kafka 的正确偏移,在任务停止期间进行 Kafka 相关 Topic 中的数据,被 FlinkSQL 作业找到并读到到,最终写到了任务的 Sink,MySQL 数据库的相关表里:
三、结论
Dinky 这个图形化的 FlinkSQL 开发工具,不仅简化了 FlinkSQL 的开发调试,还集成了对从 SavePoint 恢复作业运行的支持,非常方便。