【BUG】Flink CDC 2.0.0迷之异常!!!

2022-05-17 15:59:14 浏览数 (1)

一、场景还原

基于 Flink CDC 的 SQL Api 实现实时监听 MySQL 的 binlog 数据发送到 Kafka

二、框架版本

框架

版本

Flink

1.13.2

MySQL

5.7.25

connector-mysql-cdc

2.0.0

三、测试代码
代码语言:javascript复制
public class CDCWithSqlTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(120000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig()
            .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/"));
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String sourceDDL = "create table cdc_from_mysql("  
                "  id      INT                      "  
                " ,name    STRING                   "  
                " ,PRIMARY KEY(id)     NOT enforced "  
                " ) WITH ( "  
                " 'connector'                         = 'mysql-cdc'      ,"  
                " 'scan.startup.mode'                 = 'latest-offset'  ,"  
                " 'server-time-zone'                  = 'Asia/Shanghai'  ,"  
                " 'scan.incremental.snapshot.enabled' = 'true'           ,"  
                " 'hostname'                          = 'mysql_ip'       ,"  
                " 'port'                              = 'mysql_port'     ,"  
                " 'username'                          = 'mysql_username' ,"  
                " 'password'                          = 'mysql_password' ,"  
                " 'database-name'                     = 'mysql_databse'  ,"  
                " 'table-name'                        = 'mysql_table'    ,"  
                " 'server-id'                         = '5400'       "  
                " ) ";
        tableEnv.executeSql(sourceDDL);

        String sinkDDL = "create table cdc_to_kafka("  
                "  id      INT                      "  
                " ,name    STRING                   "  
                " ,PRIMARY KEY(id)     NOT enforced "  
                " ) WITH ( "  
                " 'connector'                    = 'upsert-kafka'       ,"  
                " 'topic'                        = 'ZGN_CDC_TEST'       ,"  
                " 'properties.bootstrap.servers' = 'kafka_ip:9092' ,"  
                " 'key.json.ignore-parse-errors' = 'true'               ,"  
                " 'key.format'                   = 'json'               ,"  
                " 'value.format'                 = 'json'               ,"  
                " 'value.fields-include'         = 'ALL'                 "  
                " ) ";
        tableEnv.executeSql(sinkDDL);

        tableEnv.executeSql("INSERT INTO cdc_to_kafka SELECT * FROM cdc_from_mysql");
    }
}
四、BUG 重现
1.先向 MySQL 插入几条数据
1.1 MySQL 端

id

name

1

1

2

2

3

3

1.2.控制台消费 Kafka 数据
代码语言:javascript复制
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST

{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}
2.模拟 Flink 任务失败(停止 Flink 任务)

我这里直接通过Web UI Cancel掉任务

3.继续向 MySQL插入数据

id

name

1(上次添加)

1(上次添加)

2(上次添加)

2(上次添加)

3(上次添加)

3(上次添加)

4(此次添加)

4(此次添加)

5(此次添加)

5(此次添加)

6(此次添加)

6(此次添加)

4.从检查点重启 Flink 任务,控制台继续观测消费 Kafka 数据
代码语言:javascript复制
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST

{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}

----------------任务启停的分界线------------------

{"id":4,"name":"4"}
{"id":5,"name":"5"}
{"id":6,"name":"6"}
{"id":4,"name":"4"}    -- 异常: 数据产生了重复消费
{"id":5,"name":"5"}    -- 异常: 数据产生了重复消费
{"id":6,"name":"6"}    -- 异常: 数据产生了重复消费
{"id":4,"name":"4"}    -- 异常: 数据产生了重复消费
{"id":5,"name":"5"}    -- 异常: 数据产生了重复消费
{"id":6,"name":"6"}    -- 异常: 数据产生了重复消费
{"id":4,"name":"4"}    -- 异常: 数据产生了重复消费
{"id":5,"name":"5"}    -- 异常: 数据产生了重复消费
{"id":6,"name":"6"}    -- 异常: 数据产生了重复消费
......
五、错误日志
代码语言:javascript复制
java.lang.RuntimeException: One or more fetchers have encountered exception
......
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
......
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
......
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:1800034. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
......
四、问题排查
1.从控制台消费 kafka 的数据来看

预取的数据是只消费一次 {"id":4,"name":"4"} {"id":5,"name":"5"} {"id":6,"name":"6"} 数据,但是事实却是一直在重复消费,怀疑重启后的 Flink CDC 程序不能很好的解析存储在 hdfs 中的检查点信息

2.从报错日志来看

主要报的错就是反序列化 MySQL 的 binlog 有问题,很难于上述的猜测达成一致

3.从 Flink CDC 社区查阅了 issue,没找到相类似错误
4.从 Flink CDC 的项目地址,发现在 2.0.1 版本修复了一个问题(第10条)
代码语言:javascript复制
Improvements and Bug
1.[postgres] Fix Validator didn't implement Serializable
2.[mysql] Correct the initial binlog offset for MySqlHybridSplitAssigner
3.[mysql] Optimize the checkpoint be optional under single parallelism
4.[postgres] Fix postgres-cdc connector cannot recognize the optional option 'slot.name'
5.[mysql] Improve the code format in SignalEventDispatcher
6.[mysql] Add default value for 'database.history.instance.name' in MySqlParallelSource
7.[mysql] Add tests to check mysql-cdc works well under various timezones
8.[common] Remove useless parameter 'converter.schemas.enable'
9.[build] Run integration tests for each building
10.[changelog] fix changelog-deserialize exception message typo
11.[docs] Add FAQ for MySQL 8.0 Public Key Retrieval setting
12.[docs] Update the debezium document link to version 1.5
13.[docs] Add checkpoint and primary key setting for example in tutorials

在 2.0.1 版本修复了日志变更反序列化的异常,刚好能对应的上报错日志的信息,因此,定位到此结束

五、解决方案

将 Flink CDC 版本做一次升级,从 2.0.0 -> 2.0.2

0 人点赞