笔者使用 Canal 将 MySQL 数据同步至 Kafka 时遇到了不少坑,还好最后终于成功了,这里分享一下极简教程,希望能帮到你。
使用版本说明:
组件 | 版本号 |
---|---|
Zookeeper | 3.5.7 |
Kafka | 2.12-3.0.0 |
Canal | 1.1.4 |
MySQL | 5.7.16 |
1.前置条件
- 已部署 Zookeeper 集群(建议配置环境变量)
- 已部署 Kafka 集群(建议配置环境变量)
2.设置 MySQL
开启 binlog
开启 binlog 写入功能,并将 binlog-format 设置为 ROW 模式
[omc@hadoop102 ~]$ sudo vi /etc/my.cnf
,在[mysqld] 下方添加如下内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
完成设置后,重启 MySQL
设置 MySQL 专用账户用于授权 Canal
登录 MySQL 执行如下命令:
代码语言:javascript复制CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.安装 Canal-1.1.4
可以在 Linux 机器上使用如下命令下载:
代码语言:javascript复制wget https://github.com/alibaba/canal/releases/download/canal-1.1.14/canal.deployer-1.1.14.tar.gz
也可以通过 Windows 浏览器访问页面 https://github.com/alibaba/canal/releases/tag/canal-1.1.4 下载,只需要下载 canal.deployer-1.1.14.tar.gz 即可。依次创建目录:
代码语言:javascript复制[omc@hadoop102 ~]$ cd /opt/module
[omc@hadoop102 module]$ mkdir canal-1.1.4
[omc@hadoop102 module]$ cd canal-1.1.4
[omc@hadoop102 canal-1.1.4]$ mkdir canal.deployer-1.1.4
解压缩下载好的 tar 包:
代码语言:javascript复制[omc@hadoop102 software]$ tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/module/canal-1.1.4/canal.deployer-1.1.4
4.修改配置文件
修改 canal.properties
代码语言:javascript复制[omc@hadoop102 ~]$ cd /opt/module/canal-1.1.4/canal.deployer-1.1.4/conf
[omc@hadoop102 conf]$ vi canal.properties
修改如下内容即可:
代码语言:javascript复制 7 canal.register.ip = 192.168.10.102
# 这里的 IP 替换成你的 Canal 服务器 IP
20 canal.zkServers =192.168.10.102,192.168.10.103,192.168.10.104
# 这里的 IP 替换成你的 Zookeeper 集群 IP
25 canal.serverMode = kafka
# tcp 修改为 kafka
116 canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# 这里的 hostname 替换成你的 Kafka 集群 IP 或者是 Kafka 集群的 hostname
110 #canal.instance.global.spring.xml = classpath:spring/file-instance.xml
111 canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# 将 110 行 file-instance.xml 注释掉,将 111 行 default-instance.xml 解除注释
注:
- 首列的数字为行序号,便于你进行查找
canal_local.properties
对应于本地模式设置,我们这里讲的是集群模式,无需改动。
修改 instance.properties
代码语言:javascript复制[omc@hadoop102 ~]$ cd /opt/module/canal-1.1.4/canal.deployer-1.1.4/conf/example
[omc@hadoop102 example]$ vi instance.properties
修改如下内容即可:
代码语言:javascript复制 3 canal.instance.mysql.slaveId=1234
9 canal.instance.master.address=192.168.10.104:3306
# 这里的 IP 替换为你的 MySQL 服务器(数据源)地址
32 # username/password
33 canal.instance.dbUsername=canal
34 canal.instance.dbPassword=canal
# 这里的用户名密码要和前面授权 Canal 的 MySQL 专用账户设置一致
5.测试验证
首先要依次启动 Zookeeper 和 Kafka,然后启动 Canal
代码语言:javascript复制[omc@hadoop102 ~]$ cd /opt/module/canal-1.1.4/canal.deployer-1.1.4/bin
[omc@hadoop102 bin]$ sh startup.sh
输入 Kafka 监听命令,监听 topic:example,默认情况 canal 任务生成的 topic 默认名称是 example,这种情况下,所有的 MySQL 数据库变更都会显示在这个 topic,如果想使用动态 topic,需要调整 canal.properties,相关内容以后再分享。
代码语言:javascript复制[omc@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --from-beginning --topic example
操作 MySQL
首先登录MySQL
代码语言:javascript复制[omc@hadoop104 ~]$ mysql -uroot -p123456
登录之后,创建数据库 test01,选中 test01,创建数据表 canaltest,并进行插入和更新操作。
代码语言:javascript复制CREATE DATABASE test01;
USE test01;
CREATE TABLE canaltest
(id INT,
code VARCHAR(10),
updatetime timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3));
INSERT INTO canaltest (id,code) VALUES (1,'Canal');
INSERT INTO canaltest (id,code) VALUES (2,'Flume');
UPDATE canaltest SET code='Flink' WHERE id=2;
验证结果
操作完成后,再去看 Kafka 监控 topic 情况,发现 Kafka 接收到了所有的 DDL DML 命令。参考下图可以对比出,Canal 将 MySQL 数据实时同步至 Kafka,数据延迟约 300ms。