Flink MySQL CDC Statistics Processing

2021-08-16 18:12:20

1. Environment setup

1.1 Enable binlog in MySQL

log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30

1.2 Flink CDC dependency

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

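Note: this dependency embeds Debezium, which captures MySQL change data and emits it, so no additional components are needed. This simplifies the previous mysql → debezium → kafka approach and its data flow.

2. Code development

2.1 Database and table preparation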

CREATE TABLE t_students (
    `id` BIGINT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(24)  DEFAULT NULL,
    `age` INT(4) DEFAULT NULL,
    `create_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
    `update_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modification time',
    PRIMARY KEY (`id`) USING BTREE
)
;

2.2 Writing the Flink code

2.2.1 Stream API approach

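import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

// Local environment with parallelism 1
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);

// Emits each change record as its toString() representation
final DebeziumDeserializationSchema<String> deserializationSchema = new StringDebeziumDeserializationSchema();

final DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
         .hostname("127.0.0.1").port(3306)
         .databaseList("flink_cdc")
         .username("root")
         .password("123456")
         .deserializer(deserializationSchema)
         .build();

env.addSource(sourceFunction).setParallelism(1).print();

// Submit the job; nothing runs until execute() is called
env.execute();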

Note: this approach is not very convenient, and parsing the emitted data is fairly cumbersome.

2.2.2 Table API approach
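To illustrate why the parsing is cumbersome: StringDebeziumDeserializationSchema emits each record as its toString() text, so any field access means picking that text apart. The following is a minimal sketch, assuming an abbreviated, hand-written sample string (not captured output) and a hypothetical helper class named ChangeRecordParser. It extracts the operation code and the fields of the "after" image with regular expressions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of flink-connector-mysql-cdc.
public class ChangeRecordParser {
    // Operation code, e.g. "op=c" (c = create, u = update, d = delete, r = snapshot read)
    private static final Pattern OP = Pattern.compile("\\bop=(\\w)");
    // Flat field list inside "after=Struct{...}"
    private static final Pattern AFTER = Pattern.compile("after=Struct\\{([^}]*)\\}");

    /** Returns the operation code, or null if none is found. */
    public static String operation(String record) {
        Matcher m = OP.matcher(record);
        return m.find() ? m.group(1) : null;
    }

    /** Returns the fields of the "after" image; empty when there is none (e.g. deletes). */
    public static Map<String, String> afterFields(String record) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = AFTER.matcher(record);
        if (!m.find()) {
            return fields;
        }
        // Naive split: breaks if a value itself contains ',' or '}'
        for (String pair : m.group(1).split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                fields.put(kv[0].trim(), kv[1].trim());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Abbreviated sample, assumed for illustration
        String sample = "value=Struct{after=Struct{id=1,name=Tom,age=18},op=c,ts_ms=0}";
        System.out.println(operation(sample) + " " + afterFields(sample));
    }
}
```

String matching like this is fragile, as the comment on the split notes. A custom DebeziumDeserializationSchema that reads the structured record directly is likely the more robust choice when the Stream API is required.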

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tabEnv = TableEnvironment.create(settings);

// Source table backed by the binlog of flink_cdc.t_students
String ddlMysql = "CREATE TABLE mysql_binlog (id INT NOT NULL, " +
        "name STRING, " +
        "age INT, " +
        "create_time STRING, " +
        "update_time STRING " +
        ") " +
        "WITH ('connector' = 'mysql-cdc', " +
        "'hostname' = '127.0.0.1', " +
        "'port' = '3306', " +
        "'username' = 'root', " +
        "'password' = '123456', " +
        "'database-name' = 'flink_cdc', " +
        "'table-name' = 't_students')";
tabEnv.executeSql(ddlMysql);

// Alternative: write the changes to a print sink instead of printing the query result
// String sink = "CREATE TABLE sink_table (id INT NOT NULL, " +
//         "name STRING, " +
//         "age INT, " +
//         "create_time STRING, " +
//         "update_time STRING " +
//         ") " +
//         "WITH ('connector' = 'print')";
// tabEnv.executeSql(sink);
//
// String dml = "INSERT INTO sink_table SELECT id, name, age, create_time, update_time FROM mysql_binlog";
//
// final TableResult result = tabEnv.executeSql(dml);

// Continuously print the changelog of the source table
final TableResult result = tabEnv.executeSql("select * from mysql_binlog");
result.print();

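Note: the CDC project was originally started by Jark Wu (云邪), which is why it is built around the Table API. Much of the corresponding processing logic is already implemented there, which makes it convenient to use and to compute statistics.

3. Results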
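To make the "statistics" part concrete: a continuous query such as SELECT age, COUNT(*) FROM mysql_binlog GROUP BY age has to be kept up to date as inserts, updates, and deletes arrive. The following is a minimal plain-Java sketch of that idea, not Flink's actual implementation. ChangelogCounter is a hypothetical class, the operation codes follow Debezium's c/r/u/d convention, and rows with a NULL age are simply skipped here, which is a simplification.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of maintaining a grouped count from a changelog.
public class ChangelogCounter {
    // Running count of students per age
    private final Map<Integer, Long> countByAge = new TreeMap<>();

    /**
     * Applies one change event.
     * op: "c" or "r" = insert, "u" = update, "d" = delete.
     * beforeAge / afterAge: the age in the row image before / after the change.
     */
    public void apply(String op, Integer beforeAge, Integer afterAge) {
        switch (op) {
            case "c":
            case "r":
                add(afterAge, 1);
                break;
            case "u":
                add(beforeAge, -1); // retract the old row image
                add(afterAge, 1);   // accumulate the new row image
                break;
            case "d":
                add(beforeAge, -1);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    private void add(Integer age, long delta) {
        if (age == null) {
            return; // simplification: NULL ages are not counted in this sketch
        }
        countByAge.merge(age, delta, Long::sum);
        countByAge.remove(age, 0L); // drop groups that became empty
    }

    public Map<Integer, Long> counts() {
        return countByAge;
    }

    public static void main(String[] args) {
        ChangelogCounter counter = new ChangelogCounter();
        counter.apply("c", null, 18); // insert a student aged 18
        counter.apply("c", null, 18); // insert another student aged 18
        counter.apply("u", 18, 19);   // one of them turns 19
        counter.apply("d", 19, null); // that student is deleted
        System.out.println(counter.counts());
    }
}
```

With the Table API, the same statistic would be expressed as a single SQL statement over mysql_binlog, and the retract-and-accumulate bookkeeping sketched above is handled by the engine.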
