手把手教你搭建一个Python数据质量监控系统

2022-04-08 12:40:38 浏览数 (1)

大家好,我是Brook!

数据应用过程中,数据源的准确性和有效性是数据分析的基础。根据实际业务逻辑定义数据校准指标体系,搭建数据中心的质量监控面板及时发现数据异常,从而实现保证数据质量的目的。

那如何实现一个数据质量监控工具?

其实开发一个监控面板并不难,首先确定所关注的指标,定义量化,然后建立一个定时任务连接生产数据库,通过业务功能处理函数将计算指标返回汇总,最后将结果映射到模板网页上展示。

背景:本文主要对楼宇监测设备的采集数据包是否实时上传进行报警提示

具体思路:

一、工具类tool文件

主要存放数据库连接类和业务处理函数模块(比如邮件发送类、连续掉线模块类等)

此处用到的数据库涉及到sql server 、oracel及mysql。

其中sql server为存储原始数据层,oracel为存储计算解析结果层,mysql则为本次报警提示记录存储层。

二、主函数模块

1、建立中间库

建立报警日志记录中间存储库

注:可以根据业务特点建立监控的指标报表

2、报警功能

1)业务指标监控函数(包括数据库连接、数据处理、数据存储 )

2)数据提醒模块(此处采用常规的邮件提醒,也可以采用短信通知,微信网页登录知道的可后台留言指导

3、定时任务,固定时刻对生产数据库检查一次。

一、工具类tool文件夹

数据库连接类模块

1、Python操作Oracle数据库--参考手把手教你搭建一个Python连接数据库快速取数工具

2、Python操作sql server数据库

通过使用pymssql库操作SqlServer连接

代码语言:javascript复制
#建立基本连接
HOST = '127.0.0.1'
USER = 'USERNAME'
PW = 'PASSWORD'
DB = 'DBNAME'
conn = pymssql.connect(host=HOST, user=USER, password=PW, database=DB)
cursor = conn.cursor()

假设T_Data数据表为业务原始表,场景:接受物联网设备远程传输数据,通过以下语句执行获取中当前执行最新时刻的收到的所有数据(备注:该表仅存储近1天的数据,原始数据会迁移到历史库中存储,数据库分库、分表后期再详细介绍)

代码语言:javascript复制
#执行sql语句
sql = """select * from T_Data
        where F_CreateTime > convert(varchar(100),GETDATE(),23)
        """
cursor.execute(sql)  # 执行查询语句,选择表中所有数据
result = cursor.fetchall()  # 获取所有记录

3、Python操作mysql数据库--Data_Update_mysql.py

以下创建连接mysql数据库的连接类MYSQL_DB。主要提供函数方法如下:

代码语言:javascript复制
import pandas as pd
import pymysql

class MYSQL_DB(object):
    def __init__(self):
        self.host = "localhost"
        self.database = "mydb"
        self.user = "root"
        self.password = "root"
        self.port = 3306
        self.charset = "utf8"

    def db_connection(self):
        return pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
                port=self.port,
                charset=self.charset)
        # # #方法2
        # self.engine = create_engine("mysql pymysql://root:root@127.0.0.1:3306/mydb?charset=utf8")

    # 插入数据库
    def insert_table_to_sql(self, sql, valuelist):
        conn = self.db_connection()
        cur = conn.cursor()
        try:
            cur.executemany(sql, valuelist)
            conn.commit()
            print("插入成功!")
        except Exception as e:
            conn.rollback()
            print("insert with error", e)
        finally:
            cur.close()
            conn.close()
    # 查询数据库
    def select_table_by_sql(self, sql):
        try:
            conn = self.db_connection()
            cur = conn.cursor()
            cur.execute(sql)
            data = cur.fetchall()
            columnDes = cur.description  # 获取连接对象的描述信息
            columnNames = [columnDes[i][0] for i in range(len(columnDes))]
            df = pd.DataFrame([list(i) for i in data], columns=columnNames)
            cur.close()
            conn.close()
            return df
        except Exception as e:
            data = ("error with sql", sql, e)
            return data
    #增删改操作
    def Execute_sql(self, sql):
        conn = self.db_connection()
        cur = conn.cursor()
        try:
            cur.execute(sql)
            conn.commit()
        except BaseException:
            conn.rollback()
            print("execute sql with error")
        finally:
            cur.close()
            conn.close()

