A cloud vendor's DTS whitepaper describes its approach to two-way sync as follows:
Similar to one-way incremental sync, it acts as a replica (slave) to capture incremental data. In addition, UDTS marks every row it writes; when a new binlog event arrives, it first checks for that mark. If the mark is present, the event is loop-back data produced by the sync itself and is discarded; if not, the event is marked and written to the peer.
Based on this idea, we can build a bidirectional-sync demo script on top of Debezium (demo-quality code only).
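The marker idea itself fits in a few lines of Python. The sketch below uses illustrative names and is not part of the demo; it assumes the consumer can see the original SQL text of each event, which with Debezium requires binlog_rows_query_log_events=ON on the source MySQL and the connector exposing the query in source.query.
# Sketch of the loop-prevention marker (illustrative names, see assumptions above)
DTS_MARKER = "/* dts */"

def tag_sql(sql):
    """Prefix SQL that we are about to apply on the peer with the marker."""
    return DTS_MARKER + " " + sql

def is_dts_event(event):
    """True if this binlog event came from our own sync writes and must be discarded."""
    query = event.get("source", {}).get("query") or ""
    return DTS_MARKER in query
The demo program later in this post applies exactly this check before converting each event into SQL.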
Debezium maps the binlog event types it captures as follows:
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-events
u → update
c → insert
d → delete
Examples of binlog events recorded by Debezium:
-- insert example
{
    "before": null,
    "after": {
        "id": 160120,
        "b": "provident optio veritatis conseq"
    },
    "source": {
        "version": "2.1.1.Final",
        "connector": "mysql",
        "name": "yyx",
        "ts_ms": 1672407157000,
        "snapshot": "false",
        "db": "db1",
        "sequence": null,
        "table": "t1",
        "server_id": 300,
        "gtid": null,
        "file": "mysql-bin.000004",
        "pos": 28589840,
        "row": 9,
        "thread": 182
    },
    "op": "c",
    "ts_ms": 1672407157196,
    "transaction": null
}
-- delete example
{
    "before": {
        "id": 114381,
        "b": "111"
    },
    "after": null,
    "source": {
        "version": "2.1.1.Final",
        "connector": "mysql",
        "name": "yyx",
        "ts_ms": 1672407489000,
        "snapshot": "false",
        "db": "db1",
        "sequence": null,
        "table": "t1",
        "server_id": 300,
        "gtid": null,
        "file": "mysql-bin.000004",
        "pos": 28590986,
        "row": 0,
        "thread": 154,
        "query": "delete from t1 where id=114381"
    },
    "op": "d",
    "ts_ms": 1672407489300,
    "transaction": null
}
-- update example
{
    "before": {
        "id": 114381,
        "b": "itaque error sit sunt aliquid it"
    },
    "after": {
        "id": 114381,
        "b": "111"
    },
    "source": {
        "version": "2.1.1.Final",
        "connector": "mysql",
        "name": "yyx",
        "ts_ms": 1672407316000,
        "snapshot": "false",
        "db": "db1",
        "sequence": null,
        "table": "t1",
        "server_id": 300,
        "gtid": null,
        "file": "mysql-bin.000004",
        "pos": 28590602,
        "row": 0,
        "thread": 154,
        "query": "update t1 set b=111 where id=114381"
    },
    "op": "u",
    "ts_ms": 1672407316221,
    "transaction": null
}
-- replace of a non-existing row
{
    "before": null,
    "after": {
        "id": 114381,
        "b": "aaaaa"
    },
    "source": {
        "version": "2.1.1.Final",
        "connector": "mysql",
        "name": "yyx",
        "ts_ms": 1672407628000,
        "snapshot": "false",
        "db": "db1",
        "sequence": null,
        "table": "t1",
        "server_id": 300,
        "gtid": null,
        "file": "mysql-bin.000004",
        "pos": 28591342,
        "row": 0,
        "thread": 154,
        "query": "replace into t1 (id,b) values(114381,\"aaaaa\")"
    },
    "op": "c",
    "ts_ms": 1672407628115,
    "transaction": null
}
-- replace of an existing row
{
    "before": {
        "id": 114381,
        "b": "aaaaa"
    },
    "after": {
        "id": 114381,
        "b": "bbbbbb"
    },
    "source": {
        "version": "2.1.1.Final",
        "connector": "mysql",
        "name": "yyx",
        "ts_ms": 1672407717000,
        "snapshot": "false",
        "db": "db1",
        "sequence": null,
        "table": "t1",
        "server_id": 300,
        "gtid": null,
        "file": "mysql-bin.000004",
        "pos": 28591701,
        "row": 0,
        "thread": 154,
        "query": "replace into t1 (id,b) values(114381,\"bbbbbb\")"
    },
    "op": "u",
    "ts_ms": 1672407717924,
    "transaction": null
}
If op = c and before is null, the event is an INSERT statement.
If op = u, the event is an UPDATE statement and can be rewritten as a REPLACE INTO statement.
If op = d and after is null, the event is a DELETE statement (these rules are condensed into the helper sketch right below).
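For reference only, the three rules fit into one small helper; the function name and shape are illustrative, and the full demo below inlines the same logic together with the /* dts */ marker check.
# Sketch: convert a Debezium row event into one SQL statement per the rules above
def event_to_sql(event):
    op = event.get("op")
    source = event.get("source", {})
    table = source.get("db") + "." + source.get("table")
    before = event.get("before")
    after = event.get("after")
    if op == "d" and after is None:
        cond = " and ".join("`" + str(k) + "`='" + str(v) + "'" for k, v in before.items())
        return "DELETE FROM " + table + " WHERE " + cond + ";"
    if op in ("c", "u"):
        cols = ",".join("`" + str(k) + "`" for k in after)
        vals = ",".join("'" + str(v).replace("'", "\\'") + "'" for v in after.values())
        return "REPLACE INTO " + table + " (" + cols + ") VALUES (" + vals + ");"
    return None  # unrecognized event type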
A demo implemented in Python (it still has some bugs for now; see the end of this post)
Configuration file: configs.py
# Downstream (destination) MySQL connection info
mysql_host = "192.168.31.181"
mysql_port = "3306"
mysql_user = "dts"
mysql_pass = "dts"
mysql_db = "test"

# Kafka connection info
kafka_topic_name = "yyx.db1.t1"
kafka_group_id = "my-group"
kafka_bootstrap_servers = ["192.168.31.181:9092"]
kafka_auto_offset_reset = "earliest"  # allowed values: earliest, latest
kafka_auto_commit_interval_ms = 100  # note: this must be an integer
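For context, the topic name yyx.db1.t1 above is what the Debezium MySQL connector produces for topic prefix yyx, database db1 and table t1. Below is a hedged sketch of registering such a connector through the Kafka Connect REST API; the hosts, credentials and server id are placeholders for this environment, and include.query only puts the original SQL into source.query (which the /* dts */ check relies on) when binlog_rows_query_log_events=ON on the source MySQL.
# Sketch: register a Debezium MySQL source connector via the Kafka Connect REST API.
# Hosts, credentials and server id are placeholders.
import json
import requests

connector = {
    "name": "yyx-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "source-mysql-host",   # placeholder
        "database.port": "3306",
        "database.user": "debezium",                # placeholder
        "database.password": "debezium",            # placeholder
        "database.server.id": "184054",             # placeholder, must be unique
        "topic.prefix": "yyx",                      # -> topics named yyx.<db>.<table>
        "database.include.list": "db1",
        "table.include.list": "db1.t1",
        "include.query": "true",                    # expose the original SQL in source.query
        "schema.history.internal.kafka.bootstrap.servers": "192.168.31.181:9092",
        "schema.history.internal.kafka.topic": "schema-changes.db1",
    },
}

resp = requests.post(
    "http://kafka-connect-host:8083/connectors",    # Kafka Connect REST endpoint (placeholder)
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
    timeout=10,
)
print(resp.status_code, resp.text)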
Main program: main_program.py
import json
import logging

import configs
import mysql.connector
from kafka import KafkaConsumer

logging.basicConfig(
    level=logging.DEBUG,
    filename="dts.log",
    filemode="a",
    format="%(asctime)s - "
    "%(pathname)s[line:%(lineno)d] - "
    "%(levelname)s: %(message)s",
)

# Destination MySQL connection (autocommit enabled)
mydb = mysql.connector.connect(
    host=configs.mysql_host,
    port=configs.mysql_port,
    user=configs.mysql_user,
    passwd=configs.mysql_pass,
    database=configs.mysql_db,
    autocommit=True,
)
mycursor = mydb.cursor()

# Kafka consumer reading the Debezium topic
consumer = KafkaConsumer(
    configs.kafka_topic_name,
    group_id=configs.kafka_group_id,
    bootstrap_servers=configs.kafka_bootstrap_servers,
    auto_offset_reset=configs.kafka_auto_offset_reset,
    auto_commit_interval_ms=configs.kafka_auto_commit_interval_ms,
    fetch_max_bytes=524288000,
    enable_auto_commit=True,
    max_poll_records=1000,
    send_buffer_bytes=1310720,
    receive_buffer_bytes=327680,
)

RUN_COUNTER = 0  # temporary counter added while chasing the bug: counts attempted database writes

for message in consumer:
    # print(f"KEY: {message.key}\nVALUE: {message.value}")
    print(f"KEY: {message.key}")
    cc = json.loads(message.value)
    # print(cc)
    op = cc.get("op")
    db = cc.get("source")["db"]
    tb = cc.get("source")["table"]
    db_tb = db + "." + tb
    before_data = cc.get("before")
    after_data = cc.get("after")
    # source.query is only present when binlog_rows_query_log_events is enabled
    event_query = cc.get("source").get("query") or ""
    # If op=c and before is null, this is an INSERT statement
    # If op=u, this is an UPDATE statement and can be rewritten as REPLACE INTO
    # If op=d and after is null, this is a DELETE statement
    if "/* dts */" in event_query:
        print("Found the dts marker: this event was produced by the sync itself, skipping it")
        continue
    elif after_data is None and op == "d":
        print("This is a DELETE statement")
        # build the WHERE condition from every column of the before image
        EVENT_SQL = ""
        for v in before_data:
            CONDITION = "`" + str(v) + "`" + "=" + "'" + str(before_data[v]) + "'"
            EVENT_SQL = EVENT_SQL + " and " + CONDITION
        # print(EVENT_SQL)
        EVENT_SQL = "DELETE FROM " + db_tb + " where 1=1 " + EVENT_SQL + ";"
        # print("extracted sql ---> ", EVENT_SQL)
    elif op in ("c", "u"):
        print("This is an INSERT or UPDATE statement; rewriting both as REPLACE INTO")
        VALUES = ""
        COLUMNS = ""
        for v in after_data:
            COLUMNS = COLUMNS + "`" + str(v) + "`" + ","
            VALUES = VALUES + "'" + str(after_data[v]).replace("'", "\\'") + "'" + ","
        EVENT_SQL = (
            "REPLACE INTO "
            + db_tb
            + " ("
            + COLUMNS[0:-1]
            + ") "
            + " values "
            + " ("
            + VALUES[0:-1]
            + ");"
        )
        # print("extracted sql ---> ", EVENT_SQL)
    else:
        print("Unrecognized sql type")
        # EVENT_SQL = "select 1;"
        continue

    # tag the statement so the peer side can recognize and skip it
    QUERY_SQL = "/* dts */ " + EVENT_SQL
    # print("final sql executed by python ---> ", QUERY_SQL)
    with open("./QUERY_SQL.sql", "a", encoding="utf-8") as f:
        f.write(QUERY_SQL + "\n")
    try:
        mycursor.execute(QUERY_SQL)  # autocommit is enabled on the connection
        RUN_COUNTER += 1
        print("database write attempts ---> ", RUN_COUNTER)
    except Exception as e:
        with open("./err.log", "a", encoding="utf-8") as f:
            f.write(str(e) + QUERY_SQL + "\n")
Other notes
The data in Kafka comes from CDC writes set up as described in《基于debezium的双向复制初探.docx》.
The data-reading and cleaning logic in this code is tightly coupled to Debezium.
Write speed: with sequential writes to MySQL and autocommit, throughput is roughly 30k rows per minute.
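If higher throughput is needed, one common lever is to batch commits instead of relying on per-row autocommit. A minimal sketch, assuming the same mysql.connector connection and cursor as in main_program.py but created with autocommit=False:
# Sketch: execute statements sequentially but commit every BATCH_SIZE rows
# (conn/cursor are assumed to be mysql.connector objects with autocommit=False)
BATCH_SIZE = 500

def apply_batch(conn, cursor, statements):
    for i, sql in enumerate(statements, start=1):
        cursor.execute(sql)
        if i % BATCH_SIZE == 0:
            conn.commit()
    conn.commit()  # flush whatever is left in the final partial batch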
Testing currently reveals an unknown bug. Symptoms:
If 10k rows are written in one batch, no data is lost on the dts → dest path.
If 50k or more rows are written in one batch, some data is lost on the dts → dest path.
The data in Kafka has been verified to be complete, so the problem lies somewhere in the dts → dest write path, yet the try/except never logs an error for the missing rows (a comparison sketch for locating them follows below).
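To narrow down where the rows disappear, a quick check is to compare row counts and id ranges of the table on both sides after a test run. This is a sketch reusing the t1/id schema from the examples above; hosts, credentials and database names are placeholders.
# Sketch: compare row count and id range of table t1 on source vs destination
# (hosts, credentials and database names below are placeholders)
import mysql.connector

def table_summary(host, user, password, database, table="t1"):
    conn = mysql.connector.connect(host=host, user=user, passwd=password, database=database)
    cur = conn.cursor()
    cur.execute("SELECT COUNT(*), MIN(id), MAX(id) FROM " + table)
    summary = cur.fetchone()
    cur.close()
    conn.close()
    return summary

print("source:", table_summary("source-mysql-host", "dts", "dts", "db1"))
print("dest  :", table_summary("192.168.31.181", "dts", "dts", "test"))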