https://github.com/moiot/gravity是摩拜单车开源的一款mysql同步工具:它是由一系列简单插件组成的同步工具,提升了足够的可扩展性的同时,也保证了架构的简洁性。下面先分析下它的原理以及如何使用,然后分析下它的源码。
Gravity是由下列插件组成:
Input:作为数据输入的适配源,比如解析mysql的binlog,来产生gravity内部的数据结构:core.Msg.
Filter:它转换输入产生的数据,比如过滤掉一些数据,修改表的列名,加密表的某些列等。
Scheduler:它负责数据流向的调度,将Input产生的数据写到OutPut。定义了数据一致性的策略,默认的Scheduler支持在同一行数据修改的顺序性。
Output:将输入数据写入到目标存储:比如Kafka或者Mysql(Tidb)。在这个过程中会使用Router插件定义的路由策略。
Matcher:匹配Input插件产生的数据,Filter 和 Router 插件都使用了Matcher 插件来进行数据的匹配。
用户可以自定义上述几类插件来满足具体的业务要求。
它的核心数据结构core.Msg定义如下:
代码语言:javascript复制type DDLMsg struct {
Statement string
}
type DMLMsg struct {
Operation DMLOp
Data map[string]interface{}
Old map[string]interface{}
Pks map[string]interface{}
PkColumns []string
}
type Msg struct {
Type MsgType
Host string
Database string
Table string
Timestamp time.Time
DdlMsg *DDLMsg
DmlMsg *DMLMsg
...
}
gravity支持mysql和mongo两种数据库,都支持三种同步模式:batch, stream, replication
代码语言:javascript复制mode = "stream":增量同步
mode = "batch":批量同步
mode = "replication":first do a batch mode table scan, and then start stream mode automatically.
开始使用同步工具之前需要满足如下前提条件
1,mysql的 binlog GTID mode 为On模式:即GTID_MODE=ON
2,创建_gravity 账户,赋予这个账户replication相关权限
3,相关的表,在源和目标mysql集群都要建立完成。
这里复习下知识:MySQL有2种方式指定复制同步的方式,分别为:
1,基于binlog文件名及位点的指定方式- 匿名事务(Anonymous_gtid_log_event)
2,基于GTID(全局事务ID)的指定方式- GTID事务(Gtid_log_event)
MySQL 5.7.6之后便开始支持动态开启和关闭GTID模式,其参数GTID_MODE有以下取值
1,OFF - 只允许匿名事务被复制同步
2,OFF_PERMISSIVE - 新产生的事务都是匿名事务,但也允许有GTID事务被复制同步
3,ON_PERMISSIVE - 新产生的都是GTID事务,但也允许有匿名事务被复制同步
4,ON - 只允许GTID事务被复制同步
下面我们开始使用:首先配置mysql
% vi /usr/local/etc/my.cnf
代码语言:javascript复制[mysqld]
server_id=4
log_bin=mysql-bin
enforce-gtid-consistency=ON
gtid-mode=ON
binlog_format=ROW
启动mysql
代码语言:javascript复制% mysql.server restart
Shutting down MySQL
.. SUCCESS!
Starting MySQL
. SUCCESS!
创建账户并赋予权限:
代码语言:javascript复制CREATE USER _gravity IDENTIFIED BY 'xxx';
GRANT SELECT, RELOAD, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT, CREATE, INSERT, UPDATE, DELETE ON *.* TO '_gravity'@'%';
GRANT ALL PRIVILEGES ON _gravity.* TO '_gravity'@'%';
创建源,目标表
代码语言:javascript复制CREATE TABLE `test`.`test_source_table` (
`id` int(11),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `test`.`test_target_table` (
`id` int(11),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
编译源码
代码语言:javascript复制% make
go build -ldflags '-X "github.com/moiot/gravity/pkg/utils.Version=0.0.1 git.e934fa33" -X "github.com/moiot/gravity/pkg/utils.BuildTS=2023-04-02 02:50:11" -X "github.com/moiot/gravity/pkg/utils.GitHash=e934fa33dfbf5a155fb17ad65acf36ad91f4f616" -X "github.com/moiot/gravity/pkg/utils.GitBranch=master"' -o bin/gravity cmd/gravity/main.go
#go build -ldflags '-X "github.com/moiot/gravity/pkg/utils.Version=0.0.1 git.e934fa33" -X "github.com/moiot/gravity/pkg/utils.BuildTS=2023-04-02 02:50:11" -X "github.com/moiot/gravity/pkg/utils.GitHash=e934fa33dfbf5a155fb17ad65acf36ad91f4f616" -X "github.com/moiot/gravity/pkg/utils.GitBranch=master"' -o bin/padder cmd/padder/main.go
配置同步参数
代码语言:javascript复制 # name (required)
name = "mysql2mysqlDemo"
version = "1.0"
# optional.
# database name used to store position, heartbeat, etc.
# default to "_gravity"
internal-db-name = "_gravity"
#
# The definition of Input. `mysqlbinlog` is used for this definition.
#
[input]
type = "mysql"
mode = "stream"
[input.config.source]
host = "127.0.0.1"
username = "root"
password = ""
port = 3306
location = "Local"
#
# The definition of Output. `mysql` is used for this definition.
#
[output]
type = "mysql"
[output.config.target]
host = "127.0.0.1"
username = "root"
password = ""
port = 3306
location = "Local"
# The definition of the routing rule
[[output.config.routes]]
match-schema = "test"
match-table = "test_source_table"
target-schema = "test"
target-table = "test_target_table"
开启同步
代码语言:javascript复制bin/gravity -config mysql2mysql.toml
可以看到如下输出
代码语言:javascript复制INFO[0000] xxhash backend: GoUnsafe
{"level":"info","msg":"Welcome to gravity.","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"Release Version: 0.0.1 git.e934fa33","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"Git Commit Hash: e934fa33dfbf5a155fb17ad65acf36ad91f4f616","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"Git Branch: master","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"UTC Build Time: 2023-04-02 02:50:11","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[output-mysql] Using mysql-replace-engine","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[batchScheduler] {NrWorker:10 MaxBatchPerWorker:1 QueueSize:1024 SlidingWindowSize:10240 NrRetries:3 RetrySleepString:1s HealthyThreshold:0 RetrySleep:1s}","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=trueu0026timeout=5su0026readTimeout=5su0026writeTimeout=5su0026parseTime=trueu0026collation=utf8mb4_general_ciu0026loc=Local","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=trueu0026timeout=5su0026readTimeout=5su0026writeTimeout=5su0026parseTime=trueu0026collation=utf8mb4_general_ciu0026loc=Local","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=trueu0026timeout=5su0026readTimeout=5su0026writeTimeout=5su0026parseTime=trueu0026collation=utf8mb4_general_ciu0026loc=Local","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[Server] start input","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=trueu0026timeout=5su0026readTimeout=5su0026writeTimeout=5su0026parseTime=trueu0026collation=utf8mb4_general_ciu0026loc=Local","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"DSN is root:@tcp(127.0.0.1:3306)/?interpolateParams=trueu0026timeout=5su0026readTimeout=5su0026writeTimeout=5su0026parseTime=trueu0026collation=utf8mb4_general_ciu0026loc=Local","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[binlog_checker] start","time":"2023-04-02T22:54:38 08:00"}
[2023/04/02 22:54:38] [info] binlogsyncer.go:141 create BinlogSyncer with config {9467353 mysql 127.0.0.1 3306 root false false <nil> true UTC false 0 0s 0s 0 false 0}
{"level":"info","msg":"[binlogTailer] start","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[binlogTailer] getBinlogStreamer gtid: b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-8","time":"2023-04-02T22:54:38 08:00"}
[2023/04/02 22:54:38] [info] binlogsyncer.go:380 begin to sync binlog from GTID set b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-8
[2023/04/02 22:54:38] [info] binlogsyncer.go:211 register slave for master server 127.0.0.1:3306
{"level":"info","msg":"[Server] started","time":"2023-04-02T22:54:38 08:00"}
[2023/04/02 22:54:38] [info] binlogsyncer.go:731 rotate to (mysql-bin.000006, 4)
{"level":"info","msg":"[binlogTailer] skip rotate event: source binlog Name mysql-bin.000006, source binlog Pos: 4; store Name: mysql-bin.000006, store Pos: 2498","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[batchScheduler.SubmitMsg] added new sliding window, key=mysqlstream, type=ctl","time":"2023-04-02T22:54:38 08:00"}
{"level":"info","msg":"[staticSlidingWindow] init nextItemToCommit: core.Msg{ mysqlstream-1 ctl {xid {mysql-bin.000006 %!s(uint32=2577) b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-9}} }","time":"2023-04-02T22:54:38 08:00"}
我们输入数据尝试下看是否同步成功:
代码语言:javascript复制mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from test_target_table;
Empty set (0.00 sec)
mysql> select * from test_source_table;
Empty set (0.00 sec)
mysql> insert into test_source_table values(1);
Query OK, 1 row affected (0.00 sec)
mysql> select * from test_source_table;
----
| id |
----
| 1 |
----
1 row in set (0.00 sec)
mysql> select * from test_target_table;
----
| id |
----
| 1 |
----
1 row in set (0.00 sec)
默认如果唯一键冲突,会被覆盖掉,并不会报错。
代码语言:javascript复制mysql> insert into test_target_table values(2);
Query OK, 1 row affected (0.00 sec)
mysql> insert into test_source_table values(2);
Query OK, 1 row affected (0.01 sec)
mysql> select * from test_target_table;
----
| id |
----
| 1 |
| 2 |
----
2 rows in set (0.00 sec)
默认插件是不支持ddl的同步的,但是日志里会有记录。
代码语言:javascript复制mysql> alter table test_source_table add column name char(255) not null default '';
Query OK, 0 rows affected (0.01 sec)
Records: 0 Duplicates: 0 Warnings: 0
mysql> show create table test_source_tableG
*************************** 1. row ***************************
Table: test_source_table
Create Table: CREATE TABLE `test_source_table` (
`id` int NOT NULL,
`name` char(255) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
1 row in set (0.00 sec)
mysql> show create table test_target_tableG
*************************** 1. row ***************************
Table: test_target_table
Create Table: CREATE TABLE `test_target_table` (
`id` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
1 row in set (0.00 sec)
代码语言:javascript复制{"level":"info","msg":"QueryEvent: database: test, sql: alter table test_source_table add column name char(255) not null default '', position: {mysql-bin.000006 105150 b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-212}","time":"2023-04-02T23:00:44 08:00"}
{"level":"info","msg":"[binlogTailer] ddl done with gtid: b35ff9d4-2ff7-11eb-ba5c-0802f4c2f06c:1-212, stmt: alter table test_source_table add column name char(255) not null default ''","time":"2023-04-02T23:00:44 08:00"}
这样做的好处是不像dm,经常因为ddl问题卡住,或者源分表schema不一致导致同步失败。
gravity的监控采用常见的grafana prometheus。
代码语言:javascript复制docker run -v ${PWD}/config.toml:/etc/gravity/config.toml -d --net=host moiot/gravity:latest
当然也提供了docker运行方式,如何使用我们介绍完毕,下一讲,将详细分析下它的源码。