我在之前的文章中已经详细的介绍过Flink CDC的原理和实践了。
如果你对Flink CDC 还没有什么概念,可以参考这里:Flink CDC 原理及生产实践。
在实际生产中相信已经有很多小伙伴尝试过了,我在这里将一些个人遇到的、搜索到的、官方博客中总结的以及在Flink的邮件组中的看到过的一些常见问题进行了总结。供大家参考。
不同的kafka版本依赖冲突
不同的kafka版本依赖冲突会造成cdc报错,参考这个issue:
http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393
代码语言:javascript复制2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from RUNNING to FAILED.
java.lang.AbstractMethodError: org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
源码如下:
代码语言:javascript复制public class CdcTest {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("sohay") // monitor all tables under inventory database
.username("root")
.password("123456")
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
}
确实是pom中存在一个Kafka的依赖包,导致冲突,去掉后问题解决。
MySQL CDC源等待超时
在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:
代码语言:javascript复制execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
数据库切换,重新开启binlog,Mysql全局锁无法释放
原因是因为切换了数据库环境,重新开启binlog,所有的作业都重新同步binlog的全量数据,导致了全局锁一直在等待,所有作业都无法执行。解决方法:记录checkpoint的地址,取消作业,然后根据checkpoint重启作业。
使用Flink SQL CDC模式创建维表异常
代码语言:javascript复制CREATE TABLE cdc_test
(
id STRING,
ip STRING,
url STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'database-name' = 'xx',
'table-name' = 'xx',
'username' = 'xx',
'password' = 'xx'
);
执行查询:
代码语言:javascript复制SELECT * FROM cdc_test;
任务无法运行,抛出异常
代码语言:javascript复制User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.
原因是连接MySQL的用户缺乏必要的CDC权限。
Flink SQL CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。
解决办法:创建一个新的MySQL用户并授予其必要的权限。
代码语言:javascript复制mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
Flink作业扫描MySQL全量数据出现fail-over
Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:
原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。
解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:
代码语言:javascript复制execution.checkpointing.interval: 10min # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 100 # checkpoint 失败容忍次数
restart-strategy: fixed-delay # 重试策略
restart-strategy.fixed-delay.attempts: 2147483647 # 重试次数
作业在运行时 mysql cdc source 报 no viable alternative at input 'alter table std'
原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。
解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。
多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。
原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。
解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:
代码语言:javascript复制FROM bill_info /* OPTIONS('server-id'='123456') */ ;
CDC source 扫描 MySQL 表期间,发现无法往该表 insert 数据
原因:由于使用的 MySQL 用户未授权 RELOAD 权限,导致无法获取全局读锁(FLUSH TABLES WITH READ LOCK), CDC source 就会退化成表级读锁,而使用表级读锁需要等到全表 scan 完,才能释放锁,所以会发现持锁时间过长的现象,影响其他业务写入数据。
解决方法:给使用的 MySQL 用户授予 RELOAD 权限即可。所需的权限列表详见文档:
代码语言:javascript复制https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server
如果出于某些原因无法授予 RELOAD 权限,也可以显式配上 'debezium.snapshot.locking.mode' = 'none'来避免所有锁的获取,但要注意只有当快照期间表的 schema 不会变更才安全。