生产上,建议对pg的序列使用率做监控和告警。
下面的这个巡检sql 来自《红石PG公众号》,
具体sql内容如下:
-- Usage report for sequences that have an auto dependency (deptype = 'a') on a
-- table column: how far the last value has advanced through the range of the
-- sequence's own data type and through the range of the owning column's type.
-- Sequences are resolved by OID (seq.oid::regclass) instead of by bare name, so
-- the result does not depend on the session's search_path and same-named
-- sequences living in different schemas cannot be mixed up.
SELECT
    seq.relname AS sequence,
    format_type(s.seqtypid, NULL) sequence_datatype,
    CONCAT(tbl.relname, '.', att.attname) AS owned_by,
    format_type(att.atttypid, att.atttypmod) AS column_datatype,
    pg_sequence_last_value(seq.oid::regclass) AS last_sequence_value,
    -- Percentage of the sequence type's positive range already consumed.
    TO_CHAR((
        CASE WHEN format_type(s.seqtypid, NULL) = 'smallint' THEN
            (pg_sequence_last_value(seq.oid::regclass) / 32767::float)
        WHEN format_type(s.seqtypid, NULL) = 'integer' THEN
            (pg_sequence_last_value(seq.oid::regclass) / 2147483647::float)
        WHEN format_type(s.seqtypid, NULL) = 'bigint' THEN
            (pg_sequence_last_value(seq.oid::regclass) / 9223372036854775807::float)
        END) * 100, 'fm9999999999999999999990D00%') AS sequence_percent,
    -- Same ratio, measured against the owning column's type instead.
    TO_CHAR((
        CASE WHEN format_type(att.atttypid, NULL) = 'smallint' THEN
            (pg_sequence_last_value(seq.oid::regclass) / 32767::float)
        WHEN format_type(att.atttypid, NULL) = 'integer' THEN
            (pg_sequence_last_value(seq.oid::regclass) / 2147483647::float)
        WHEN format_type(att.atttypid, NULL) = 'bigint' THEN
            (pg_sequence_last_value(seq.oid::regclass) / 9223372036854775807::float)
        END) * 100, 'fm9999999999999999999990D00%') AS column_percent
FROM
    pg_depend d
    JOIN pg_class AS seq ON seq.relkind = 'S'
        AND seq.oid = d.objid
    JOIN pg_class AS tbl ON tbl.relkind = 'r'
        AND tbl.oid = d.refobjid
    JOIN pg_attribute AS att ON att.attrelid = d.refobjid
        AND att.attnum = d.refobjsubid
    JOIN pg_sequence s ON s.seqrelid = seq.oid
WHERE
    d.deptype = 'a'
    AND d.classid = 'pg_class'::regclass::oid;
需要注意的是:如果 sql 里是按序列名(seq.relname::regclass)来解析序列的,就需要先 set search_path 把各个 schema 都拼起来,不然可能遇到 sql 提示找不到某个 sequence 的情况;按 oid(seq.oid::regclass)解析则不受 search_path 影响。
有了上面的sql后,就可以用程序包装下,加到巡检平台里,大致代码如下:
代码语言:python
# coding=utf-8
import json
import os
import sys
import psycopg2
import requests
from dingtalkchatbot.chatbot import DingtalkChatbot
from func_timeout import func_set_timeout
from requests.auth import HTTPBasicAuth
# Append the parent of the current working directory to sys.path, presumably so
# that the db_ops_platform package can be imported below.
# NOTE(review): abspath("..") is resolved against the process's working
# directory, not this file's location — this only works when the script is
# launched from the expected directory; confirm.
project_path = os.path.abspath("..")
sys.path.append(project_path)
# The settings module has to be named before django.setup() is called.
os.environ["DJANGO_SETTINGS_MODULE"] = "db_ops_platform.settings"
import django
django.setup()
# Imported after the sys.path change and django.setup() above; configs supplies
# the PostgreSQL credentials and DingTalk webhook settings used further down.
from db_ops_platform import configs
# Alert once a sequence has consumed more than this percentage of the range of
# its own data type.
_SEQ_USAGE_ALERT_PERCENT = 85.0

# Databases worth connecting to: the templates are skipped, and so is anything
# that refuses connections (connecting to it would abort the whole run).
_LIST_DATABASES_SQL = """
    SELECT datname
    FROM pg_database
    WHERE datallowconn
      AND datname NOT IN ('template1', 'template0');
"""

# Per-database usage report. Sequences are resolved by OID
# (seq.oid::regclass) rather than by bare name, so the query works without
# touching search_path and cannot confuse same-named sequences that live in
# different schemas.
_SEQ_USAGE_SQL = """
    SELECT
        seq.relname AS sequence,
        format_type(s.seqtypid, NULL) sequence_datatype,
        CONCAT(tbl.relname, '.', att.attname) AS owned_by,
        format_type(att.atttypid, att.atttypmod) AS column_datatype,
        pg_sequence_last_value(seq.oid::regclass) AS last_sequence_value,
        TO_CHAR((
            CASE WHEN format_type(s.seqtypid, NULL) = 'smallint' THEN
                (pg_sequence_last_value(seq.oid::regclass) / 32767::float)
            WHEN format_type(s.seqtypid, NULL) = 'integer' THEN
                (pg_sequence_last_value(seq.oid::regclass) / 2147483647::float)
            WHEN format_type(s.seqtypid, NULL) = 'bigint' THEN
                (pg_sequence_last_value(seq.oid::regclass) / 9223372036854775807::float)
            END) * 100, 'fm9999999999999999999990D00%') AS sequence_percent,
        TO_CHAR((
            CASE WHEN format_type(att.atttypid, NULL) = 'smallint' THEN
                (pg_sequence_last_value(seq.oid::regclass) / 32767::float)
            WHEN format_type(att.atttypid, NULL) = 'integer' THEN
                (pg_sequence_last_value(seq.oid::regclass) / 2147483647::float)
            WHEN format_type(att.atttypid, NULL) = 'bigint' THEN
                (pg_sequence_last_value(seq.oid::regclass) / 9223372036854775807::float)
            END) * 100, 'fm9999999999999999999990D00%') AS column_percent
    FROM
        pg_depend d
        JOIN pg_class AS seq ON seq.relkind = 'S'
            AND seq.oid = d.objid
        JOIN pg_class AS tbl ON tbl.relkind = 'r'
            AND tbl.oid = d.refobjid
        JOIN pg_attribute AS att ON att.attrelid = d.refobjid
            AND att.attnum = d.refobjsubid
        JOIN pg_sequence s ON s.seqrelid = seq.oid
    WHERE
        d.deptype = 'a'
        AND d.classid = 'pg_class'::regclass::oid
"""


