Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

2021-07-30 10:34:22 浏览数 (1)

我在之前的文章中已经详细的介绍过Flink CDC的原理和实践了。

如果你对Flink CDC 还没有什么概念,可以参考这里:Flink CDC 原理及生产实践

在实际生产中相信已经有很多小伙伴尝试过了,我在这里将一些个人遇到的、搜索到的、官方博客中总结的以及在Flink的邮件组中的看到过的一些常见问题进行了总结。供大家参考。

不同的kafka版本依赖冲突

不同的kafka版本依赖冲突会造成cdc报错,参考这个issue:

http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393

代码语言:javascript复制
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

源码如下:

代码语言:javascript复制
public class CdcTest {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("sohay") // monitor all tables under inventory database
                .username("root")
                .password("123456")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}

确实是pom中存在一个Kafka的依赖包,导致冲突,去掉后问题解决。

MySQL CDC源等待超时

在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:

代码语言:javascript复制
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
数据库切换,重新开启binlog,Mysql全局锁无法释放

原因是因为切换了数据库环境,重新开启binlog,所有的作业都重新同步binlog的全量数据,导致了全局锁一直在等待,所有作业都无法执行。解决方法:记录checkpoint的地址,取消作业,然后根据checkpoint重启作业。

使用Flink SQL CDC模式创建维表异常
代码语言:javascript复制
CREATE TABLE cdc_test
(
    id  STRING,
    ip  STRING,
    url STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc', 
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'database-name' = 'xx',
    'table-name' = 'xx',
    'username' = 'xx',
    'password' = 'xx'
);

执行查询:

代码语言:javascript复制
SELECT * FROM cdc_test;

任务无法运行,抛出异常

代码语言:javascript复制
User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.

原因是连接MySQL的用户缺乏必要的CDC权限。

Flink SQL CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

解决办法:创建一个新的MySQL用户并授予其必要的权限。

代码语言:javascript复制
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
Flink作业扫描MySQL全量数据出现fail-over

Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:

原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。

解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:

代码语言:javascript复制
execution.checkpointing.interval: 10min   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
restart-strategy: fixed-delay  # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数
作业在运行时 mysql cdc source 报 no viable alternative at input 'alter table std'

原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。

多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。

原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。

解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:

代码语言:javascript复制
FROM bill_info /*  OPTIONS('server-id'='123456') */ ;
CDC source 扫描 MySQL 表期间,发现无法往该表 insert 数据

原因:由于使用的 MySQL 用户未授权 RELOAD 权限,导致无法获取全局读锁(FLUSH TABLES WITH READ LOCK), CDC source 就会退化成表级读锁,而使用表级读锁需要等到全表 scan 完,才能释放锁,所以会发现持锁时间过长的现象,影响其他业务写入数据。

解决方法:给使用的 MySQL 用户授予 RELOAD 权限即可。所需的权限列表详见文档:

代码语言:javascript复制
https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server

如果出于某些原因无法授予 RELOAD 权限,也可以显式配上 'debezium.snapshot.locking.mode' = 'none'来避免所有锁的获取,但要注意只有当快照期间表的 schema 不会变更才安全。

0 人点赞