业务处理函数模块

1、邮件发送类--sender_mail.py

dir_=r".临时文件位置",这里dir_为临时文件存储位置,

实际上没有必要通过该临时文件转发,因为已经考虑建立报警日志储存库,直接通过函数返回结果传参即可。这里为保持本地备份待用,可定时清空超过一定时间的日志文件即可,下次可以把这个小功能也分享一下。这里生成临时文件其实还有一个目的是分享watchdog模块监控指定目录下是否有指定类型文件产生,从而触发程序执行,这个实时转发模块也不错。

代码语言:javascript复制
import smtplib
import time
from email.header   import  Header
from email.mime.multipart import MIMEMultipart
from email.mime.text  import MIMEText
from email.mime.application  import MIMEApplication
from datetime import datetime

class Sender_mail:
    def __init__(self,dir_=None):
        self.dir_=dir_

    #群发邮件模板
    def sender_mail(self,dir_):

        sender = 'xxxxxx@qq.com'  #填你的qq邮箱
        #创建对象
        smt_p = smtplib.SMTP()
        #设置smtp服务器
        smt_p.connect(host='smtp.qq.com', port=25)
        #在qq邮箱设置开启SMTP服务并复制授权码
        password="xxxxxx"
        #进行邮箱登录一次,填写你本人的邮箱
        smt_p.login(sender,password)

        count_num = 1
        #使用for循环来进行群发邮件
        for i in ['xxxx@qq.com', "xxxxxx@qq.com"]:
            #列表中邮箱格式不正确时,在发邮件的时候会出现异常报错,捕获到这些异常就跳过
            try:

            #邮件设置
                msg = MIMEMultipart()
                msg['From'] = "数据中心"
            #收件人
                msg['To'] = i
                #抄送
                # msg['Cc'] = 'xxx@qq.com'
            #主题名称
                msg['subject'] = Header('在线反馈表', 'utf-8')
            #附件 —附加发送excel
                msg.attach(MIMEText('您好,' '掉线清单!请查收','plain', 'utf-8'))
                now_time = datetime.now().date()
                xlsxpart=MIMEApplication(open(dir_ '连续掉线情况反馈表{0}.xlsx'.format(now_time),'rb').read())
                xlsxpart.add_header('Content-Disposition','attachment',filename='设备连续掉线情况反馈表{0}.xlsx'.format(now_time))
                msg.attach(xlsxpart)
           
            #word、图片格式如下
                # message_docx = MIMEText(open(dir_ 'test.docx', 'rb').read(), 'base64', 'utf8')
                # message_docx.add_header('crontent-disposition', 'attachment', filename='test.docx')
                # msg.attach(message_docx)
                # message_image = MIMEText(open(dir_ 'test.jpg', 'rb').read(), 'base64', 'utf8')
                # message_image.add_header('content-disposition', 'attachment', filename='test.jpg')
                # msg.attach(message_image)

            #发送邮件
                smt_p.sendmail(sender,i,msg.as_string())
            #sleep10秒避免发送频率过快,可能被判定垃圾邮件。
                time.sleep(10)
                print('第%d次发送给%s' % (count_num,i))
                count_num = count_num   1

            except Exception as e:
                    #打印出来发送第几次的时候,邮箱出问题,一个邮箱最多不超过300个发送对象
                print('第%d次给%s发送邮件异常' % (count_num,i))
                continue
        smt_p.quit()

    #定制模块
    def sender_mail_1():
        pass

代码语言:javascript复制

2、连续掉线模块类--Continuous_offline.py

具体函数代码参考以往推文--Python判断连续时间序列范围并分组应用

这里封装为类模块,代码结构示意如下

代码语言:javascript复制
import pandas as pd
from itertools import groupby
from datetime import datetime

