1. Scenario
Use the Flink CDC SQL API to capture MySQL binlog changes in real time and deliver them to Kafka.
2. Framework Versions
| Framework | Version |
| --- | --- |
| Flink | 1.13.2 |
| MySQL | 5.7.25 |
| flink-connector-mysql-cdc | 2.0.0 |
3. Test Code
```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDCWithSqlTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpoint every 2 minutes with exactly-once semantics.
        env.enableCheckpointing(120000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Retain checkpoints on cancellation so the job can later be restarted from them.
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/"));
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: read MySQL binlog changes through the mysql-cdc connector.
        String sourceDDL = "CREATE TABLE cdc_from_mysql ("
                + "   id INT "
                + "   ,name STRING "
                + "   ,PRIMARY KEY (id) NOT ENFORCED "
                + " ) WITH ( "
                + "   'connector' = 'mysql-cdc' ,"
                + "   'scan.startup.mode' = 'latest-offset' ,"
                + "   'server-time-zone' = 'Asia/Shanghai' ,"
                + "   'scan.incremental.snapshot.enabled' = 'true' ,"
                + "   'hostname' = 'mysql_ip' ,"
                + "   'port' = 'mysql_port' ,"
                + "   'username' = 'mysql_username' ,"
                + "   'password' = 'mysql_password' ,"
                + "   'database-name' = 'mysql_database' ,"
                + "   'table-name' = 'mysql_table' ,"
                + "   'server-id' = '5400' "
                + " ) ";
        tableEnv.executeSql(sourceDDL);

        // Sink table: write the changelog to Kafka through the upsert-kafka connector.
        String sinkDDL = "CREATE TABLE cdc_to_kafka ("
                + "   id INT "
                + "   ,name STRING "
                + "   ,PRIMARY KEY (id) NOT ENFORCED "
                + " ) WITH ( "
                + "   'connector' = 'upsert-kafka' ,"
                + "   'topic' = 'ZGN_CDC_TEST' ,"
                + "   'properties.bootstrap.servers' = 'kafka_ip:9092' ,"
                + "   'key.json.ignore-parse-errors' = 'true' ,"
                + "   'key.format' = 'json' ,"
                + "   'value.format' = 'json' ,"
                + "   'value.fields-include' = 'ALL' "
                + " ) ";
        tableEnv.executeSql(sinkDDL);

        // Pipe the CDC stream into Kafka.
        tableEnv.executeSql("INSERT INTO cdc_to_kafka SELECT * FROM cdc_from_mysql");
    }
}
```
4. Bug Reproduction
1. First, insert a few rows into MySQL
1.1 On the MySQL side
| id | name |
| --- | --- |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
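The rows above can be inserted with something along these lines (a sketch only; the host, credentials, database and table names are the same placeholders used in the DDL above):

```bash
# Sketch: connection parameters, database and table names are placeholders.
mysql -h mysql_ip -P mysql_port -u mysql_username -p mysql_database \
  -e "INSERT INTO mysql_table (id, name) VALUES (1, '1'), (2, '2'), (3, '3');"
```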
1.2 Consume the Kafka topic from the console
```
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST
{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}
```
2. Simulate a Flink job failure (stop the Flink job)
Here I simply cancelled the job through the Web UI; the CLI equivalent is sketched below.
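A minimal sketch of cancelling from the Flink CLI (the job id is a placeholder taken from the output of `flink list`):

```bash
# List running jobs, then cancel the CDC job by its id.
flink list
flink cancel <job_id>
```

Because checkpoints are retained on cancellation (RETAIN_ON_CANCELLATION in the code above), the latest checkpoint stays available in HDFS after the job stops.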
3. Insert more rows into MySQL
| id | name |
| --- | --- |
| 1 (previous batch) | 1 (previous batch) |
| 2 (previous batch) | 2 (previous batch) |
| 3 (previous batch) | 3 (previous batch) |
| 4 (new batch) | 4 (new batch) |
| 5 (new batch) | 5 (new batch) |
| 6 (new batch) | 6 (new batch) |
4. Restart the Flink job from the retained checkpoint and keep watching the Kafka topic from the console
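Restarting from the retained checkpoint is done with `flink run -s`, pointing at the newest chk-&lt;n&gt; directory under the configured checkpoint path (the job id, checkpoint number and jar name below are placeholders):

```bash
# Sketch: resume the job from the externalized checkpoint retained on cancellation.
flink run \
  -s hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/<job_id>/chk-<n> \
  -c CDCWithSqlTest flink-cdc-test.jar
```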
```
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST
{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}
---------------- boundary between stopping and restarting the job ------------------
{"id":4,"name":"4"}
{"id":5,"name":"5"}
{"id":6,"name":"6"}
{"id":4,"name":"4"} -- unexpected: duplicate consumption
{"id":5,"name":"5"} -- unexpected: duplicate consumption
{"id":6,"name":"6"} -- unexpected: duplicate consumption
{"id":4,"name":"4"} -- unexpected: duplicate consumption
{"id":5,"name":"5"} -- unexpected: duplicate consumption
{"id":6,"name":"6"} -- unexpected: duplicate consumption
{"id":4,"name":"4"} -- unexpected: duplicate consumption
{"id":5,"name":"5"} -- unexpected: duplicate consumption
{"id":6,"name":"6"} -- unexpected: duplicate consumption
......
```
5. Error Log
```
java.lang.RuntimeException: One or more fetchers have encountered exception
......
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
......
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
......
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:1800034. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
......
```
6. Troubleshooting
1. Judging from the Kafka console output
The expectation is that {"id":4,"name":"4"}, {"id":5,"name":"5"} and {"id":6,"name":"6"} are each consumed exactly once, yet in reality they keep being consumed over and over. This suggests that the restarted Flink CDC job is not restoring correctly from the checkpoint state stored in HDFS.
2. Judging from the error log
The errors all boil down to failures to deserialize MySQL binlog events, which is hard to reconcile with the guess above.
3. I searched the Flink CDC community issues and found no similar report.
4. On the Flink CDC project page, I found that version 2.0.1 shipped a fix for this problem (item 2 in the release notes below, which corrects the initial binlog offset for MySqlHybridSplitAssigner):
```
Improvements and Bug Fixes
1.[postgres] Fix Validator didn't implement Serializable
2.[mysql] Correct the initial binlog offset for MySqlHybridSplitAssigner
3.[mysql] Optimize the checkpoint be optional under single parallelism
4.[postgres] Fix postgres-cdc connector cannot recognize the optional option 'slot.name'
5.[mysql] Improve the code format in SignalEventDispatcher
6.[mysql] Add default value for 'database.history.instance.name' in MySqlParallelSource
7.[mysql] Add tests to check mysql-cdc works well under various timezones
8.[common] Remove useless parameter 'converter.schemas.enable'
9.[build] Run integration tests for each building
10.[changelog] fix changelog-deserialize exception message typo
11.[docs] Add FAQ for MySQL 8.0 Public Key Retrieval setting
12.[docs] Update the debezium document link to version 1.5
13.[docs] Add checkpoint and primary key setting for example in tutorials
```
The binlog-offset fix shipped in 2.0.1 matches the error in the log exactly (after the restart, the binlog is read from the middle of a logical event group), so the root cause is located here.
7. Solution
Upgrade the Flink CDC connector, from 2.0.0 to 2.0.2.
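Assuming the connector is deployed as the SQL fat jar under Flink's lib/ directory (a common setup for SQL jobs; the jar names and layout below are illustrative, so adjust them to however the connector is actually packaged), the upgrade amounts to swapping the jar and resubmitting the job:

```bash
# Illustrative only: jar names and deployment layout are assumptions.
rm $FLINK_HOME/lib/flink-sql-connector-mysql-cdc-2.0.0.jar
cp flink-sql-connector-mysql-cdc-2.0.2.jar $FLINK_HOME/lib/
# Restart the cluster/session, then resubmit the job (optionally from the latest checkpoint as above).
```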