Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目

2022-05-26 08:56:37 浏览数 (1)

本文将演示如何使用 Flink DataStream API 开发一个 Flink CDC 应用。

本文的目标:

1.体验如何使用 Flink Stream API 开发一个 Flink CDC Demo,超级简单。

2.以Mysql为例,采集Mysql binlog数据。账号需要什么权限?需要注意什么?

3.生成 checkpoint 数据,重启程序从执行的状态恢复数据。

4.演示2.2版本动态加加载表的新特性,在2.1版本是一个BUG。

Flink CDC 使用 SQL 的方式,可以非常快速的开始一个 Flink CDC 的任务,就像下面这样:

下面开始,我使用Flink代码写一个简单的 Flink CDC 应用

第一步,创建一个 Flink 空项目

代码语言:javascript复制
mvn archetype:generate                
  -DarchetypeGroupId=org.apache.flink   
  -DarchetypeArtifactId=flink-quickstart-java 
  -DarchetypeVersion=1.13.3

第二步,引入 Flink CDC 相关的依赖

代码语言:javascript复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<!--  flink-cdc-mysql  -->
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>2.2.1</version>
</dependency>

第三步,编写 Flink 代码

代码如下:

代码语言:javascript复制
package test;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class MysqlCDC {
    public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("decimal.handling.mode", "String");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .scanNewlyAddedTableEnabled(true) // 开启支持新增表
                .databaseList("user") // set captured database
                .tableList("user.user_1,user.user_2,user.user_3") // set captured table
                .username("test_cdc")
                .password("tsl")
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        Configuration configuration = new Configuration();
        // 生产环境夏下,改成参数传进来
        configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // enable checkpoint
        env.enableCheckpointing(3000);
        // 设置本地
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");
        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print("==>").setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot   Binlog");
    }
}

MySqlSource 的账号需要有SELECT、REPLICATION SLAVE、REPLICATION CLIENT的权限

代码语言:javascript复制
SELECT 权限代表允许从表中查看数据
REPLICATION SLAVE 权限代表允许执行show master status,show slave status,show binary logs命令
REPLICATION CLIENT 权限代表允许slave主机通过此用户连接master以便建立主从 复制关系

开启 Checkpoint 之后,重启的时候需要执行从哪一个状态恢复,这样可以采集任务从上一次的位置开始。

Flink 2.2 也支持了动态新增表,需要手动在程序里面开启这个功能[1]。见代码第 18 行。这样在我们停掉任务之后,程序里面新增了一张表,这样从上一次状态恢复的时候,其他已有的表可以接着上次的状态开始采集,新增的这一张表,从全量 增量开始。

[1]https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#scan-newly-added-tables

0 人点赞