Background
If we capture the full binlog stream, it becomes much easier to trace events after the fact and to audit changes.
If we also generate the corresponding rollback SQL while recording the binlog, then when a rollback is needed we no longer have to post-process the binlog with tools such as my2sql, which speeds up data flashback in those special scenarios.
This post grew out of that idea.
Architecture
The overall idea: use python-replication to consume the MySQL binlog, process each event, write the result to Kafka, and then use clickhouse_sinker to consume the Kafka data and store the final result in ClickHouse.
Note:
In the diagram above, clickhouse_sinker can be replaced by a consumer you write yourself in Python, and ClickHouse can be replaced by a database such as Databend or StarRocks.
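For reference, here is a minimal sketch of what such a hand-written consumer could look like with kafka-python. It assumes the binlog_audit topic and the JSON message format produced by the script in the next section, and it only prints the records; a real replacement for clickhouse_sinker would also batch-insert them into the target database. The consumer group name is just an example:
# -*- coding: utf-8 -*-
# Minimal consumer sketch: read the binlog_audit topic and print each audit record.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "binlog_audit",
    bootstrap_servers="127.0.0.1:9092",
    group_id="binlog_audit_py_consumer",  # hypothetical consumer group name
    auto_offset_reset="latest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    record = message.value
    # Each record carries the forward/rollback SQL generated from one binlog row.
    print(record["schema_name"], record["table_name"], record["event_type"])
    print(record.get("forward_sql"), record.get("rollback_sql"))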
Code implementation
Note that the code below is only a bare-bones implementation and still contains quite a few hard-coded values.
# -*- coding: utf-8 -*-
"""
Mapping between binlogevent.event_type codes and event types:
WRITE_ROWS_EVENT_V2 = 30
UPDATE_ROWS_EVENT_V2 = 31
DELETE_ROWS_EVENT_V2 = 32
"""
import json
import datetime
from kafka import KafkaProducer
import mysql.connector
import logging
import sys
logging.basicConfig(
    level=logging.INFO,
    filename="binlog_audit.log",
    filemode="a",
    format="%(asctime)s - "
    "%(pathname)s[line:%(lineno)d] - "
    "%(levelname)s: %(message)s",
)
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)
MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "dts", "passwd": "123456"}
# Kafka producer configuration
bootstrap_servers = "127.0.0.1:9092"
producer_config = {
    "bootstrap_servers": bootstrap_servers,
    "batch_size": 16384,
    "linger_ms": 10,
    "acks": 1,
    "retries": 3,
}
producer = KafkaProducer(**producer_config)
def get_binlog_pos():
    # Fetch the current binlog position of the source instance
    connection = mysql.connector.connect(
        host="127.0.0.1", user="dts", password="123456"
    )
    res = []
    mycursor = connection.cursor()
    mycursor.execute("SHOW MASTER STATUS")
    result = mycursor.fetchone()
    log_file = result[0]
    log_pos = result[1]
    res.append(log_file)
    res.append(log_pos)
    connection.close()
    return res
def main():
    try:
        with open("binlog.pos", "r") as f:
            res = f.readlines()
        res1, res2 = res[0].split(",")
        logging.info(f"Loaded previous binlog position file, binlog file: {res1}, offset: {res2}")
        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS,
            ignore_decode_errors=True,
            resume_stream=True,
            server_id=311,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=True,
            log_file=res1,
            log_pos=int(res2),
        )
    # If reading binlog.pos fails, start consuming from the latest position
    except Exception as e:
        print(str(e))
        logging.info("No previous binlog position file found, reading from the latest position")
        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS,
            ignore_decode_errors=True,
            resume_stream=True,
            server_id=311,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=True,
        )
    for binlogevent in stream:
        ts1 = datetime.datetime.fromtimestamp(binlogevent.timestamp)
        ts2 = binlogevent.timestamp
        event_type_code = binlogevent.event_type  # this is a numeric code
        if int(event_type_code) == 30:
            event_type = "INSERT"
        elif int(event_type_code) == 31:
            event_type = "UPDATE"
        elif int(event_type_code) == 32:
            event_type = "DELETE"
        else:
            event_type = "UNSUPPORTED EVENT TYPE"
        event_size = binlogevent.event_size  # size of the binlog event
        schema = binlogevent.schema
        table = binlogevent.table
        # Only collect the databases we care about, to reduce the volume of data processed
        if schema not in ["sbtest", "percona"]:
            continue
        # rows is the number of rows touched by this binlog event (large transactions are
        # split into multiple events by python-replication, so this value is not exact)
        rows = len(binlogevent.rows)
        binlog_detail = binlogevent._RowsEvent__rows
        msg = dict()
        msg["ts1"] = str(ts1)
        msg["ts2"] = ts2
        msg["event_type"] = event_type
        msg["event_size"] = event_size
        msg["schema_name"] = schema
        msg["table_name"] = table
        msg["row_count"] = rows
        msg["cluster"] = "DBA-TEST"  # extra field that identifies which MySQL cluster this is
        for row in binlog_detail:
            if event_type == "INSERT":
                values = row["values"]
                # Forward SQL
                insert_sql = ""
                for ii in values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    insert_sql = insert_sql + kv + ","
                forward_sql = (
                    f"/* PY-FORWARD */ INSERT INTO {schema}.{table} SET {insert_sql}".rstrip(",")
                    + ";"
                )
                # print(f"Forward SQL --- {forward_sql}")
                msg["forward_sql"] = forward_sql
                # Rollback SQL
                delete_sql = ""
                for ii in values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    delete_sql = delete_sql + kv + " AND "
                rollback_sql = (
                    f"/* PY-ROLLBACK */ DELETE FROM {schema}.{table} WHERE {delete_sql}".rstrip(" AND ")
                    + ";"
                )
                # print(f"Rollback SQL --- {rollback_sql}")
                msg["rollback_sql"] = rollback_sql
if event_type == "UPDATE":
before_values = row["before_values"]
after_values = row["after_values"]
# 反向sql (回滚sql)
before_sql = ""
for ii in before_values.items():
kv = f"`{ii[0]}`='{ii[1]}'"
before_sql = before_sql kv ","
before_conditions = f"{before_sql}".rstrip(",")
after_sql = ""
for ii in after_values.items():
kv = f"`{ii[0]}`='{ii[1]}'"
after_sql = after_sql kv " AND "
after_conditions = f"{after_sql}".rstrip(",")
rollback_sql = (
f"/* PY-ROLLBACK */ UPDATE {schema}.{table} SET {before_conditions}"
" WHERE "
after_conditions.rstrip(" AND ")
";"
)
# print(rollback_sql)
msg["rollback_sql"] = rollback_sql
# 正向sql(原始sql)
before_sql = ""
for ii in before_values.items():
kv = f"`{ii[0]}`='{ii[1]}'"
before_sql = before_sql kv " AND "
before_conditions = f"{before_sql}".rstrip(",")
after_sql = ""
for ii in after_values.items():
kv = f"`{ii[0]}`='{ii[1]}'"
after_sql = after_sql kv ","
after_conditions = f"{after_sql}".rstrip(",")
forward_sql = (
f"/* PY-FORWARD */ UPDATE {schema}.{table} SET {after_conditions}"
" WHERE "
before_conditions.rstrip(" AND ")
";"
)
# print(forward_sql)
msg["forward_sql"] = forward_sql
if event_type == "DELETE":
values = row["values"]
# 正向sql(原始sql)
delete_sql = ""
for ii in values.items():
kv = f"`{ii[0]}`='{ii[1]}'"
delete_sql = delete_sql kv " AND "
forward_sql = (
f"/* PY-FORWARD */ DELETE FROM {schema}.{table} WHERE {delete_sql}".rstrip(
" AND "
)
";"
)
# print(forward_sql)
msg["forward_sql"] = forward_sql
# 反向sql(回滚sql)
insert_sql = ""
for ii in values.items():
kv = f"`{ii[0]}`='{ii[1]}'"
insert_sql = insert_sql kv ","
rollback_sql = (
f"/* PY-ROLLBACK */ INSERT INTO {schema}.{table} SET {insert_sql}".rstrip(
","
)
";"
)
# print(rollback_sql)
msg["rollback_sql"] = rollback_sql
            producer.send(
                "binlog_audit",
                value=json.dumps(msg).encode("utf-8"),
            )
        # Record the current binlog position (note: SHOW MASTER STATUS returns the
        # master's latest position, which may be slightly ahead of the event just processed)
        start_pos = get_binlog_pos()
        with open("binlog.pos", "w+") as f:
            f.write(str(start_pos[0]) + "," + str(start_pos[1]))
    stream.close()


if __name__ == "__main__":
    main()
Note: to guarantee ordering, the Kafka topic has to use a single partition, but that makes processing throughput fairly low.
A better way to optimize this:
Create the Kafka topic with multiple partitions, and modify the Python code above to use a custom partitioning strategy (for example, route by table, so that the binlog events of one table always land in the same partition; that keeps each table's binlog ordered).
The fully optimized code will come in a later post.
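In the meantime, here is a minimal sketch of the routing idea. It assumes the topic has already been created with multiple partitions and relies on kafka-python's default partitioner, which hashes the message key; the snippet is meant as a drop-in replacement for the producer.send() call inside the row loop of the script above:
# Sketch only: route messages by schema.table so per-table ordering is preserved.
# kafka-python's default partitioner hashes the key, so the same key always
# goes to the same partition.
partition_key = f"{schema}.{table}".encode("utf-8")
producer.send(
    "binlog_audit",
    key=partition_key,
    value=json.dumps(msg).encode("utf-8"),
)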
Deploying clickhouse_sinker
The version I'm using here is clickhouse_sinker_3.1.8.
The contents of binlog_audit.hjson are as follows:
{
  clickhouse: {
    hosts: [
      [
        127.0.0.1
      ]
    ]
    port: 9000
    db: default
    username: ""
    password: ""
    retryTimes: 0
  }
  kafka: {
    brokers: 127.0.0.1:9092
  }
  task: {
    name: binlog_audit
    topic: binlog_audit
    consumerGroup: clickhouse_sinker_binlog_audit
    earliest: false
    parser: json
    autoSchema: true
    tableName: binlog_audit
    excludeColumns: []
    bufferSize: 50000
    "dynamicSchema": {
      "enable": true,
      "maxDims": 1024,
      "whiteList": "^[0-9A-Za-z_]+$",
      "blackList": "@"
    },
    flushInterval: 1
  }
  logLevel: info
Start it in the foreground:
./clickhouse_sinker --local-cfg-file binlog_audit.hjson
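Once the sinker is running, a quick way to confirm that data is flowing end to end is to query the target table. Below is a minimal sketch using the clickhouse-driver package (my own choice here, not something this setup requires; any ClickHouse client works) against the default.binlog_audit table referenced in the config above:
# Sketch only: verify that audit rows are arriving in ClickHouse.
from clickhouse_driver import Client

client = Client(host="127.0.0.1", port=9000, database="default")
rows = client.execute(
    "SELECT event_type, count() AS cnt FROM binlog_audit GROUP BY event_type"
)
for event_type, cnt in rows:
    print(event_type, cnt)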