DTS双向同步的实现思路探索

2023-12-08 21:21:00 浏览数 (2)

某云厂商的DTS白皮书介绍:

与单向增量同步类似, 模拟Slave来获取增量数据。 同时UDTS对写下去的数据做标记,当有新的Binlog Event的时候, 会先检查是否有标记。 如果有标记则说明是循环数据,直接丢弃,如果没有标记则加上标记写到对端。

据此,我们可以基于debezium来实现一个双向同步脚本DEMO(只是demo代码)。

debezium捕获到的binlog events类型mapping如下:

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-events

u update

c insert

d delete

debezium记录的binlog events例子如下:

代码语言:python代码运行次数:0复制

-- insert 例子
{
    "before":"none",
    "after":{
        "id":160120,
        "b":"provident optio veritatis conseq"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407157000,
        "snapshot":"false",
        "db":"db1",
        "sequence":"none",
        "table":"t1",
        "server_id":300,
        "gtid":"none",
        "file":"mysql-bin.000004",
        "pos":28589840,
        "row":9,
        "thread":182
    },
    "op":"c",
    "ts_ms":1672407157196,
    "transaction":"none"
}

-- delete 例子
{
    "before":{
        "id":114381,
        "b":"111"
    },
    "after":"None",
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407489000,
        "snapshot":"false",
        "db":"db1",
        "sequence":"None",
        "table":"t1",
        "server_id":300,
        "gtid":"None",
        "file":"mysql-bin.000004",
        "pos":28590986,
        "row":0,
        "thread":154,
        "query":"delete from t1 where id=114381"
    },
    "op":"d",
    "ts_ms":1672407489300,
    "transaction":"None"
}

-- update 例子
{
    "before":{
        "id":114381,
        "b":"itaque error sit sunt aliquid it"
    },
    "after":{
        "id":114381,
        "b":"111"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407316000,
        "snapshot":"false",
        "db":"db1",
        "sequence":"None",
        "table":"t1",
        "server_id":300,
        "gtid":"None",
        "file":"mysql-bin.000004",
        "pos":28590602,
        "row":0,
        "thread":154,
        "query":"update t1 set b=111 where id=114381"
    },
    "op":"u",
    "ts_ms":1672407316221,
    "transaction":"None"
}

-- replace 不存在记录
{
    "before":"None",
    "after":{
        "id":114381,
        "b":"aaaaa"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407628000,
        "snapshot":"false",
        "db":"db1",
        "sequence":"None",
        "table":"t1",
        "server_id":300,
        "gtid":"None",
        "file":"mysql-bin.000004",
        "pos":28591342,
        "row":0,
        "thread":154,
        "query":"replace into t1 (id,b) values(114381,"aaaaa")"
    },
    "op":"c",
    "ts_ms":1672407628115,
    "transaction":"None"
}

-- replace 已存在记录
{
    "before":{
        "id":114381,
        "b":"aaaaa"
    },
    "after":{
        "id":114381,
        "b":"bbbbbb"
    },
    "source":{
        "version":"2.1.1.Final",
        "connector":"mysql",
        "name":"yyx",
        "ts_ms":1672407717000,
        "snapshot":"false",
        "db":"db1",
        "sequence":"None",
        "table":"t1",
        "server_id":300,
        "gtid":"None",
        "file":"mysql-bin.000004",
        "pos":28591701,
        "row":0,
        "thread":154,
        "query":"replace into t1 (id,b) values(114381,"bbbbbb")"
    },
    "op":"u",
    "ts_ms":1672407717924,
    "transaction":"None"
}

如果  op=c ,且 before is none ,则这是一个 insert into 语句
如果  op=u ,则这是一个 update 语句, 可以改写为 replace into 语句
如果  op=d ,且 after is none ,则这是一个 delete 语句

python代码实现的demo (目前看还有些bug,见文末)

配置文件 configs.py

代码语言:python代码运行次数:0复制
# 下游MySQL的信息
mysql_host = "192.168.31.181"
mysql_port = "3306"
mysql_user = "dts"
mysql_pass = "dts"
mysql_db = "test"

# kafka连接信息
kafka_topic_name = "yyx.db1.t1"
kafka_group_id = "my-group"
kafka_bootstrap_servers = ["192.168.31.181:9092"]
kafka_auto_offset_reset = "earliest"    # 可选值 earliest  latest
kafka_auto_commit_interval_ms = 100  # 注意这里是整型

主程序 main_program.py

代码语言:python代码运行次数:0复制
import json
import logging

import configs
import mysql.connector
from kafka import KafkaConsumer