class Continuous_offine(object):
    def __init__(self,df=None,now_time=None):
        self.df = df
        self.now_time = datetime.now().date()

    #日期-天数转换函数
    def which_day(self,x):
        .......
        return whichday

    #连续掉线时间范围及天数处理函数
    def data_preprocess_dactory(self,lst,k_v,BUILD_ID):
        result1 = []
        result2 = []
        for k, g in groupby(enumerate(lst), lambda x: x[1] - x[0]):
            l1 = [k_v.get(j).strftime('%Y-%m-%d') for i, j in g]  # 连续时间的列表
            if len(l1) > 1:
                scop = str(min(l1))   '-'   str(max(l1))  # 连续时间范围用"-"连接
                result1.append(scop)
                result2.append(len(l1))  #连续天数
            else:
                scop = l1[0]
                result1.append(scop)
                result2.append(len(l1))   #连续天数
        df = pd.DataFrame({'时间': result1, '连续掉线天数': result2})

        return df.reindex(columns=["建筑编号", "时间", "连续掉线天数"], fill_value="{0}".format(BUILD_ID))

    def main_process(self,df):
        df1=pd.DataFrame(df[["BUILD_ID","BUILD_NAME","OFF_TIME"]])
        id_name =df1.set_index("BUILD_ID")["BUILD_NAME"].to_dict()  #ID-名称映射字典
        Build_list=df1.BUILD_ID.unique().tolist()

        data_list = []
        for k in range(len(Build_list)):
            df2=df1[df1.BUILD_ID=="{0}".format(Build_list[k])].copy()

            df2["OFF_TIME"]=pd.to_datetime(df2['OFF_TIME'])
            df2["辅助列-天数"]=df2["OFF_TIME"].map(lambda x:self.which_day(x))

            lst = df2["辅助列-天数"].tolist()   # 连续数字

            k_v = df2.set_index("辅助列-天数")["OFF_TIME"].to_dict()  #辅助列-天数映射字典

            df3=self.data_preprocess_dactory(lst,k_v,Build_list[k])

            df3.insert(1,'建筑名称',df3["建筑编号"].map(lambda x:id_name.get(x)))  # 指定第2列插入建筑名称

            data_list.append(df3)

        res = pd.concat(data_list, axis=0, ignore_index=True,sort=False)
        print(res)

        res["max_连续掉线天数"]=res.groupby("建筑编号")["连续掉线天数"].transform('max')
        res1=res[res.连续掉线天数==res.max_连续掉线天数]

        return res

二、数据提取主函数模块

导入模块

代码语言:javascript复制
#encoding=utf-8
import pymssql
import pandas as pd
from datetime import datetime

import time
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from tools.sender_mail import Sender_mail
from tools.Data_Update_mysql import MYSQL_DB
from tools.Continuous_offline import Continuous_offine

1、建立中间库

在mysql数据库中建立报警信息记录表

代码语言:javascript复制
sql="""CREATE TABLE IF NOT EXISTS Offline_building_history_new(
     ID int(8) not null auto_increment COMMENT '序号',
     OFF_TIME date not null COMMENT '掉线记录时间',
     BUILD_ID VARCHAR (40) NOT NULL COMMENT '建筑编号',
     BUILD_NAME VARCHAR (100) NOT NULL COMMENT '建筑名称',
     BUILD_FUNCTION VARCHAR (100) NOT NULL COMMENT '建筑功能',
     Access_time VARCHAR (20) NOT NULL COMMENT '接入时间',
     primary key(ID)
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '建筑掉线历史记录数据信息库';
"""
engine.execute(sql)

2、报警功能函数

1)业务指标监控函数(包括数据库连接、数据处理、数据存储 )

处理流程:

