本文将演示如何使用 Flink DataStream API 开发一个 Flink CDC 应用。
本文的目标:
1.体验如何使用 Flink Stream API 开发一个 Flink CDC Demo,超级简单。
2.以Mysql为例,采集Mysql binlog数据。账号需要什么权限?需要注意什么?
3.生成 checkpoint 数据,重启程序从执行的状态恢复数据。
4.演示2.2版本动态加加载表的新特性,在2.1版本是一个BUG。
Flink CDC 使用 SQL 的方式,可以非常快速的开始一个 Flink CDC 的任务,就像下面这样:
下面开始,我使用Flink代码写一个简单的 Flink CDC 应用
第一步,创建一个 Flink 空项目
代码语言:javascript复制mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.3
第二步,引入 Flink CDC 相关的依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink-cdc-mysql -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.1</version>
</dependency>
第三步,编写 Flink 代码
代码如下:
代码语言:javascript复制package test;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class MysqlCDC {
public static void main(String[] args) throws Exception {
Properties debeziumProperties = new Properties();
debeziumProperties.put("decimal.handling.mode", "String");
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.scanNewlyAddedTableEnabled(true) // 开启支持新增表
.databaseList("user") // set captured database
.tableList("user.user_1,user.user_2,user.user_3") // set captured table
.username("test_cdc")
.password("tsl")
.debeziumProperties(debeziumProperties)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
Configuration configuration = new Configuration();
// 生产环境夏下,改成参数传进来
configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
// enable checkpoint
env.enableCheckpointing(3000);
// 设置本地
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print("==>").setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot Binlog");
}
}
MySqlSource 的账号需要有SELECT、REPLICATION SLAVE、REPLICATION CLIENT的权限
代码语言:javascript复制SELECT 权限代表允许从表中查看数据
REPLICATION SLAVE 权限代表允许执行show master status,show slave status,show binary logs命令
REPLICATION CLIENT 权限代表允许slave主机通过此用户连接master以便建立主从 复制关系
开启 Checkpoint 之后,重启的时候需要执行从哪一个状态恢复,这样可以采集任务从上一次的位置开始。
Flink 2.2 也支持了动态新增表,需要手动在程序里面开启这个功能[1]。见代码第 18 行。这样在我们停掉任务之后,程序里面新增了一张表,这样从上一次状态恢复的时候,其他已有的表可以接着上次的状态开始采集,新增的这一张表,从全量 增量开始。
[1]https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#scan-newly-added-tables