logging.basicConfig(
    level=logging.DEBUG,
    filename="dts.log",
    filemode="a",
    format="%(asctime)s - "
    "%(pathname)s[line:%(lineno)d] - "
    "%(levelname)s: %(message)s",
)

mydb = mysql.connector.connect(
    host=configs.mysql_host,
    port=configs.mysql_port,
    user=configs.mysql_user,
    passwd=configs.mysql_pass,
    database=configs.mysql_db,
    autocommit=True,
)
mycursor = mydb.cursor()

consumer = KafkaConsumer(
    configs.kafka_topic_name,
    group_id=configs.kafka_group_id,
    bootstrap_servers=configs.kafka_bootstrap_servers,
    auto_offset_reset=configs.kafka_auto_offset_reset,
    auto_commit_interval_ms=configs.kafka_auto_commit_interval_ms,
    fetch_max_bytes=524288000,
    enable_auto_commit=True,
    max_poll_records=1000,
    send_buffer_bytes=1310720,
    receive_buffer_bytes=327680,
)


RUN_COUNTER = 0  # 排查bug,临时加了个counter变量,用于统计写入数据库的次数

for message in consumer:
    #  print(f'KEY: {message.key}nVALUE:{message.value}')
    print(f"KEY: {message.key}")
    cc = json.loads(message.value)
    # print(cc)

    op = cc.get("op")

    db = cc.get("source")["db"]
    tb = cc.get("source")["table"]

    db_tb = cc.get("source")["db"]   "."   cc.get("source")["table"]

    before_data = cc.get("before")
    after_data = cc.get("after")

    event_query = cc.get("source")["query"]

    if "/* dts */" in event_query:
        print("检测到debezium标识,这个event是dts同步产生的sql,将跳过")
        # continue

    # 如果  op=c ,且 before is none ,则这是一个 insert into 语句
    # 如果  op=u ,则这是一个 update 语句, 可以改写为 replace into 语句
    # 如果  op=d ,且 after is none ,则这是一个 delete 语句

    elif after_data is None and op == "d":
        print("这是delete语句")
        EVENT_SQL = ""
        for i, v in enumerate(before_data):
            CONDITION = "`"   str(v)   "`"   "="   "'"   str(before_data[v])   "'"
            EVENT_SQL = EVENT_SQL   " and "   CONDITION
            # print(EVENT_SQL)
        EVENT_SQL = "DELETE FROM "   db_tb   " where 1=1 "   EVENT_SQL   ";"
        # print('提取到的sql ---> ', EVENT_SQL)

    elif op in ("c", "u"):
        print("这是insert或者update语句,统一转成replace into写法")
        VALUES = ""
        COLUMNS = ""
        for i, v in enumerate(after_data):
            COLUMNS  = "`"   str(v)   "`"   ","
            VALUES  = "'"   str(after_data[v]).replace(r"'", r"'")   "'"   ","
        EVENT_SQL = (
            "INSERT INTO "
              db_tb
              " ("
              COLUMNS[0:-1]
              ") "
              " values "
              " ("
              VALUES[0:-1]
              ");"
        )
        # print('提取到的sql ---> ', EVENT_SQL)

    else:
        print("未识别的sql类型")
        # EVENT_SQL = 'select 1;'
        continue

    QUERY_SQL = "/* dts */ "   EVENT_SQL

    # print('python 最终执行的sql ---> ', QUERY_SQL)

    with open("./QUERY_SQL.sql", "a ", encoding="utf-8") as f:
        f.write(QUERY_SQL   "n")
    try:
        mycursor.execute(QUERY_SQL)  # 默认开的自动提交
        RUN_COUNTER  = 1
        print("数据库尝试写入次数 ---> ", RUN_COUNTER)
    except Exception as e:
        with open("./err.log", "a ", encoding="utf-8") as f:
            f.write(str(e)   QUERY_SQL   "n")

其它

代码语言:shell复制
kafka里面的数据来源,是根据《基于debezium的双向复制初探.docx》 这种CDC方式写入的。
这里代码里的数据读取和清洗逻辑,都是和debezium的强绑定的。

写入速度上:使用顺序写入MySQL,自动提交,差不多每分钟3w行记录。

目前测试发现有未知的bug,具体现象:
如果一次性写入1w条记录,则dts到dest的写入过程中不会丢数据
如果一次性写入5w或者更多的记录,则dts到dest的写入过程中会出现丢数据的情况
已经验证过kafka里面的数据是没有缺失的,因此问题出在了dts到dest的写入过程中,但是try except并没有捕获到丢数据的报错日志

0 人点赞