统计PG中的sequence的使用情况

2024-03-05 14:35:22 浏览数 (2)

生产上,建议对pg的序列使用率做监控和告警。

下面的这个巡检sql 来自《红石PG公众号》,

具体sql内容如下:

代码语言:sql复制
SELECT
                    seq.relname AS sequence,
                    format_type(s.seqtypid, NULL) sequence_datatype,
                    CONCAT(tbl.relname, '.', att.attname) AS owned_by,
                    format_type(att.atttypid, atttypmod) AS column_datatype,
                    pg_sequence_last_value(seq.oid::regclass) AS last_sequence_value,
                    TO_CHAR((
                        CASE WHEN format_type(s.seqtypid, NULL) = 'smallint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 32767::float)
                        WHEN format_type(s.seqtypid, NULL) = 'integer' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 2147483647::float)
                        WHEN format_type(s.seqtypid, NULL) = 'bigint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 9223372036854775807::float)
                        END) * 100, 'fm9999999999999999999990D00%') AS sequence_percent,
                    TO_CHAR((
                        CASE WHEN format_type(att.atttypid, NULL) = 'smallint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 32767::float)
                        WHEN format_type(att.atttypid, NULL) = 'integer' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 2147483647::float)
                        WHEN format_type(att.atttypid, NULL) = 'bigint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 9223372036854775807::float)
                        END) * 100, 'fm9999999999999999999990D00%') AS column_percent
                FROM
                    pg_depend d
                    JOIN pg_class AS seq ON seq.relkind = 'S'
                        AND seq.oid = d.objid
                    JOIN pg_class AS tbl ON tbl.relkind = 'r'
                        AND tbl.oid = d.refobjid
                    JOIN pg_attribute AS att ON att.attrelid = d.refobjid
                        AND att.attnum = d.refobjsubid
                    JOIN pg_sequence s ON s.seqrelid = seq.oid
                WHERE
                    d.deptype = 'a'
                    AND d.classid = 'pg_class'::regclass::oid

需要注意的是,需要先set search_path 把各个schema都拼起来,不然可能遇到sql提示找不到某个sequence的情况。

有了上面的sql后,就可以用程序包装下,加到巡检平台里,大致代码如下:

代码语言:python代码运行次数:0复制


# coding=utf-8

import json
import os
import sys

import psycopg2
import requests
from dingtalkchatbot.chatbot import DingtalkChatbot
from func_timeout import func_set_timeout
from requests.auth import HTTPBasicAuth

project_path = os.path.abspath("..")
sys.path.append(project_path)
os.environ["DJANGO_SETTINGS_MODULE"] = "db_ops_platform.settings"

import django

django.setup()

from db_ops_platform import configs


