debezium采集MySQL CDC指南

2023-10-18 14:48:39 浏览数 (2)

Debezium 是一个开源的分布式平台,用于捕获数据库的变更数据(Change Data Capture,CDC)。它支持多种数据库,包括 MySQL。下面我们详细说一下如何进行配置。

MySQL配置

创建用户

代码语言:javascript复制
CREATE USER 'debezium_user'@'localhost' IDENTIFIED BY 'Pass-123-debezium_user';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium_user' IDENTIFIED BY 'Pass-123-debezium_user';

flush privileges;

开启binlog

检查binlog是否开启

代码语言:javascript复制
// for MySql 5.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';

在执行上述命令时如果出现如下报错:

代码语言:javascript复制
ERROR 3167 (HY000): The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled; see the documentation for 'show_compatibility_56'

请先修改数据库配置,将show_compatibility_56设置为ON

设置完上述配置后,再次执行检查binlog是否开启的SQL,如果为 OFF,请使用以下属性配置 MySQL 服务器配置文件,如下表所述:

代码语言:javascript复制
server-id         = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id';
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10

重启MySQL之后,通过再次检查 binlog 状态来确认您的更改:

代码语言:javascript复制
// for MySql 5.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
// for MySql 8.x
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM performance_schema.global_variables WHERE variable_name='log_bin';

得到:

开启GTIDs

全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。 虽然 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制,并使您能够更轻松地确认主服务器和副本服务器是否一致。

基本步骤:

代码语言:javascript复制
set GLOBAL gtid_mode=OFF_PERMISSIVE;
set GLOBAL gtid_mode=ON_PERMISSIVE;
set GLOBAL gtid_mode=ON;

set GLOBAL enforce_gtid_consistency=ON;

查看修改:

代码语言:javascript复制
show global variables like '%GTID%';

得到:

设置Session超时时间

代码语言:javascript复制
set interactive_timeout=60;
set wait_timeout=60;

开启query log events

代码语言:javascript复制
set binlog_rows_query_log_events=ON;

查看当前变量值:

代码语言:javascript复制
show global variables where variable_name = 'binlog_row_value_options';

开始部署

在开始部署之前,确定你已经安装了kafka,并且配置了Debezium MySQL connector的kafka connect已经启动。

kafka安装可参考:

下面说一下kafka connect配置问题。

首先下载kafka二进制包,例如下属例子中,将其下载到/data/app目录下。

代码语言:javascript复制
cd /data/app && wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar zxvf kafka_2.12-3.3.1.tgz
ln -s kafka_2.12-3.3.1 kafka

下载MySQL connector plug-in.

代码语言:javascript复制
# 新建plugin目录
cd kafka && mkdir plugins
cd plugins && wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz
tar zxvf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz

修改配置,设置kafka plugin目录

代码语言:javascript复制
vim /data/app/kafka/config/connect-distributed.properties

# 设置

plugin.path=/data/app/kafka/plugins

接下来便可以启动kafka connect

代码语言:javascript复制
bin/connect-distributed.sh config/connect-distributed.properties 

kafka connect默认启动的端口为8083

创建MySQL同步任务

在mysql中新建products 表

代码语言:javascript复制
create database if not exists inventory;
CREATE TABLE IF NOT EXISTS inventory.products (
 id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
 name VARCHAR(255) NOT NULL,
 description VARCHAR(512),
 weight FLOAT
);

插入数据:

代码语言:javascript复制
insert inventory.products values(1, 'tom', 'tall', 1.8);

创建同步任务

代码语言:javascript复制
{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "192.168.74.129", 
        "database.port": "3306", 
        "database.user": "debezium_user", 
        "database.password": "Pass-123-debezium_user", 
        "database.server.id": "223344", 
        "database.server.name": "fullfillment", 
        "database.include.list": "inventory", 
        "database.history.kafka.bootstrap.servers": "kafka:30092", 
        "database.history.kafka.topic": "dbhistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}

可以看到kafka connect控制台输出:

kafka中查看数据

相关DDL

0 0 投票数

文章评分

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2345732

0 人点赞