大家好,我是Brook!
数据应用过程中,数据源的准确性和有效性是数据分析的基础。根据实际业务逻辑定义数据校准指标体系,搭建数据中心的质量监控面板及时发现数据异常,从而实现保证数据质量的目的。
那如何实现一个数据质量监控工具?
其实开发一个监控面板并不难,首先确定所关注的指标,定义量化,然后建立一个定时任务连接生产数据库,通过业务功能处理函数将计算指标返回汇总,最后将结果映射到模板网页上展示。
背景:本文主要对楼宇监测设备的采集数据包是否实时上传进行报警提示
具体思路:
一、工具类tool文件
主要存放数据库连接类和业务处理函数模块(比如邮件发送类、连续掉线模块类等)
此处用到的数据库涉及到sql server 、oracel及mysql。
其中sql server为存储原始数据层,oracel为存储计算解析结果层,mysql则为本次报警提示记录存储层。
二、主函数模块
1、建立中间库
建立报警日志记录中间存储库
注:可以根据业务特点建立监控的指标报表
2、报警功能
1)业务指标监控函数(包括数据库连接、数据处理、数据存储 )
2)数据提醒模块(此处采用常规的邮件提醒,也可以采用短信通知,微信网页登录知道的可后台留言指导
3、定时任务,固定时刻对生产数据库检查一次。
一、工具类tool文件夹
数据库连接类模块
1、Python操作Oracle数据库--参考手把手教你搭建一个Python连接数据库快速取数工具
2、Python操作sql server数据库
通过使用pymssql库操作SqlServer连接
代码语言:javascript复制#建立基本连接
HOST = '127.0.0.1'
USER = 'USERNAME'
PW = 'PASSWORD'
DB = 'DBNAME'
conn = pymssql.connect(host=HOST, user=USER, password=PW, database=DB)
cursor = conn.cursor()
假设T_Data数据表为业务原始表,场景:接受物联网设备远程传输数据,通过以下语句执行获取中当前执行最新时刻的收到的所有数据(备注:该表仅存储近1天的数据,原始数据会迁移到历史库中存储,数据库分库、分表后期再详细介绍)
代码语言:javascript复制#执行sql语句
sql = """select * from T_Data
where F_CreateTime > convert(varchar(100),GETDATE(),23)
"""
cursor.execute(sql) # 执行查询语句,选择表中所有数据
result = cursor.fetchall() # 获取所有记录
3、Python操作mysql数据库--Data_Update_mysql.py
以下创建连接mysql数据库的连接类MYSQL_DB。主要提供函数方法如下:
代码语言:javascript复制import pandas as pd
import pymysql
class MYSQL_DB(object):
def __init__(self):
self.host = "localhost"
self.database = "mydb"
self.user = "root"
self.password = "root"
self.port = 3306
self.charset = "utf8"
def db_connection(self):
return pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset)
# # #方法2
# self.engine = create_engine("mysql pymysql://root:root@127.0.0.1:3306/mydb?charset=utf8")
# 插入数据库
def insert_table_to_sql(self, sql, valuelist):
conn = self.db_connection()
cur = conn.cursor()
try:
cur.executemany(sql, valuelist)
conn.commit()
print("插入成功!")
except Exception as e:
conn.rollback()
print("insert with error", e)
finally:
cur.close()
conn.close()
# 查询数据库
def select_table_by_sql(self, sql):
try:
conn = self.db_connection()
cur = conn.cursor()
cur.execute(sql)
data = cur.fetchall()
columnDes = cur.description # 获取连接对象的描述信息
columnNames = [columnDes[i][0] for i in range(len(columnDes))]
df = pd.DataFrame([list(i) for i in data], columns=columnNames)
cur.close()
conn.close()
return df
except Exception as e:
data = ("error with sql", sql, e)
return data
#增删改操作
def Execute_sql(self, sql):
conn = self.db_connection()
cur = conn.cursor()
try:
cur.execute(sql)
conn.commit()
except BaseException:
conn.rollback()
print("execute sql with error")
finally:
cur.close()
conn.close()
业务处理函数模块
1、邮件发送类--sender_mail.py
dir_=r".临时文件位置",这里dir_为临时文件存储位置,
实际上没有必要通过该临时文件转发,因为已经考虑建立报警日志储存库,直接通过函数返回结果传参即可。这里为保持本地备份待用,可定时清空超过一定时间的日志文件即可,下次可以把这个小功能也分享一下。这里生成临时文件其实还有一个目的是分享watchdog模块监控指定目录下是否有指定类型文件产生,从而触发程序执行,这个实时转发模块也不错。
代码语言:javascript复制import smtplib
import time
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from datetime import datetime
class Sender_mail:
def __init__(self,dir_=None):
self.dir_=dir_
#群发邮件模板
def sender_mail(self,dir_):
sender = 'xxxxxx@qq.com' #填你的qq邮箱
#创建对象
smt_p = smtplib.SMTP()
#设置smtp服务器
smt_p.connect(host='smtp.qq.com', port=25)
#在qq邮箱设置开启SMTP服务并复制授权码
password="xxxxxx"
#进行邮箱登录一次,填写你本人的邮箱
smt_p.login(sender,password)
count_num = 1
#使用for循环来进行群发邮件
for i in ['xxxx@qq.com', "xxxxxx@qq.com"]:
#列表中邮箱格式不正确时,在发邮件的时候会出现异常报错,捕获到这些异常就跳过
try:
#邮件设置
msg = MIMEMultipart()
msg['From'] = "数据中心"
#收件人
msg['To'] = i
#抄送
# msg['Cc'] = 'xxx@qq.com'
#主题名称
msg['subject'] = Header('在线反馈表', 'utf-8')
#附件 —附加发送excel
msg.attach(MIMEText('您好,' '掉线清单!请查收','plain', 'utf-8'))
now_time = datetime.now().date()
xlsxpart=MIMEApplication(open(dir_ '连续掉线情况反馈表{0}.xlsx'.format(now_time),'rb').read())
xlsxpart.add_header('Content-Disposition','attachment',filename='设备连续掉线情况反馈表{0}.xlsx'.format(now_time))
msg.attach(xlsxpart)
#word、图片格式如下
# message_docx = MIMEText(open(dir_ 'test.docx', 'rb').read(), 'base64', 'utf8')
# message_docx.add_header('crontent-disposition', 'attachment', filename='test.docx')
# msg.attach(message_docx)
# message_image = MIMEText(open(dir_ 'test.jpg', 'rb').read(), 'base64', 'utf8')
# message_image.add_header('content-disposition', 'attachment', filename='test.jpg')
# msg.attach(message_image)
#发送邮件
smt_p.sendmail(sender,i,msg.as_string())
#sleep10秒避免发送频率过快,可能被判定垃圾邮件。
time.sleep(10)
print('第%d次发送给%s' % (count_num,i))
count_num = count_num 1
except Exception as e:
#打印出来发送第几次的时候,邮箱出问题,一个邮箱最多不超过300个发送对象
print('第%d次给%s发送邮件异常' % (count_num,i))
continue
smt_p.quit()
#定制模块
def sender_mail_1():
pass
代码语言:javascript复制
2、连续掉线模块类--Continuous_offline.py
具体函数代码参考以往推文--Python判断连续时间序列范围并分组应用
这里封装为类模块,代码结构示意如下
代码语言:javascript复制import pandas as pd
from itertools import groupby
from datetime import datetime
class Continuous_offine(object):
def __init__(self,df=None,now_time=None):
self.df = df
self.now_time = datetime.now().date()
#日期-天数转换函数
def which_day(self,x):
.......
return whichday
#连续掉线时间范围及天数处理函数
def data_preprocess_dactory(self,lst,k_v,BUILD_ID):
result1 = []
result2 = []
for k, g in groupby(enumerate(lst), lambda x: x[1] - x[0]):
l1 = [k_v.get(j).strftime('%Y-%m-%d') for i, j in g] # 连续时间的列表
if len(l1) > 1:
scop = str(min(l1)) '-' str(max(l1)) # 连续时间范围用"-"连接
result1.append(scop)
result2.append(len(l1)) #连续天数
else:
scop = l1[0]
result1.append(scop)
result2.append(len(l1)) #连续天数
df = pd.DataFrame({'时间': result1, '连续掉线天数': result2})
return df.reindex(columns=["建筑编号", "时间", "连续掉线天数"], fill_value="{0}".format(BUILD_ID))
def main_process(self,df):
df1=pd.DataFrame(df[["BUILD_ID","BUILD_NAME","OFF_TIME"]])
id_name =df1.set_index("BUILD_ID")["BUILD_NAME"].to_dict() #ID-名称映射字典
Build_list=df1.BUILD_ID.unique().tolist()
data_list = []
for k in range(len(Build_list)):
df2=df1[df1.BUILD_ID=="{0}".format(Build_list[k])].copy()
df2["OFF_TIME"]=pd.to_datetime(df2['OFF_TIME'])
df2["辅助列-天数"]=df2["OFF_TIME"].map(lambda x:self.which_day(x))
lst = df2["辅助列-天数"].tolist() # 连续数字
k_v = df2.set_index("辅助列-天数")["OFF_TIME"].to_dict() #辅助列-天数映射字典
df3=self.data_preprocess_dactory(lst,k_v,Build_list[k])
df3.insert(1,'建筑名称',df3["建筑编号"].map(lambda x:id_name.get(x))) # 指定第2列插入建筑名称
data_list.append(df3)
res = pd.concat(data_list, axis=0, ignore_index=True,sort=False)
print(res)
res["max_连续掉线天数"]=res.groupby("建筑编号")["连续掉线天数"].transform('max')
res1=res[res.连续掉线天数==res.max_连续掉线天数]
return res
二、数据提取主函数模块
导入模块
代码语言:javascript复制#encoding=utf-8
import pymssql
import pandas as pd
from datetime import datetime
import time
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from tools.sender_mail import Sender_mail
from tools.Data_Update_mysql import MYSQL_DB
from tools.Continuous_offline import Continuous_offine
1、建立中间库
在mysql数据库中建立报警信息记录表
代码语言:javascript复制sql="""CREATE TABLE IF NOT EXISTS Offline_building_history_new(
ID int(8) not null auto_increment COMMENT '序号',
OFF_TIME date not null COMMENT '掉线记录时间',
BUILD_ID VARCHAR (40) NOT NULL COMMENT '建筑编号',
BUILD_NAME VARCHAR (100) NOT NULL COMMENT '建筑名称',
BUILD_FUNCTION VARCHAR (100) NOT NULL COMMENT '建筑功能',
Access_time VARCHAR (20) NOT NULL COMMENT '接入时间',
primary key(ID)
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '建筑掉线历史记录数据信息库';
"""
engine.execute(sql)
2、报警功能函数
1)业务指标监控函数(包括数据库连接、数据处理、数据存储 )
处理流程:
代码语言:javascript复制now_time = datetime.now().date()
#查询连续掉线天数
def continuous_offline_day(now_time):
db = MYSQL_DB() # 实例化一个对象
sql_off_new = """SELECT * FROM offline_history_new where
DATE_SUB(CURDATE(), INTERVAL 30 DAY) <= date(OFF_TIME)""" #查询最近30天内数据
data = db.select_table_by_sql(sql_off_new)
Continuous_offine_new = Continuous_offine()
# 连接数据库传入dataframe数据表
res=Continuous_offine_new.main_process(data)
res["掉线频次"] = res.groupby("建筑编号")["建筑名称"].transform('count')
res=res[res['时间'].str.contains('{0}'.format(now_time))]
res.to_excel(dir_ "连续掉线情况反馈表{0}.xlsx".format(now_time),
index=False)
def task1():
# 打开数据库连接
db = pymssql.connect(
"IP",
"USERNAME",
"PASSWORD",
"mydatabasename-1",
charset="utf8")
if db:
print("连接成功!")
cursor = db.cursor()
sql = """select * from T_Data
where F_CreateTime > convert(varchar(100),GETDATE(),23)
"""
cursor.execute(sql) # 执行查询语句,选择表中所有数据
result = cursor.fetchall() # 获取所有记录
print("导出成功!")
#业务数据处理流程
# ---这里pass不作介绍,涉及到一些后处理,这里根据业务而定
# 处理输出数据
df_1=Data_process(df = pd.DataFrame(
result,
columns=[
"F_UBuildID",
"F_DaqDatetime",
"F_DaqData",
"F_CreateTime"]))
# 在指定位置添加列
new_colunms_list = ["序号", "掉线记录时间", "建筑编号", "建筑名称", "建筑功能", "接入时间"]
df_new1 = df_1.reindex(columns=new_colunms_list, fill_value=now_time) #now_time设置为全局变量
data_t = df_new1[df_new1.columns[1:]]
data_T_new = data_t.astype(str)
data_result_tuples_new = [tuple(i) for i in data_T_new.values]
# 插入数据库
db = MYSQL_DB() # 实例化一个对象
sql_new = """ insert into offline_history_new(OFF_TIME,BUILD_ID,BUILD_NAME,BUILD_FUNCTION,Access_time) values (%s,%s,%s,%s,%s)"""
# 插入数据库
db.insert_table_to_sql(sql_new, data_result_tuples_new)
# 连续掉线天数统计
continuous_offline_day(now_time)
print('Tick!The time is:%s' % datetime.now())
2)邮件提醒模块
代码语言:javascript复制#发送给运维方
def task2():
#发送邮件
#运维方
Sender_mail.sender_mail()
time.sleep(3)
#其他
Sender_mail.sender_mail_1()
3、定时任务调度
这里用到了定时任务模块APScheduler,提供基于日期、固定时间间隔及crontab类型的任务。
代码语言:javascript复制if __name__ =='__main__':
scheduler=BlockingScheduler()
# scheduler.add_job(tick,'interval',seconds=3)#触发器还有date,corn,其中date按特定时间点触发,cron则按固定时间间隔触发
scheduler.add_job(task1, 'cron',hour=13,minute=58)
scheduler.add_job(task2, 'cron', hour=14, minute=0)
#minute='*/3'表示每五分钟执行一次;hour=‘19-21’,minute='23' 表 示19:23、20:23、21:23各执行一次任务
print('Press Ctrl {0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt,SystemExit):
pass
如果进一步对oracle业务数据库解析数据分析,可掌握具体监测设备的运行状态比如特征分析、异常分析等。其中可能会存在数据解析不完全的问题,由于数据解析批处理遇到异常数据包导致进程阻塞,从而任务超时未解析成功造成数据缺失,通过监控校对数据的一致性来及时发现问题并制定重算任务。
到此整个数据库数据质量监控工具开发流程介绍完毕,其他功能函数可自行拓展。按照业务逻辑开始动手搭建属于自己的数据监管平台吧!
后期我将分享如何构建独立的python环境,部署成Web应用,通过网站实时访问数据监控面板,时刻掌握楼宇监测设备数据连续上传情况,