docker安装canal数据同步工具
假设一个场景:编写一个博客系统,需要引入elasticsearch搜索引擎实现对文章内容的检索。则需要解决MySQL与elasticsearch数据同步的问题。
此时我们有三种选择:
- 1、使用业务代码实现同步: 在操作数据库数据同步操作elasticsearch中的数据。(优点:实现简单,缺点:代码耦合度高,效率低下)
在业务层执行增加、修改、删除改变mysql数据库之后,也执行操作redis的逻辑代码。 优点:操作简单 缺点:与业务操作代码耦合度变高;执行效率低。
- 2、使用MQ实现同步:
在执行完增加、修改、删除之后, 往MQ中发送一条消息 ;同步程序作为MQ中的消费者,从消息队列中获取消息,然后执行同步elasticsearch数据库的逻辑。 优点:业务代码解耦, 并且可以做到准实时。 缺点:需要在业务代码中加入发送消息到MQ中的代码 , API耦合。
- 3、binglog来实现同步(canal):
binglog实现同步的方法再细分不止一种,这个笔记主要学习canal,所以以canal为例。而且canal不止可以将数据同步给redis,也可以同步给其他类型的数据库。 优点:与业务代码完全解耦,API完全解耦,可以做到准实时。 缺点:canal是第三方实现的,需要学习成本(学无止尽,技多不压身)。
本章我们学习第三种学习思路,仅实现canal和mysql的数据同步。
1、 创建 canal用户, 并授权
代码语言:javascript复制create user canal identified by 'canal';
grant select,replication slave, replication client on *.* to 'canal'@'%';
flush privileges;
# 查看bin-log是否开启 on: 开启 off: 关闭
show variables like 'log_bin';
编写my.conf, 挂载容器中的mysql配置文件
代码语言:javascript复制[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve
# start binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=1
2、 docker安装canal
代码语言:javascript复制docker pull canal/canal-server:v1.1.5
# 创建一个容器
docker run --name canal -d canal/canal-server:v1.1.5
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /wuming/canal
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /wuming/canal
3、配置canal配置文件
查看容器mysql-8的ip
代码语言:javascript复制docker inspect mysql # 这里假设此处ip是 177.17.0.1
修改canal.properties配置文件
代码语言:javascript复制# 默认端口 11111
# 默认输出model为tcp, 这里根据使用的mq类型进行修改
# tcp, kafka, RocketMQ
canal.serverMode = tcp
#################################################
######### destinations #############
#################################################
# canal可以有多个instance,每个实例有独立的配置文件,默认只 有一个example实例。
# 如果需要处理多个mysql数据的话,可以复制出多个example,对其重新命名,
# 命令和配置文件中指定的名称一致。然后修改canal.properties 中的 canal.destinations
# canal.destinations=实例 1,实例 2,实例 3
canal.destinations = example
修改instance.properties配置文件
代码语言:javascript复制# 不能和mysql重复
canal.instance.mysql.slaveId=2
# 使用mysql的虚拟ip和端口
canal.instance.master.address=177.17.0.1:3306
# 使用已创建的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# canal.instance.defaultDatabaseName =test
# 问题:(原本这样的,值同步test库,此处没能解决,单据指定数据库同步配置)
# canal.instance.filter.regex=.*\..*
# canal.instance.defaultDatabaseName =test
# 注掉上面,然后添加,同步所有的库。
# .*\\..*: 表示匹配所有的库所有的表
canal.instance.filter.regex =.*\\..*
# 目的地,可以认识一个消息队列,不需要更改。
canal.mq.topic=example
# 如果是
4、重新创建canal容器并挂在配置文件。
代码语言:javascript复制docker run --name canal -p 11111:11111
-v /wuming/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties
-v /wuming/canal/canal.properties:/home/admin/canal-server/conf/canal.properties
-d canal/canal-server:v1.1.5
5、使用java程序连接canal验证是否可以实现同步
引入依赖
代码语言:javascript复制<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
程序连接
代码语言:javascript复制@Slf4j
public class CanalTest {
//Canal服务地址 使用自己虚拟机的ip地址
private static final String SERVER_ADDRESS = "127.0.0.1";
//Canal Server 服务端口号
private static final Integer PORT = 11111;
//目的地,其实Canal Service内部有一个队列,和配置文件中一致即可,参考【修改instance.properties】图中
private static final String DESTINATION = "example";
//用户名和密码,但是目前不支持,只能为空
private static final String USERNAME = "";
//用户名和密码,但是目前不支持,只能为空
private static final String PASSWORD= "";
public static void main(String[] args){
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
canalConnector.connect();
//订阅所有消息
canalConnector.subscribe(".*\..*");
// 只订阅test数据库下的所有表
//canalConnector.subscribe("test");
//恢复到之前同步的那个位置
canalConnector.rollback();
for(;;){
//获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
Message message = canalConnector.getWithoutAck(100);
//获取消息id
long batchId = message.getId();
if(batchId != -1){
log.info("msgId -> " batchId);
printEnity(message.getEntries());
//提交确认
//canalConnector.ack(batchId);
//处理失败,回滚数据
//canalConnector.rollback(batchId);
}
}
}
private static void printEnity(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if(entry.getEntryType() != CanalEntry.EntryType.ROWDATA){
continue;
}
try{
// 序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
System.out.println(rowChange.getEventType());
switch (rowChange.getEventType()){
//如果希望监听多种事件,可以手动增加case
case INSERT:
// 表名
String tableName = entry.getHeader().getTableName();
//System.out.println("表名:" tableName);
//测试users表进行映射处
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
//for(CanalEntry.Column c:afterColumnsList){
// System.out.println("字段:" c.getName() "值:" c.getValue());
//}
System.out.println(afterColumnsList);
break;
case UPDATE:
List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
System.out.println("新插入的数据是:" afterColumnsList2);
break;
case DELETE:
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
System.out.println("被删除的数据是:" beforeColumnsList);
break;
default:
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
}
执行程序并,使用navicat操作数据,查看程序是否读取bin-log。
6、canal是什么及工作原理
6.1、canal是什么
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。
6.2、Canal 的工作原理 很简单,就是把自己伪装成 Slave,假装从 Master 复制数据。
1)Master 主库将改变记录,写到二进制日志(Binary Log)中; 2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log); 3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
补充:查看mysql的binlog日志情况
代码语言:javascript复制# 查看binlog文件列表
show binary logs;
# 查看当前正在写入的binlog文件
show master status;
# 查看指定binlog文件的内容
show binlog events [in 'log_name'] [FROM pos] [limit [offset,] row_count]