pt-deadlock-logger 用起来不太方便,主要是和我们的平台结合不够好,因此参考它的逻辑,我们使用python重新写了个类似功能。
大致逻辑:
1、获取生产环境需要监控巡检的MySQL实例信息
2、循环连接上去,执行check_deadlock函数,在里面判断这个死锁是否已经存在(根据时间戳作为key名,通过redis key判断,如果不存在则发邮件,并且把这个时间戳作为key存到redis里面)。
具体代码如下:
代码语言:txt复制# coding=utf-8
import json
import os
import re
import sys
import time
import redis
project_path = os.path.abspath('./.')
sys.path.append(project_path)
os.environ['DJANGO_SETTINGS_MODULE'] = 'dba-platform.settings'
import django
django.setup()
from dba-platform import configs
from mydb import tasks
from utils import db_conn
def check_deadlock(instance_cluster, instance_addr, instance_port, file_name):
with open(file_name, 'r', encoding='utf-8') as f:
text = str(f.readlines())
start_index = text.find("LATEST DETECTED DEADLOCK")
end_index = text.find("TRANSACTIONS")
extracted_text = text[start_index:end_index].strip()
res = extracted_text.replace(r"n", 'rn').replace("', '", '')
# 找到类似这种 2020-12-25 13:45:56 0x7fc0d7cf1700 截取出来把提取出来的时间作为redis的key,防止重复告警
pattern = r'bd{4}-d{2}-d{2} d{2}:d{2}:d{2}b'
match = re.search(pattern, res)
if match:
first_date_time = match.group()
# 把MySQL实例id和first_date_time拼起来,作为redis的key名称,可以不用设置过期时间,因为deadlock的情况会很少
redis_key = str(instance_addr) "-" str(instance_port) "-" first_date_time.replace(' ', '-').replace(
':', '-')
r = redis.Redis(
host=configs.REDIS_HOST,
port=configs.REDIS_PORT,
db=configs.REDIS_DB,
password=configs.REDIS_PASS,
socket_connect_timeout=10,
decode_responses=True,
)
if r.get(redis_key):
print("key已存在,说明这个deadlock不是最新的,本次不会邮件通知")
else:
r.set(redis_key, "1")
# 异步发送邮件
tasks.celery_sendemail.delay(
recipients=configs.dba_group,
cc=None,
mail_content=dict(
{
"subject": "dba-platform-生产-死锁告警-"
str(instance_cluster) "-" str(instance_addr) "-" str(instance_port),
"content_text": res,
}
),
)
def collect():
# 假设res是已经从cmdb或者dba-platform获取到的db_instance清单
for i, item in enumerate(res):
instance_cluster = item['instance_cluster']
instance_id = item['instance_id']
instance_addr = item['instance_addr']
instance_port = item['instance_port']
instance_engine = item['instance_engine']
instance_monit = item['instance_monit']
instance_env = item['instance_env']
if instance_engine != 'MySQL' or instance_monit != 'ON' or instance_env != '生产':
continue
# 连接远程RDS数据库采集信息
print(instance_addr)
try:
mydb = db_conn.conn_db(
host=instance_addr,
port=instance_port,
user=configs.REMOTE_MYSQL_DB_USER,
passwd=configs.REMOTE_MYSQL_DB_PASSWORD,
)
mycursor = mydb.cursor()
except Exception as e:
print(str(e))
continue
mycursor.execute('show engine innodb status;')
myresult = mycursor.fetchall()
now = int(time.time())
timeStruct = time.localtime(now)
strTime = time.strftime("%Y%m%d-%H%M", timeStruct)
file_name = configs.engine_base_path str(instance_id) '_' strTime '.log'
for x in myresult:
# 去掉里面的n,拆成多行显式
xx = str(x[2]).replace('\n', 'n')
with open(file_name, 'a ') as f:
f.write(str(xx))
check_deadlock(instance_cluster, instance_addr, instance_port, file_name)
mycursor.close()
mydb.close()
if __name__ == '__main__':
collect()
GreatSQL的这篇讲的也比较好,推荐阅读。