Recording forward and rollback SQL with Python + binlog

2024-08-23 09:22:17

Background

If we can capture the binlog in full, it becomes much easier to trace back events and to audit changes.

If, while recording the binlog, we also generate the corresponding rollback SQL, then when a rollback is needed we no longer have to reprocess the binlog with tools such as my2sql, which makes data flashback in those special scenarios much more efficient.

This post grew out of that idea.

Architecture

The general idea: use python-mysql-replication to consume MySQL's binlog, process the events and write the generated results to Kafka, then have clickhouse_sinker consume the Kafka data and store the final results in ClickHouse.

[architecture diagram: MySQL binlog → python-mysql-replication → Kafka → clickhouse_sinker → ClickHouse]

Note:

In the diagram above, clickhouse_sinker could also be replaced by a hand-written Python consumer, and ClickHouse could be swapped for a database such as Databend or StarRocks.
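
For example, a hand-rolled consumer could look like the sketch below. This is only a sketch under assumptions: the clickhouse-driver package, the consumer-group name, and a default.binlog_audit table whose columns mirror the msg dict built by the producer code later in this post (a possible DDL is shown near the end).

# A minimal hand-written alternative to clickhouse_sinker: consume the Kafka
# topic and batch-insert into ClickHouse.
# Assumes: pip install kafka-python clickhouse-driver, plus the hypothetical
# default.binlog_audit table described later in this post.
import json
from datetime import datetime

from clickhouse_driver import Client
from kafka import KafkaConsumer

COLUMNS = [
    "ts1", "ts2", "event_type", "event_size", "schema_name",
    "table_name", "row_count", "cluster", "forward_sql", "rollback_sql",
]

consumer = KafkaConsumer(
    "binlog_audit",
    bootstrap_servers="127.0.0.1:9092",
    group_id="py_binlog_audit",  # hypothetical consumer-group name
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
client = Client(host="127.0.0.1")

batch = []
for message in consumer:
    row = message.value
    # the producer serializes ts1 with str(datetime); parse it back
    row["ts1"] = datetime.strptime(row["ts1"], "%Y-%m-%d %H:%M:%S")
    batch.append(tuple(row.get(c, "") for c in COLUMNS))
    if len(batch) >= 1000:  # flush in batches to keep ClickHouse inserts large
        client.execute(
            f"INSERT INTO default.binlog_audit ({', '.join(COLUMNS)}) VALUES",
            batch,
        )
        batch.clear()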

Code implementation

Note that the code below is only a bare-bones implementation, with plenty of hard-coded values.

# -*- coding: utf-8 -*-

"""
Mapping between binlogevent.event_type codes and event names:
WRITE_ROWS_EVENT_V2 = 30
UPDATE_ROWS_EVENT_V2 = 31
DELETE_ROWS_EVENT_V2 = 32
"""


import json
import datetime
from kafka import KafkaProducer
import mysql.connector
import logging
import sys

logging.basicConfig(
    level=logging.INFO,
    filename="binlog_audit.log",
    filemode="a",
    format="%(asctime)s - "
    "%(pathname)s[line:%(lineno)d] - "
    "%(levelname)s: %(message)s",
)

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "dts", "passwd": "123456"}

# Kafka producer configuration
bootstrap_servers = "127.0.0.1:9092"
producer_config = {
    "bootstrap_servers": bootstrap_servers,
    "batch_size": 16384,
    "linger_ms": 10,
    "acks": 1,
    "retries": 3,
}

producer = KafkaProducer(**producer_config)


def get_binlog_pos():
    # Fetch the server's current binlog position via SHOW MASTER STATUS
    connection = mysql.connector.connect(
        host="127.0.0.1", user="dts", password="123456"
    )

    res = []

    mycursor = connection.cursor()
    mycursor.execute("SHOW MASTER STATUS")
    result = mycursor.fetchone()
    log_file = result[0]
    log_pos = result[1]
    res.append(log_file)
    res.append(log_pos)
    return res


def main():
    try:
        with open("binlog.pos", "r") as f:
            res = f.readlines()
            res1, res2 = res[0].split(",")
            logging.info(f"Found previous binlog position file, log file: {res1}, position: {res2}")

        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS,
            ignore_decode_errors=True,
            resume_stream=True,
            server_id=311,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=True,
            log_file=res1,
            log_pos=int(res2),
        )

    # If the previous binlog.pos file cannot be read, start consuming from the latest position
    except Exception as e:
        print(str(e))
        logging.info("No previous binlog position file found, starting from the latest position")
        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS,
            ignore_decode_errors=True,
            resume_stream=True,
            server_id=311,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=True,
        )

    for binlogevent in stream:
        ts1 = datetime.datetime.fromtimestamp(binlogevent.timestamp)  # human-readable time
        ts2 = binlogevent.timestamp  # raw unix timestamp
        event_type_code = binlogevent.event_type  # an integer code, see docstring above

        if int(event_type_code) == 30:
            event_type = "INSERT"
        elif int(event_type_code) == 31:
            event_type = "UPDATE"
        elif int(event_type_code) == 32:
            event_type = "DELETE"
        else:
            event_type = "UNSUPPORTED_EVENT_TYPE"

        event_size = binlogevent.event_size  # size of this binlog event in bytes
        schema = binlogevent.schema
        table = binlogevent.table
        # Only capture the schemas we care about, to cut down the volume of data processed
        if schema not in ["sbtest", "percona"]:
            continue
        # Number of rows touched by this binlog event. python-mysql-replication
        # splits large transactions into multiple events, so this count is
        # per-event, not per-transaction.
        rows = len(binlogevent.rows)
        binlog_detail = binlogevent.rows  # use the public attribute, not _RowsEvent__rows

        msg = dict()
        msg["ts1"] = str(ts1)
        msg["ts2"] = ts2
        msg["event_type"] = event_type
        msg["event_size"] = event_size
        msg["schema_name"] = schema
        msg["table_name"] = table
        msg["row_count"] = rows
        msg["cluster"] = "DBA-TEST"  # extra field identifying the source MySQL cluster

        for row in binlog_detail:
            if event_type == "INSERT":
                values = row["values"]
                # NOTE: naive quoting; values containing single quotes would need escaping
                set_clause = ", ".join(f"`{k}`='{v}'" for k, v in values.items())
                where_clause = " AND ".join(f"`{k}`='{v}'" for k, v in values.items())

                # Forward SQL (the original statement)
                forward_sql = f"/* PY-FORWARD */ INSERT INTO {schema}.{table} SET {set_clause};"
                msg["forward_sql"] = forward_sql

                # Rollback SQL: delete the inserted row
                rollback_sql = f"/* PY-ROLLBACK */ DELETE FROM {schema}.{table} WHERE {where_clause};"
                msg["rollback_sql"] = rollback_sql

            if event_type == "UPDATE":
                before_values = row["before_values"]
                after_values = row["after_values"]

                before_set = ", ".join(f"`{k}`='{v}'" for k, v in before_values.items())
                before_where = " AND ".join(f"`{k}`='{v}'" for k, v in before_values.items())
                after_set = ", ".join(f"`{k}`='{v}'" for k, v in after_values.items())
                after_where = " AND ".join(f"`{k}`='{v}'" for k, v in after_values.items())

                # Rollback SQL: restore the before-image, matching on the after-image
                rollback_sql = (
                    f"/* PY-ROLLBACK */ UPDATE {schema}.{table} "
                    f"SET {before_set} WHERE {after_where};"
                )
                msg["rollback_sql"] = rollback_sql

                # Forward SQL (the original statement): replay the change
                forward_sql = (
                    f"/* PY-FORWARD */ UPDATE {schema}.{table} "
                    f"SET {after_set} WHERE {before_where};"
                )
                msg["forward_sql"] = forward_sql

            if event_type == "DELETE":
                values = row["values"]
                set_clause = ", ".join(f"`{k}`='{v}'" for k, v in values.items())
                where_clause = " AND ".join(f"`{k}`='{v}'" for k, v in values.items())

                # Forward SQL (the original statement)
                forward_sql = f"/* PY-FORWARD */ DELETE FROM {schema}.{table} WHERE {where_clause};"
                msg["forward_sql"] = forward_sql

                # Rollback SQL: re-insert the deleted row
                rollback_sql = f"/* PY-ROLLBACK */ INSERT INTO {schema}.{table} SET {set_clause};"
                msg["rollback_sql"] = rollback_sql

            producer.send(
                "binlog_audit",
                value=json.dumps(msg).encode("utf-8"),
            )
            
            # Persist the binlog position so we can resume after a restart.
            # NOTE: SHOW MASTER STATUS returns the server's *current* position,
            # not that of the event just handled, so a restart may skip events
            # written in between; stream.log_file / stream.log_pos would be
            # more precise.
            start_pos = get_binlog_pos()
            with open("binlog.pos", "w") as f:
                f.write(start_pos[0] + "," + str(start_pos[1]))

    stream.close()


if __name__ == "__main__":
    main()
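
For reference, the message produced for a single-row UPDATE would look roughly like this (all field values are illustrative):

{
    "ts1": "2024-08-23 09:22:17",
    "ts2": 1724376137,
    "event_type": "UPDATE",
    "event_size": 327,
    "schema_name": "sbtest",
    "table_name": "sbtest1",
    "row_count": 1,
    "cluster": "DBA-TEST",
    "rollback_sql": "/* PY-ROLLBACK */ UPDATE sbtest.sbtest1 SET `id`='1', `k`='100' WHERE `id`='1' AND `k`='200';",
    "forward_sql": "/* PY-FORWARD */ UPDATE sbtest.sbtest1 SET `id`='1', `k`='200' WHERE `id`='1' AND `k`='100';"
}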

Note: to guarantee ordering, the Kafka topic has to use a single partition, but that limits processing throughput.

A better optimization:

Create the Kafka topic with multiple partitions, and modify the Python code above to use a custom partitioning strategy, e.g. routing by table name so that all binlog events for the same table land in the same partition; events for any given table then stay ordered. A minimal sketch follows.
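
A minimal sketch of that idea, relying on kafka-python's default behavior of hashing the message key to pick a partition (the partition count of 8 is arbitrary):

# Create the topic with multiple partitions, e.g.:
#   kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create \
#       --topic binlog_audit --partitions 8 --replication-factor 1
#
# Then key each message by schema.table in the producer.send call of the
# code above; kafka-python's default partitioner hashes the key, so all
# events for one table go to the same partition and stay ordered within it.
producer.send(
    "binlog_audit",
    key=f"{schema}.{table}".encode("utf-8"),
    value=json.dumps(msg).encode("utf-8"),
)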

The fully optimized code will follow in a later post.

Deploying clickhouse_sinker

The version used here is clickhouse_sinker_3.1.8.

The contents of binlog_audit.hjson are as follows:

{
    clickhouse: {
        hosts: [
            [
                127.0.0.1
            ]
        ]
        port: 9000
        db: default
        username: ""
        password: ""
        retryTimes: 0
    }
    kafka: {
        brokers: 127.0.0.1:9092
    }
    task: {
        name: binlog_audit
        topic: binlog_audit
        consumerGroup: clickhouse_sinker_binlog_audit
        earliest: false
        parser: json
        autoSchema: true
        tableName: binlog_audit
        excludeColumns: []
        bufferSize: 50000
        "dynamicSchema": {
            "enable": true,
            "maxDims": 1024,
            "whiteList": "^[0-9A-Za-z_] $",
            "blackList": "@"
        },
        flushInterval: 1,
    }
    logLevel: info
}
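
Since autoSchema is enabled, clickhouse_sinker derives the column list from an existing ClickHouse table, so default.binlog_audit must be created beforehand. A possible DDL matching the fields produced by the Python code (the column types and ORDER BY key are my assumptions):

CREATE TABLE default.binlog_audit
(
    ts1          DateTime,
    ts2          UInt64,
    event_type   LowCardinality(String),
    event_size   UInt64,
    schema_name  LowCardinality(String),
    table_name   LowCardinality(String),
    row_count    UInt32,
    cluster      LowCardinality(String),
    forward_sql  String,
    rollback_sql String
)
ENGINE = MergeTree
ORDER BY (schema_name, table_name, ts1);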

Start it in the foreground:

./clickhouse_sinker --local-cfg-file binlog_audit.hjson
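
For a long-running deployment, one option is to run it in the background (or under a process supervisor such as systemd), e.g.:

nohup ./clickhouse_sinker --local-cfg-file binlog_audit.hjson > sinker.log 2>&1 &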

Binlog details recorded in ClickHouse
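
With the sinker running, the captured events and their forward/rollback SQL can be inspected with an ordinary query, for example:

SELECT ts1, event_type, schema_name, table_name, forward_sql, rollback_sql
FROM default.binlog_audit
WHERE schema_name = 'sbtest'
ORDER BY ts1 DESC
LIMIT 10;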
