PostgreSQL 通过python 监控逻辑复制

2021-11-17 15:11:29 浏览数 (1)

本期是通过PYTHON 来对逻辑复制中的配置参数,publication 定义, 打印不适合进行逻辑复制的表,打印没有在使用的复制槽,另外包含当前发布端和接收端两边的LSN对比。

以下是代码,对于逻辑复制中主要的监控点有

1 是不是存在复制槽不使用的情况

2 是不是存在主库和从库之间的复制延迟(异步)

3 当前库是不是存在不适合进行逻辑复制的表

4 当前库是不是有设置发布

#!/usr/bin/python3 import os import sys import psycopg2 import re import subprocess #检测当前PG是否具备进行逻辑复制的参数配置 def parameter(): conn = None conn = psycopg2.connect(database="postgres",user="postgres",password="",host="localhost",port="5432") conn_sub = psycopg2.connect(database="postgres",user="admin",password="admin",host="192.168.198.101",port="5432") cur = conn.cursor() cur.execute("""show wal_level;""") rows = cur.fetchall() for row in rows: print("启用逻辑复制wal_level必须为logical") print("________________") print(row) cur = conn.cursor() cur.execute("""show max_worker_processes;""") rows = cur.fetchall() for row in rows: print("启用逻辑复制,请注意max_worker_process的数字") print("________________") print(row) cur = conn.cursor() cur.execute("""show max_replication_slots""") rows = cur.fetchall() for row in rows: print("启用逻辑复制,请注意复制槽的数字") print("________________") print(row) cur = conn.cursor() cur.execute("""show max_wal_senders;""") rows = cur.fetchall() for row in rows: print("启用逻辑复制,请注意最大max_wal_sender数字") print("________________") print(row) cur = conn.cursor() cur.execute("""show max_logical_replication_workers;""") rows = cur.fetchall() for row in rows: print("启用逻辑复制,请注意最大max_logical_replication_workers数字") print("________________") print(row)

cur = conn.cursor() cur.execute("""show max_sync_workers_per_subscription;""") rows = cur.fetchall() for row in rows: print("启用逻辑复制,请注意最大max_sync_workers_per_subscription数字") print("________________") print(row) cur = conn.cursor() cur.execute("""select pubname,puballtables,pubinsert,pubupdate,pubdelete,pubtruncate from pg_publication;""") rows = cur.fetchmany(100) print("当前库publication定义发布信息") print("-----------------------------") print("发布定义名 是否对全库表进行发布定义 追踪插入 追踪更新 追踪删除 追踪truncate") for row in rows: print(row)

print("----------------------------------------------") cur = conn.cursor() cur.execute("""select pubname,tablename from pg_publication_tables;""") rows = cur.fetchmany(100) print("当前库发布的数据表") print("------------------") print("发布定义名 发布表名") for row in rows: print(row) print("----------------------------------------------") cur = conn.cursor() cur.execute("""select relname,relhasindex from pg_catalog.pg_class as c inner join pg_namespace as n on (c.relnamespace = n.oid and n.nspname NOT IN ('information_schema', 'pg_catalog') and c.relkind='r') where c.relhasindex = 'f';""") rows = cur.fetchmany(100) print("当前库不适合建立逻辑复制表list") print("_______________________________") print("表名 无主键") for row in rows: print(row) print("----------------------------------------------") cur = conn.cursor() cur.execute("""SELECT slot_name,slot_type,active FROM pg_replication_slots where active= 'f';""") rows = cur.fetchmany(100) print("注意当前没有被使用的复制槽,确认不使用请立即删除") print("_______________________________") print("复制槽名 复制槽类型 目前不在使用") for row in rows: print(row) print("删除复制槽语句 select pg_drop_replication('复制槽名')") print("------------------------------------------------------") cur = conn.cursor() cur.execute("""select sent_lsn,replay_lsn from pg_stat_replication;""") rows = cur.fetchmany(100) print("主库lsn信息") print("_______________________________") print("发送LSN 回执LSN") for row in rows: print(row) print("------------------------------------------------------") cur = conn_sub.cursor() cur.execute("""select subname,received_lsn,latest_end_time::varchar(20) from pg_stat_subscription;""") rows = cur.fetchmany(100) print("查看目的端数据接收状态") print("-----------------------") print("接收端名 接收到的LSN 最后接收到LSN时间") for row in rows: print(row) conn.commit() conn_sub.commit() conn_sub.close conn.close if __name__ == "__main__": parameter()

下图为执行后的效果

下面有两个地方需要注意

1 设定subscription 的连接,需要在接收端的位置设置用户密码

2 可以将连接设置为及时输入的模式,会更方便,这里没有写死了。

另逻辑复制中最怕的是接收端数据出现问题,导致复制停止,目前需要通过日志来查询出现的问题。程序里面并未有及时分析日志的部分。

0 人点赞