使用FLINK SQL从savepoint恢复hudi作业 (flink 1.13)

2022-01-19 09:45:39 浏览数 (1)

Flink从1.13版本开始支持在SQL Client从savepoint恢复作业。flink-savepoint介绍

接下来我们从Flink SQL Client构建一个mysql cdc数据经kafka入hudi数据湖的例子。整体流程如下:

在上述第二步中,我们通过手工停止kafka→hudi的Flink任务,然后在Flink SQL Client从savepoint进行恢复。

下述工作类似于Flink SQL Client实战CDC数据入湖只是本文的flink版本为1.13.1,可参考其完成本文验证。

环境依赖

hadoop 3.2.0

zookeeper 3.6.3

kafka 2.8.0

mysql 5.7.35

flink 1.13.1-scala_2.12

flink cdc 1.4

hudi 0.10.0-SNAPSHOT (最新master分支编译 2021/10/08)

datafaker 0.7.6

各组件说明见Flink SQL Client实战CDC数据入湖

组件说明、安装以及使用可直接在右上角搜索框使用关键词进行搜索。

操作指南

使用datafaker将测试数据导入mysql

  1. 在数据库中新建stu8表
代码语言:javascript复制
mysql -u root -p

create database test;
use test;
create table stu8 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
  ) engine=InnoDB default charset=utf8;Copy
  1. 新建meta.txt文件,文件内容为:
代码语言:javascript复制
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]Copy
  1. 生成1000000条数据并写入到mysql中的test.stu8表(将数据设置尽量大,让写入hudi的任务能够不断进行)
代码语言:javascript复制
datafaker rdb mysql mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu8 1000000 --meta meta.txt Copy

hudi、flink-mysql-cdc、flink-kafka相关jar包下载

本文提供编译好的hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar,如果你想自己编译hudi那么直接clone master分支进行编译即可。(注意指定hadoop版本)

将jar包下载到flink的lib目录下

代码语言:javascript复制
cd flink-1.13.1/lib
wget https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/flink/flink-sql-client-savepoint-example/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar
wget https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.4.0/flink-sql-connector-mysql-cdc-1.4.0.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jarCopy

如果你在启动以及运行flink任务中遇到缺少某些类问题,请下载相关jar包并放置到flink-1.13.1/lib目录下,本实验在操作过程中遇到的缺少的包如下(点击可下载):

  • commons-logging-1.2.jar
  • htrace-core-3.1.0-incubating.jar
  • htrace-core4-4.1.0-incubating.jar
  • hadoop-mapreduce-client-core-3.2.0.jar

在yarn上启动flink session集群

首先确保已经配置好HADOOP_CLASSPATH,对于开源版本hadoop3.2.0,可通过如下方式设置:

代码语言:javascript复制
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HADOOP_HOME/share/hadoop/client/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/tools/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/etc/hadoop/*Copy

flink需要开启checkpoint,和配置savepoint目录,修改flink-conf.yaml配置文件

代码语言:javascript复制
execution.checkpointing.interval: 150000ms
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop:9000/flink-chk
state.backend.rocksdb.localdir: /tmp/rocksdb
state.savepoints.dir: hdfs://hadoop:9000/flink-1.13-savepoints
Copy

启动flink session集群

代码语言:javascript复制
cd flink-1.13.1
bin/yarn-session.sh -s 4 -jm 2048 -tm 2048 -nm flink-hudi-test -dCopy

启动flink sql client

代码语言:javascript复制
cd flink-1.13.1
bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar shellCopy

flink读取mysql binlog并写入kafka

创建mysql源表

代码语言:javascript复制
create table stu8_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'database-name' = 'test',
  'table-name' = 'stu8'
);Copy

创建kafka目标表

代码语言:javascript复制
create table stu8_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'kafka'
  ,'topic' = 'cdc_mysql_test_stu8_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'format' = 'debezium-json'
);Copy

创建任务将mysql binlog日志写入kafka

代码语言:javascript复制
insert into stu8_binlog_sink_kafka
select * from stu8_binlog;Copy

flink读取kafka数据并写入hudi数据湖

创建kafka源表

代码语言:javascript复制
create table stu8_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_test_stu8_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
  );Copy

创建hudi目标表

代码语言:javascript复制
 create table stu8_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/test_stu8_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );Copy

创建任务将kafka数据写入到hudi中

代码语言:javascript复制
insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;Copy

待任务运行一段时间后,我们手动保存hudi作业并停止任务

代码语言:javascript复制
bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001Copy

得到该任务的具体savepoint路径:

代码语言:javascript复制
[root@hadoop flink]# bin/flink stop --savepointPath hdfs://hadoop:9000/flink-1.13-savepoint/ 0128b183276022367e15b017cb682d61 -yid application_1633660054258_0001
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2021-10-07 22:55:51,171 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Suspending job "0128b183276022367e15b017cb682d61" with a savepoint.
2021-10-07 22:55:51,317 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/data/flink-1.13.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-10-07 22:55:51,364 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at hadoop/192.168.241.128:8032
2021-10-07 22:55:51,504 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-10-07 22:55:51,506 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-10-07 22:55:51,567 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop:34681 of application 'application_1633660054258_0001'.
Savepoint completed. Path: hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adbCopy

从savepoint恢复任务:(在Flink SQL Client执行)

代码语言:javascript复制
 SET execution.savepoint.path=hdfs://hadoop:9000/flink-1.13-savepoint/savepoint-0128b1-8970a7371adb

 insert into stu8_binlog_sink_hudi
select * from  stu8_binlog_source_kafka;Copy

可以看到该任务从上述检查点恢复:

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/1936582

0 人点赞