用户投稿 | Dinky 从保存点恢复 FlinkSQL 作业

2023-10-24 19:07:15 浏览数 (1)

导读:本文来自用户投稿,介绍了 Dinky 如何通过 SavePoint 来恢复 FlinkSQL 作业。

社区公告:

问题反馈、代码提交、文章投稿与社区贡献请移步 Github issue。

Github issue #66 登记企业或组织生产使用,可邀请至企业支持群,获取团队技术支持与其他企业用户的经验分享。

社区团队正在构建 Dinky 1.0 全新版本,带来更多创新实用功能,欢迎参与贡献,共建共赢。

GitHub 地址

https://github.com/DataLinkDC/dinky

https://gitee.com/DataLinkDC/Dinky

欢迎大家为 Dinky 送上小星星~

一、运行环境

说明项

内容

hadoop 版本

hadoop-3.1.4

Flink 任务执行模式

Yarn Session

Flink 版本

Flink-1.17.0

Dinky 版本

dlink-release-0.7.4

Kafka 版本

kafka_2.12-3.5.1

Kafka 运行模式

kraft

Mysql 版本

5.7.28

HDFS 集群、YARN 集群、Dinky 环境的搭建和启动,这里略过,假设已经完成。

Dinky 所需 Jar 包

在本用例中,以 Kafka 作为 source,以 MySQL 作为 sink;

把 Kafka 的依赖包放到 dlink-release-0.7.4/plugins/flink1.17 下,另外还增加:

  • flink-connector-jdbc-1.17.0.jar
  • flink-sql-connector-kafka-1.17.0.jar

Flink 配置中指定 Savepoint 存储路径

修改Flink家目录下 flink/conf/flink-conf.yaml 文件,指定savepoint目录位置。

代码语言:javascript复制
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.savepoints.dir: hdfs://bd171:8020/sp

二、在 Dinky 中恢复 FlinkSQL 作业

创建 Yarn Session 集群

在 Flink 根目录下执行以下命令向 Yarn 集群申请资源,开启一个Yarn 会话,启动 Flink 集群:

代码语言:javascript复制
./bin/yarn-session.sh -d -nm ww

可以在 Yarn Web UI 中看到我们新启动的 Yarn 会话:

参数说明:

  • -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
  • -nm(--name):配置在YARN UI界面上显示的任务名。

编写 FlinkSQL 作业

在编辑器中输入以下内容:

代码语言:javascript复制
SET pipeline.operator-chaining = false;
DROP TABLE IF EXISTS employees_kafka;
CREATE TABLE IF NOT EXISTS employees_kafka (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-cdc-kafka',
    'properties.bootstrap.servers' = 'bd171:19092,bd172:19092,bd173:19092',
    'properties.group.id' = 'flink-cdc-kafka-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE IF NOT EXISTS employees_sink (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc-newtec',
    'url' = 'jdbc:mysql://mysql201:3306/employees?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'employees_kafka_sink',
    'driver' = 'com.mysql.cj.jdbc.Driver', 
    'username' = 'root', 
    'password' = '****' 
    );
insert into
    employees_sink
select
    emp_no,
    birth_date,
    first_name,
    last_name,
    gender,
    hire_date
from
    employees_kafka;

同时注意右边 SavePoint 策略,选择 “最近一次”,然后运行这个作业:

此时我们向kafka相关topic插入300条记录,随后这些数据写到了MySQL数据库的相关表里:

SavePoint 停止 FlinkSQL 作业

点击 Dinky 的运维中心菜单,在任务列表里点击上面运行的这个任务进入任务详情页面,在页面右上角点击三个点的省略号按钮,弹出框中点击 “SavePoint停止”:

在 HDFS 中可以看到相关的 SavePoint 保存记录:

点击链接查看:

在Dinky 的运维中心,任务列表,任务详情页面,作业快照sheet下面的SavePoint 这个 Sheet 下,也可以看到 SavePoint 保存的路径信息:

在 Dinky 的数据开发的作业中, 右边“保存点”栏也可以查看到 savepoint 记录:

向 Kafka 相关 topic 写入 300 条数据

FlinlSQL 作业当前是停止状态,此时,向 Kafka 相关 Topic 写入300条记录。

重启作业

在 Dinky 的运维中心,任务列表,任务详情页面,重启任务;任务重启完成后,可以看到,FlinlSQL 作业实现了从 SavePoint 中的状态恢复,找到 Kafka 的正确偏移,在任务停止期间进行 Kafka 相关 Topic 中的数据,被 FlinkSQL 作业找到并读到到,最终写到了任务的 Sink,MySQL 数据库的相关表里:

三、结论

Dinky 这个图形化的 FlinkSQL 开发工具,不仅简化了 FlinkSQL 的开发调试,还集成了对从 SavePoint 恢复作业运行的支持,非常方便。

0 人点赞