1. Environment setup
1.1 Enable the MySQL binlog
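Add the following to the [mysqld] section of the MySQL configuration file (e.g. my.cnf), then restart MySQL:
[mysqld]
# server_id is mandatory on MySQL 5.7 once log_bin is set
server_id=1
log_bin=mysql-bin
binlog_format=ROW
# on MySQL 8.0, expire_logs_days is deprecated in favor of binlog_expire_logs_seconds
expire_logs_days=30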
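After the restart, it is worth confirming that the server actually picked up these settings, e.g. with `SHOW VARIABLES LIKE 'log_bin'` and `SHOW VARIABLES LIKE 'binlog_format'`. Debezium's MySQL documentation also expects `binlog_row_image=FULL`, which is MySQL's default. The sketch below assumes those query results have already been collected into a `Map` (variable name → value); `BinlogPrecheck` and `problems` are hypothetical names, not part of Flink CDC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink CDC): checks binlog settings collected from SHOW VARIABLES.
class BinlogPrecheck {

    /** Returns human-readable problems; an empty list means the settings look usable for CDC. */
    static List<String> problems(Map<String, String> vars) {
        List<String> out = new ArrayList<>();
        // SHOW VARIABLES reports log_bin as ON or OFF
        if (!"ON".equalsIgnoreCase(vars.getOrDefault("log_bin", "OFF"))) {
            out.add("log_bin is not ON");
        }
        // row-based logging makes every event carry the changed rows themselves
        if (!"ROW".equalsIgnoreCase(vars.getOrDefault("binlog_format", ""))) {
            out.add("binlog_format is not ROW");
        }
        // a missing binlog_row_image is treated as MySQL's default, FULL
        if (!"FULL".equalsIgnoreCase(vars.getOrDefault("binlog_row_image", "FULL"))) {
            out.add("binlog_row_image is not FULL");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(problems(Map.of("log_bin", "ON", "binlog_format", "STATEMENT")));
        // prints [binlog_format is not ROW]
    }
}
```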
1.2 Flink CDC dependency
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>
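Note: this dependency already embeds Debezium, which reads MySQL change data and emits it directly, so no extra components are needed. It replaces the previous mysql → debezium → kafka pipeline with a simpler data flow.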
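Each change the embedded Debezium engine reads from the binlog becomes a change event whose payload carries an `op` code (`c` insert, `u` update, `d` delete, `r` read during the initial snapshot) together with `before` and `after` row images. A minimal model of that envelope, assuming row images are plain `Map`s; `ChangeEnvelope` is a hypothetical class, not a Debezium type.

```java
import java.util.Map;

// Hypothetical model of the Debezium change-event envelope (not a Debezium API class).
final class ChangeEnvelope {
    final String op;                    // "c", "u", "d" or "r"
    final Map<String, Object> before;   // row image before the change, null for c and r
    final Map<String, Object> after;    // row image after the change, null for d

    ChangeEnvelope(String op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }

    /** Checks that the populated images match what the op code implies. */
    boolean isWellFormed() {
        switch (op) {
            case "c":
            case "r":
                return before == null && after != null;
            case "u":
                return before != null && after != null;
            case "d":
                return before != null && after == null;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        // UPDATE t_students SET age = 19 WHERE id = 1 carries both images
        ChangeEnvelope update = new ChangeEnvelope("u",
                Map.of("id", 1L, "name", "tom", "age", 18),
                Map.of("id", 1L, "name", "tom", "age", 19));
        System.out.println(update.isWellFormed()); // prints true
    }
}
```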
2. Development
2.1 Prepare the database and table
CREATE DATABASE IF NOT EXISTS flink_cdc;
USE flink_cdc;
CREATE TABLE t_students (
  `id` BIGINT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(24) DEFAULT NULL,
  `age` INT(4) DEFAULT NULL,
  `create_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
  PRIMARY KEY (`id`) USING BTREE
);
2.2 Writing the Flink job
2.2.1 DataStream API approach
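import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MysqlCdcStreamJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
        // StringDebeziumDeserializationSchema emits each change record as its toString() text
        final DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("127.0.0.1").port(3306)
                .databaseList("flink_cdc")
                .username("root")
                .password("123456")
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        env.addSource(sourceFunction).setParallelism(1).print();
        // execute() is required: without it the job graph is built but never run
        env.execute("mysql-cdc-stream");
    }
}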
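Note: this approach is not very convenient, and parsing the data is fairly cumbersome, because each record arrives as the text form of a Debezium SourceRecord.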
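Much of that work is turning each change event into changelog rows by hand. The Table API connector does this internally, emitting Flink `RowKind`s: an insert or snapshot read becomes +I, an update becomes -U (old row) followed by +U (new row), and a delete becomes -D. The sketch below shows that mapping under simplified assumptions: its own `Kind` enum stands in for Flink's `RowKind`, row images are plain `Map`s, and `DebeziumToChangelog.convert` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for Flink's RowKind; the short strings mirror Flink's +I/-U/+U/-D notation.
enum Kind {
    INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");
    final String shortString;
    Kind(String s) { this.shortString = s; }
}

final class ChangelogRow {
    final Kind kind;
    final Map<String, Object> row;
    ChangelogRow(Kind kind, Map<String, Object> row) { this.kind = kind; this.row = row; }
    @Override public String toString() { return kind.shortString + row; }
}

final class DebeziumToChangelog {
    /** Converts one Debezium op code plus its before/after images into changelog rows. */
    static List<ChangelogRow> convert(String op, Map<String, Object> before, Map<String, Object> after) {
        List<ChangelogRow> rows = new ArrayList<>();
        switch (op) {
            case "c": // insert
            case "r": // snapshot read, treated as an insert
                rows.add(new ChangelogRow(Kind.INSERT, after));
                break;
            case "u": // an update retracts the old row and emits the new one
                rows.add(new ChangelogRow(Kind.UPDATE_BEFORE, before));
                rows.add(new ChangelogRow(Kind.UPDATE_AFTER, after));
                break;
            case "d":
                rows.add(new ChangelogRow(Kind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<ChangelogRow> rows = convert("u", Map.of("id", 1L, "age", 18), Map.of("id", 1L, "age", 19));
        System.out.println(rows.size()); // prints 2
    }
}
```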
2.2.2 Table API approach
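import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class MysqlCdcTableJob {
    public static void main(String[] args) {
        final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        // the string pieces must be joined with '+'; the mysql-cdc connector reads t_students via the binlog
        // type mapping: the connector docs map BIGINT UNSIGNED to DECIMAL(20, 0) and DATETIME to TIMESTAMP;
        // adjust the column types if the values look off
        String ddlMysql = "CREATE TABLE mysql_binlog (id INT NOT NULL, "
                + "name STRING, "
                + "age INT, "
                + "create_time STRING, "
                + "update_time STRING "
                + ") "
                + "WITH ('connector' = 'mysql-cdc', "
                + "'hostname' = '127.0.0.1', "
                + "'port' = '3306', "
                + "'username' = 'root', "
                + "'password' = '123456', "
                + "'database-name' = 'flink_cdc', "
                + "'table-name' = 't_students')";
        tabEnv.executeSql(ddlMysql);
        // Alternative: write into a print sink with INSERT INTO instead of printing the query result
        // String sink = "CREATE TABLE sink_table (id INT NOT NULL, "
        //         + "name STRING, "
        //         + "age INT, "
        //         + "create_time STRING, "
        //         + "update_time STRING "
        //         + ") "
        //         + "WITH ('connector' = 'print')";
        // tabEnv.executeSql(sink);
        //
        // String dml = "INSERT INTO sink_table SELECT id, name, age, create_time, update_time FROM mysql_binlog";
        //
        // final TableResult result = tabEnv.executeSql(dml);
        // executeSql on a SELECT starts the streaming query; print() keeps printing changelog rows
        final TableResult result = tabEnv.executeSql("select * from mysql_binlog");
        result.print();
    }
}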
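Gluing a long DDL together with `+` is easy to get wrong: a single missing operator breaks compilation. One alternative is to assemble the WITH clause from a map of options. The sketch below uses a hypothetical `CdcDdl.build` helper with the same option keys as above; it does not escape quotes inside values, so it assumes plain values like the ones shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that assembles a CREATE TABLE ... WITH (...) statement.
final class CdcDdl {
    static String build(String table, String columns, Map<String, String> options) {
        // each option becomes 'key' = 'value'; values are NOT escaped
        String with = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n  "));
        return "CREATE TABLE " + table + " (" + columns + ")\nWITH (\n  " + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "mysql-cdc");
        opts.put("hostname", "127.0.0.1");
        opts.put("port", "3306");
        opts.put("username", "root");
        opts.put("password", "123456");
        opts.put("database-name", "flink_cdc");
        opts.put("table-name", "t_students");
        String ddl = build("mysql_binlog",
                "id INT NOT NULL, name STRING, age INT, create_time STRING, update_time STRING", opts);
        // the result could then be passed to tabEnv.executeSql(ddl)
        System.out.println(ddl);
    }
}
```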
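Note: the CDC connectors were originally started by Jark Wu (云邪), so they were designed around the Table API first. Much of the corresponding change-handling logic is already implemented there, which makes this approach convenient both for consuming changes and for computing statistics over them.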
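Statistics stay correct over a CDC table because each update or delete arrives as a retraction. For example, a query such as `SELECT COUNT(*), AVG(age) FROM mysql_binlog` subtracts a row's old contribution on -U (update before) or -D (delete) and adds the new one on +I (insert) or +U (update after). The plain-Java sketch below illustrates only that retraction idea, not Flink's actual aggregation code; `RetractingAgeStats` is a hypothetical class.

```java
// Simplified illustration of retraction-based aggregation over a changelog (not Flink code).
final class RetractingAgeStats {
    private long count;
    private long sumAge;

    /** Applies one changelog row given its kind (+I, -U, +U, -D) and the row's age. */
    void apply(String kind, int age) {
        switch (kind) {
            case "+I":
            case "+U":
                count++;
                sumAge += age;
                break;
            case "-U":
            case "-D":
                count--;
                sumAge -= age;
                break;
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    long count() { return count; }

    double averageAge() { return count == 0 ? 0.0 : (double) sumAge / count; }

    public static void main(String[] args) {
        RetractingAgeStats stats = new RetractingAgeStats();
        stats.apply("+I", 18); // insert student A, age 18
        stats.apply("+I", 20); // insert student B, age 20
        stats.apply("-U", 18); // update A: retract the old row...
        stats.apply("+U", 22); // ...and add the new one (age 22)
        stats.apply("-D", 20); // delete B
        System.out.println(stats.count() + " " + stats.averageAge()); // prints 1 22.0
    }
}
```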