利用 Canal 将 MySQL 数据实时同步至 Kafka 极简教程

2023-09-10 15:49:53 浏览数 (1)

笔者使用 Canal 将 MySQL 数据同步至 Kafka 时遇到了不少坑,还好最后终于成功了,这里分享一下极简教程,希望能帮到你。

使用版本说明:

组件

版本号

Zookeeper

3.5.7

Kafka

2.12-3.0.0

Canal

1.1.4

MySQL

5.7.16

1.前置条件

  • 已部署 Zookeeper 集群(建议配置环境变量)
  • 已部署 Kafka 集群(建议配置环境变量)

2.设置 MySQL

开启 binlog

开启 binlog 写入功能,并将 binlog-format 设置为 ROW 模式

[omc@hadoop102 ~]$ sudo vi /etc/my.cnf ,在[mysqld] 下方添加如下内容

代码语言:javascript复制
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

完成设置后,重启 MySQL

设置 MySQL 专用账户用于授权 Canal

登录 MySQL 执行如下命令:

代码语言:javascript复制
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3.安装 Canal-1.1.4

可以在 Linux 机器上使用如下命令下载:

代码语言:javascript复制
wget https://github.com/alibaba/canal/releases/download/canal-1.1.14/canal.deployer-1.1.14.tar.gz

也可以通过 Windows 浏览器访问页面 https://github.com/alibaba/canal/releases/tag/canal-1.1.4 下载,只需要下载 canal.deployer-1.1.14.tar.gz 即可。依次创建目录:

代码语言:javascript复制
[omc@hadoop102 ~]$ cd /opt/module
[omc@hadoop102 module]$ mkdir canal-1.1.4
[omc@hadoop102 module]$ cd canal-1.1.4
[omc@hadoop102 canal-1.1.4]$ mkdir canal.deployer-1.1.4

解压缩下载好的 tar 包:

代码语言:javascript复制
[omc@hadoop102 software]$ tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/module/canal-1.1.4/canal.deployer-1.1.4

4.修改配置文件

修改 canal.properties

代码语言:javascript复制
[omc@hadoop102 ~]$ cd /opt/module/canal-1.1.4/canal.deployer-1.1.4/conf
[omc@hadoop102 conf]$ vi canal.properties

修改如下内容即可:

代码语言:javascript复制
 7 canal.register.ip = 192.168.10.102
 # 这里的 IP 替换成你的 Canal 服务器 IP
20 canal.zkServers =192.168.10.102,192.168.10.103,192.168.10.104
# 这里的 IP 替换成你的 Zookeeper 集群 IP
25 canal.serverMode = kafka
# tcp 修改为 kafka
116 canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# 这里的 hostname 替换成你的 Kafka 集群 IP 或者是 Kafka 集群的 hostname
110 #canal.instance.global.spring.xml = classpath:spring/file-instance.xml
111 canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# 将 110 行 file-instance.xml 注释掉,将 111 行 default-instance.xml 解除注释

注:

  1. 首列的数字为行序号,便于你进行查找
  2. canal_local.properties 对应于本地模式设置,我们这里讲的是集群模式,无需改动。

修改 instance.properties

代码语言:javascript复制
[omc@hadoop102 ~]$ cd /opt/module/canal-1.1.4/canal.deployer-1.1.4/conf/example
[omc@hadoop102 example]$ vi instance.properties

修改如下内容即可:

代码语言:javascript复制
 3 canal.instance.mysql.slaveId=1234
 9 canal.instance.master.address=192.168.10.104:3306
 # 这里的 IP 替换为你的 MySQL 服务器(数据源)地址
32 # username/password
33 canal.instance.dbUsername=canal
34 canal.instance.dbPassword=canal
# 这里的用户名密码要和前面授权 Canal 的 MySQL 专用账户设置一致

5.测试验证

首先要依次启动 Zookeeper 和 Kafka,然后启动 Canal

代码语言:javascript复制
[omc@hadoop102 ~]$ cd /opt/module/canal-1.1.4/canal.deployer-1.1.4/bin
[omc@hadoop102 bin]$ sh startup.sh

输入 Kafka 监听命令,监听 topic:example,默认情况 canal 任务生成的 topic 默认名称是 example,这种情况下,所有的 MySQL 数据库变更都会显示在这个 topic,如果想使用动态 topic,需要调整 canal.properties,相关内容以后再分享。

代码语言:javascript复制
[omc@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --from-beginning --topic example

操作 MySQL

首先登录MySQL

代码语言:javascript复制
[omc@hadoop104 ~]$ mysql -uroot -p123456

登录之后,创建数据库 test01,选中 test01,创建数据表 canaltest,并进行插入和更新操作。

代码语言:javascript复制
CREATE DATABASE test01;

USE test01;

CREATE TABLE canaltest
(id INT,
code VARCHAR(10),
updatetime timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3));


INSERT INTO canaltest (id,code) VALUES (1,'Canal');

INSERT INTO canaltest (id,code) VALUES (2,'Flume');

UPDATE canaltest SET code='Flink' WHERE id=2;

验证结果

操作完成后,再去看 Kafka 监控 topic 情况,发现 Kafka 接收到了所有的 DDL DML 命令。参考下图可以对比出,Canal 将 MySQL 数据实时同步至 Kafka,数据延迟约 300ms。

0 人点赞