代码语言:javascript复制
now_time = datetime.now().date()
#查询连续掉线天数
def continuous_offline_day(now_time):

    db = MYSQL_DB()  # 实例化一个对象

    sql_off_new = """SELECT * FROM offline_history_new where
    DATE_SUB(CURDATE(), INTERVAL 30 DAY) <= date(OFF_TIME)"""  #查询最近30天内数据

    data = db.select_table_by_sql(sql_off_new)

    Continuous_offine_new = Continuous_offine()

    # 连接数据库传入dataframe数据表
    res=Continuous_offine_new.main_process(data)
    
    res["掉线频次"] = res.groupby("建筑编号")["建筑名称"].transform('count')
    res=res[res['时间'].str.contains('{0}'.format(now_time))]

    res.to_excel(dir_ "连续掉线情况反馈表{0}.xlsx".format(now_time),
        index=False)



def task1():
    # 打开数据库连接
    db = pymssql.connect(
        "IP",
        "USERNAME",
        "PASSWORD",
        "mydatabasename-1",
        charset="utf8")
    if db:
        print("连接成功!")
    cursor = db.cursor()

    sql = """select * from T_Data
        where F_CreateTime > convert(varchar(100),GETDATE(),23)
        """

    cursor.execute(sql)  # 执行查询语句,选择表中所有数据

    result = cursor.fetchall()  # 获取所有记录

    print("导出成功!")

    #业务数据处理流程
    # ---这里pass不作介绍,涉及到一些后处理,这里根据业务而定
    # 处理输出数据
    df_1=Data_process(df = pd.DataFrame(
        result,
        columns=[
            "F_UBuildID",
            "F_DaqDatetime",
            "F_DaqData",
            "F_CreateTime"]))

    # 在指定位置添加列
    new_colunms_list = ["序号", "掉线记录时间", "建筑编号", "建筑名称", "建筑功能", "接入时间"]

    df_new1 = df_1.reindex(columns=new_colunms_list, fill_value=now_time)   #now_time设置为全局变量

    data_t = df_new1[df_new1.columns[1:]]

    data_T_new = data_t.astype(str)

    data_result_tuples_new = [tuple(i) for i in data_T_new.values]

    # 插入数据库
    db = MYSQL_DB()  # 实例化一个对象

    sql_new = """ insert into offline_history_new(OFF_TIME,BUILD_ID,BUILD_NAME,BUILD_FUNCTION,Access_time) values (%s,%s,%s,%s,%s)"""

    # 插入数据库
    db.insert_table_to_sql(sql_new, data_result_tuples_new)

    # 连续掉线天数统计
    continuous_offline_day(now_time)

    print('Tick!The time is:%s' % datetime.now())

2)邮件提醒模块

代码语言:javascript复制
#发送给运维方
def task2():
    #发送邮件
    #运维方
    Sender_mail.sender_mail()
    time.sleep(3)
    #其他
    Sender_mail.sender_mail_1()

3、定时任务调度

这里用到了定时任务模块APScheduler,提供基于日期、固定时间间隔及crontab类型的任务。

代码语言:javascript复制
if __name__ =='__main__':
    scheduler=BlockingScheduler()
    # scheduler.add_job(tick,'interval',seconds=3)#触发器还有date,corn,其中date按特定时间点触发,cron则按固定时间间隔触发
    scheduler.add_job(task1, 'cron',hour=13,minute=58)
    scheduler.add_job(task2, 'cron', hour=14, minute=0)
    #minute='*/3'表示每五分钟执行一次;hour=‘19-21’,minute='23'  表 示19:23、20:23、21:23各执行一次任务

    print('Press Ctrl {0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        scheduler.start()
    except (KeyboardInterrupt,SystemExit):
        pass

如果进一步对oracle业务数据库解析数据分析,可掌握具体监测设备的运行状态比如特征分析、异常分析等。其中可能会存在数据解析不完全的问题,由于数据解析批处理遇到异常数据包导致进程阻塞,从而任务超时未解析成功造成数据缺失,通过监控校对数据的一致性来及时发现问题并制定重算任务。

到此整个数据库数据质量监控工具开发流程介绍完毕,其他功能函数可自行拓展。按照业务逻辑开始动手搭建属于自己的数据监管平台吧!

后期我将分享如何构建独立的python环境,部署成Web应用,通过网站实时访问数据监控面板,时刻掌握楼宇监测设备数据连续上传情况,

0 人点赞