Flink实时消费业务数据Demo
Debezium监控MySQL用FlinkSQL实时消费
1、环境准备
代码语言:javascript复制## 各组件版本
MySQL:5.7.21-log ## 开启binlog
kafka_2.11-2.4.1 ## Kafka
Flink:1.12.0 ## Flink_1.12.0官方推荐使用Kafka_2.4.1
Zookeeper:3.4.6
## 所需组件下载地址
## kafka_2.11-2.4.1.tgz
链接:https://pan.baidu.com/s/1-YUvHj8B10VG_LA7O_akPA 提取码:pv7f
## flink-1.12.0-bin-scala_2.11.tgz
链接:https://pan.baidu.com/s/1GDmKNbaEmq9fpCx93a41pg 提取码:hz5b
## debezium-connector-mysql-1.3.1.Final-plugin.tar.gz
链接:https://pan.baidu.com/s/1AtR9buds1AvfRnJ4JU-v6g 提取码:lkm2
## 所需jar包
链接:https://pan.baidu.com/s/1HFLuMcEdQN48DJplCx_e8A 提取码:5ipk
2、环境部署
前提:开启MySQL并启用binlog 启动zookeeper、kafka、flink
2.1、在kafka环境下安装debezium连接器
代码语言:javascript复制在kafka目录下新建plugins目录
将debezium-connector-mysql-1.3.1.Final-plugin.tar.gz解压到plugins下
2.2、编辑kafka-connect配置信息
代码语言:javascript复制connect-distribute.properties
## 修改如下内容
bootstrap.servers=master:9092,slave1:9092,slave2:9092
## 重点配置 plugin.path,注意:路径为连接器解压路径的父级目录
plugin.path=/user/kafka/plugins
2.3、开启kafka-connect服务
代码语言:javascript复制## 启动
bin/connect-distributed.sh config/connect-distributed.properties
## 后台启动
bin/connect-distributed.sh -daemon config/connect-distributed.properties
## 测试是否启动成功
curl -H "Accept:application/json" master:8083/
## 查看connectors下已有的连接器
curl -H "Accept:application/json" localhost:8083/connectors/
2.4、注册MySQL的监听器
代码语言:javascript复制详细信息在Debezium官网都能找到详细解释 地址: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#configure-the-mysql-connector_debezium
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" master:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "master",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "test",
"table.include.list":"test.customers",
"database.history.kafka.bootstrap.servers": "master:9092", "database.history.kafka.topic": "dbhistory.master" }
}'
## 配置解读:
name:在Kafka Connect服务中注册时的连接器名称
connector.class:连接器的类名
database.hostname:MySQL服务器地址
database.server.id:该数据库客户端的数字ID,在MySQL集群中所有当前正在运行的数据库进程中,该ID必须唯一。该连接器作为另一个服务器(具有此唯一ID)加入MySQL数据库集群,因此它可以读取binlog。默认情况下,尽管我们建议设置一个显式值,但是会在5400和6400之间生成一个随机数。
database.server.name:MySQL服务器或群集的逻辑名称
database.include.list:数据库的列表
table.include.list:表名
database.history.kafka.bootstrap.servers:连接器将用于建立与Kafka群集的初始连接的主机/端口对的列表。该连接将用于检索先前由连接器存储的数据库架构历史,并用于写入从源数据库读取的每个DDL语句。这应该指向Kafka Connect进程使用的同一Kafka群集。
database.history.kafka.topic:连接器将在其中存储数据库架构历史记录的Kafka主题的全名
2.5、查看Kafka的Topic
代码语言:javascript复制真正存储binlog的topic:dbserver1.test.customers
2.6、配置FlinkSQL连接Kafka源表
代码语言:javascript复制-- 开启FlinkSQL
./bin/sql-client.sh embedded
-- MySQL中建表语句
CREATE TABLE customers(
id int,
first_name varchar(255),
last_name varchar(255),
email varchar(255)
);
-- FlinkSQL客户端连接Kafka
CREATE TABLE customers(
id int,
first_name varchar(255),
last_name varchar(255),
email varchar(255)
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.test.customers',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'testGroup',
'debezium-json.schema-include'='true',
'debezium-json.ignore-parse-errors'='true',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
-- FlinkSQL结果sink到mysql
CREATE TABLE datashow (
first_name varchar(255),
nums bigint,
PRIMARY KEY (first_name) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/test',
'table-name' = 'datashow',
'username' = 'root',
'password' = 'root'
);
-- 统计每个姓名出现的次数
insert into datashow
select first_name, count(1) cnt from customers group by first_name;
代码语言:javascript复制提交统计SQL未执行,原因是我提交了一条空记录,查看日志发现报错: You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘drop’
## 设置参数将key为null的值过滤掉
##在FlinkSQL客户端执行命令
set table.exec.sink.not-null-enforcer=drop
## 再次提交统计SQL即可