生产上,某些情况下,可能会出现异常sql大量查询数据库,占用大量的cpu或者磁盘IO,这种情况下需要适当的止损。
如果有proxy的话,一般可以在proxy层面通过sql指纹进行限流或者熔断(例如proxysql就可以对指定的sql指纹进行阻断)。
如果没有proxy,则可以考虑在数据库层面添加持续kill会话的脚本,
下面就是一个持续kill符合条件的sql会话的例子:
代码语言:txt复制import datetime
import time as t_time
import mysql.connector
def kill_sessions(instance_host, instance_port, user, passwd, kill_condition, interval, time_range):
try:
mydb = mysql.connector.connect(
host=instance_host,
port=instance_port,
user=user,
passwd=passwd,
ssl_disabled=True,
)
except Exception as e:
print(f"Error initializing connection: {e}")
start_time = t_time.time()
while True:
current_time = t_time.time()
elapsed_time = current_time - start_time # 计算已经过去的时间
# 如果已经超过time_range指定的秒数,则退出循环
if elapsed_time > time_range:
print("已运行超过指定的阈值,退出循环。")
break
try:
# 查询活动会话
sql = ("SELECT ID, USER, HOST, DB, COMMAND, TIME, STATE, INFO "
"FROM information_schema.processlist "
"WHERE 1=1 AND ID != CONNECTION_ID() and " kill_condition)
cursor = mydb.cursor()
cursor.execute(sql)
processes = cursor.fetchall()
for process in processes:
id, user, host, db, command, time, state, info = process
if True:
print(
f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, "
f"ID: {id},user: {user},host:{host},db: {db}, Command: {command},"
f"state: {state},Time: {time}s, Info: {info}")
try:
kill_query = f"KILL {id}"
cursor.execute(kill_query)
except Exception as e:
print(str(e))
except mysql.connector.Error as err:
print(f"Database error: {err}")
t_time.sleep(interval)
if __name__ == "__main__":
kill_sessions('192.168.31.181',
3306,
'dts',
'123456',
"time > 1 and info like '%select%sleep%' ",
0.5,
60)
"""
在celery里面的入参例子
对sql模糊匹配 select%t_order�sc 的,执行kill操作,每次检测间隔0.5秒,持续60秒
注意:
1、interval不能太低,information_schema.processlist表高频查询会导致性能问题
2、user需要配置成最高权限的账号(或者是慢查询对应的业务账号),否则可能出现kill失败,报错提示not thread owner
{
"instance_host": "192.168.31.181",
"instance_port": 3306,
"user": "dts",
"passwd": "dts",
"kill_condition": "time > 1 and info like '%select%t_order�sc%' ",
"interval": 0.5,
"time_range": 60
}
"""