@func_set_timeout(60)
def get_top_seq_usage():
    1、从平台接口,获取到需要执行巡检的pg的信息(地址、端口等)
    # res 就是获取到的pg的信息
    2、遍历每个实例的每个库去做检测并告警
    for i, item in enumerate(res):
        instance_id = item["instance_id"]
        instance_desc = item["instance_desc"]
        instance_addr = item["instance_addr"]
        instance_port = item["instance_port"]
        instance_cluster = item["instance_cluster"]
        instance_role = item["instance_role"]
        instance_engine = item["instance_engine"]
        instance_monit = item["instance_monit"]

        if (
            instance_engine != "PostgreSQL"
            or instance_monit != "ON"
            or instance_role != "Primary"
        ):
            continue

        pg_conn = psycopg2.connect(
            host=instance_addr,
            port=instance_port,
            user=configs.REMOTE_PG_USER,
            password=configs.REMOTE_PG_PASSWD,
            database="postgres",
        )

        pg_cursor = pg_conn.cursor()

        pg_cursor.execute(
            """
            select datname from pg_database where datname not in ('template1','template0') ;
            """
        )

        myresult = pg_cursor.fetchall()
        db_list = []
        for i in myresult:
            db_list.append(i[0])

        for db in db_list:

            pg_conn = psycopg2.connect(
                host=instance_addr,
                port=instance_port,
                user=configs.REMOTE_PG_USER,
                password=configs.REMOTE_PG_PASSWD,
                database=db,
            )
            pg_cursor = pg_conn.cursor()

            # 需要先把当前库下的searth_path设置为所有的schema,否则查询会提示部分sequence找不到
            pg_cursor.execute(
                """
                SELECT n.nspname AS "Name"FROM pg_catalog.pg_namespace n WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema';
                """
            )

            schema_res = pg_cursor.fetchall()
            schema_str = ""
            for ii in schema_res:
                schema_str  = "'"   ii[0]   "'"   ","

            pg_cursor.execute("set search_path="   schema_str[0:-1])
            seq_usage_sql = """
                SELECT
                    seq.relname AS sequence,
                    format_type(s.seqtypid, NULL) sequence_datatype,
                    CONCAT(tbl.relname, '.', att.attname) AS owned_by,
                    format_type(att.atttypid, atttypmod) AS column_datatype,
                    pg_sequence_last_value(seq.oid::regclass) AS last_sequence_value,
                    TO_CHAR((
                        CASE WHEN format_type(s.seqtypid, NULL) = 'smallint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 32767::float)
                        WHEN format_type(s.seqtypid, NULL) = 'integer' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 2147483647::float)
                        WHEN format_type(s.seqtypid, NULL) = 'bigint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 9223372036854775807::float)
                        END) * 100, 'fm9999999999999999999990D00%') AS sequence_percent,
                    TO_CHAR((
                        CASE WHEN format_type(att.atttypid, NULL) = 'smallint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 32767::float)
                        WHEN format_type(att.atttypid, NULL) = 'integer' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 2147483647::float)
                        WHEN format_type(att.atttypid, NULL) = 'bigint' THEN
                            (pg_sequence_last_value(seq.relname::regclass) / 9223372036854775807::float)
                        END) * 100, 'fm9999999999999999999990D00%') AS column_percent
                FROM
                    pg_depend d
                    JOIN pg_class AS seq ON seq.relkind = 'S'
                        AND seq.oid = d.objid
                    JOIN pg_class AS tbl ON tbl.relkind = 'r'
                        AND tbl.oid = d.refobjid
                    JOIN pg_attribute AS att ON att.attrelid = d.refobjid
                        AND att.attnum = d.refobjsubid
                    JOIN pg_sequence s ON s.seqrelid = seq.oid
                WHERE
                    d.deptype = 'a'
                    AND d.classid = 'pg_class'::regclass::oid
                """

            pg_cursor.execute(seq_usage_sql)
            res = pg_cursor.fetchall()

            for i in res:
                (
                    sequence,
                    sequence_datatype,
                    owned_by,
                    column_datatype,
                    last_sequence_value,
                    sequence_percent,
                    column_percent,
                ) = i
                if sequence_percent and float(sequence_percent.replace("%", "")) > 85.0:
                    print(f"warn ,{sequence}, {sequence_percent}")
                    content_full = (
                        "类型: PG sequence 使用率过高告警"
                          "nn"
                          "集群: "
                          str(instance_cluster)
                          "nn"
                          "实例名: "
                          instance_desc
                          "nn"
                          "实例ID: "
                          str(instance_id)
                          "nn"
                          "角色: "
                          instance_role
                          "nn"
                          "库名: "
                          str(db)
                          "nn"
                          "序列名: "
                          str(sequence)
                          "nn"
                          "序列类型: "
                          str(sequence_datatype)
                          "nn"
                          "序列的使用率: "
                          str(sequence_percent)
                    )

                    try:
                        msg = DingtalkChatbot(
                            webhook=configs.default_alerts_webhook,
                            secret=configs.default_dingding_alert_secret,
                            fail_notice=True,
                        )
                        msg.send_markdown(
                            title="生产PG 序列使用率过高告警",
                            text=str(content_full),
                            is_at_all=False,
                        )
                    except Exception as e:
                        print(str(e))

        pg_cursor.close()
        pg_conn.close()


if __name__ == "__main__":
    try:
        get_top_seq_usage()
    except Exception as e:
        print(str(e))
        # 发送钉钉告警
        msg = DingtalkChatbot(
            webhook=configs.default_alerts_webhook,
            secret=configs.default_dingding_alert_secret,
            fail_notice=True,
        )
        msg.send_markdown(
            title="db_ops_platform脚本执行报错",
            text=str(os.path.basename(__file__)   "报错原因:"   str(e)),
            is_at_all=False,
        )

0 人点赞