def _fetch_rows(host, port, database, sql):
    """Run one query on a short-lived connection and return all of its rows.

    The connection is closed even when connecting succeeds but the query
    fails, so a broken database cannot leak a session on the server.
    """
    conn = psycopg2.connect(
        host=host,
        port=port,
        user=configs.REMOTE_PG_USER,
        password=configs.REMOTE_PG_PASSWD,
        database=database,
    )
    try:
        with conn.cursor() as cursor:
            # No parameters are passed: the query text contains a literal '%'
            # in the TO_CHAR format mask and is meant to be sent as written.
            cursor.execute(sql)
            return cursor.fetchall()
    finally:
        conn.close()


def _send_seq_usage_alert(item, db, sequence, sequence_datatype, sequence_percent):
    """Send one DingTalk markdown alert for a sequence that is running out."""
    # Blank lines between the fields: each one becomes its own markdown paragraph.
    content_full = "\n\n".join(
        [
            "类型: PG sequence 使用率过高告警",
            f"集群: {item['instance_cluster']}",
            f"实例名: {item['instance_desc']}",
            f"实例ID: {item['instance_id']}",
            f"角色: {item['instance_role']}",
            f"库名: {db}",
            f"序列名: {sequence}",
            f"序列类型: {sequence_datatype}",
            f"序列的使用率: {sequence_percent}",
        ]
    )
    # Best effort: a failed notification is reported on stdout and must not
    # stop the inspection of the remaining sequences.
    try:
        msg = DingtalkChatbot(
            webhook=configs.default_alerts_webhook,
            secret=configs.default_dingding_alert_secret,
            fail_notice=True,
        )
        msg.send_markdown(
            title="生产PG 序列使用率过高告警",
            text=content_full,
            is_at_all=False,
        )
    except Exception as e:
        print(str(e))


@func_set_timeout(60)
def get_top_seq_usage():
    """Check sequence usage on every monitored primary PostgreSQL instance.

    For each eligible instance, every connectable database is inspected and a
    DingTalk alert is sent for each sequence whose usage of its own data type's
    range exceeds ``_SEQ_USAGE_ALERT_PERCENT``. The whole run is limited to 60
    seconds by the decorator.

    Raises:
        Any connection or query error from psycopg2 is propagated to the
        caller; only failures while sending an alert are swallowed.
    """
    # 1. Fetch from the platform API the PostgreSQL instances to inspect
    #    (address, port, ...). That call is elided in this write-up and has to
    #    be filled in here; `res` is the resulting list of instance dicts.
    # 2. Walk every database of every instance, check it and alert.
    for item in res:
        # Read every field up front so a malformed record fails immediately.
        instance_id = item["instance_id"]
        instance_desc = item["instance_desc"]
        instance_addr = item["instance_addr"]
        instance_port = item["instance_port"]
        instance_cluster = item["instance_cluster"]
        instance_role = item["instance_role"]
        instance_engine = item["instance_engine"]
        instance_monit = item["instance_monit"]

        # Only monitored PostgreSQL primaries are inspected.
        if (
            instance_engine != "PostgreSQL"
            or instance_monit != "ON"
            or instance_role != "Primary"
        ):
            continue

        db_rows = _fetch_rows(
            instance_addr, instance_port, "postgres", _LIST_DATABASES_SQL
        )
        for (db,) in db_rows:
            usage_rows = _fetch_rows(instance_addr, instance_port, db, _SEQ_USAGE_SQL)
            for (
                sequence,
                sequence_datatype,
                owned_by,
                column_datatype,
                last_sequence_value,
                sequence_percent,
                column_percent,
            ) in usage_rows:
                # sequence_percent is NULL (None) when the sequence has no last
                # value to report; such sequences are skipped.
                if not sequence_percent:
                    continue
                if float(sequence_percent.replace("%", "")) > _SEQ_USAGE_ALERT_PERCENT:
                    print(f"warn ,{sequence}, {sequence_percent}")
                    _send_seq_usage_alert(
                        item, db, sequence, sequence_datatype, sequence_percent
                    )
if __name__ == "__main__":
    # FunctionTimedOut (raised by the @func_set_timeout decorator) derives from
    # BaseException, so "except Exception" alone would let a timeout escape
    # without any alert being sent.
    from func_timeout import FunctionTimedOut

    try:
        get_top_seq_usage()
    except (Exception, FunctionTimedOut) as e:
        print(str(e))
        # Send a DingTalk alert so a failed or timed-out run does not go unnoticed.
        msg = DingtalkChatbot(
            webhook=configs.default_alerts_webhook,
            secret=configs.default_dingding_alert_secret,
            fail_notice=True,
        )
        msg.send_markdown(
            title="db_ops_platform脚本执行报错",
            text=str(os.path.basename(__file__) + "报错原因:" + str(e)),
            is_at_all=False,
        )