大家好,我是Brook!
宝,今天做表了没,什么表,定制化表!
皮一下~
今日主题:如何开发自动化生成数据分析报表
数据分析开发过程中,数据报表开发是常见的需求,利用Python开发定制化分析报表。业务数据实时刷新,自动生成各类报表,告别重复做表,大大提升工作效率。
背景:本文主要对楼宇监测设备的实时数据报表开发
如何定制化开发数据报表生成工具?
1、将分散的多个数据源统一处理汇总
2、定制好数据展示模板(Word、Excel、Html),将指定报表任务数据源更新到对应的模板中呈现。
具体思路:
一、工具类common文件:公共模块
1)file_process类--相关文件处理函数
2)excel_to_doc函数--表格插入函数
3)xml_extract 解析函数--对数据包内容解析
4)tample模板文件--docx、excel、html
注:模板准备,结合报表数据指标特点,准备报表模板占位标志和样式模板
这里主要介绍如何通过Word呈现数据,html网页分享后期分享,Excel模板已推送过相关文章见:准工业级代码分享:Python用于自动生成EXCEL周期报告
二、编写业务函数:提取报表数据
1、数据准备
提取数据---根据业务特点生成所需表数据
包括业务数据及配置数据--一般是固定的变量字段或数据分析相关变量指标
2、数据处理--根据业务组织数据,完成报表
注:可切换报表类型、定时刷新更新模板
一、工具类common文件夹
实际项目文件存放更加细分,我这里为省事把这些公共模块全放在一块啦。
1、Python操作文件相关处理函数
这里主要包含读取文件夹文件路径、读取指定类型文件、修改文件后缀、文件移动清除操作。
代码语言:javascript复制import os
import shutil
# 也可以将文件路径写死:
file_dir = r'.AutoOps_platformxml位置'
#得到指定文件夹路径下所有文件路径
def all_path(dirname):
result = []
for maindir, subdir, file_name_list in os.walk(dirname):
for filename in file_name_list:
apath = os.path.join(maindir, filename)
result.append(apath)
return result
#读取指定后缀文件
def GetExtNamesList(fileslist,ext):
filenames=[]
for file in fileslist:
fileinfo=os.path.splitext(file)
if fileinfo[1]==ext:
filenames.append(file)
return filenames
# 批量修改一个文件下的文件后缀为xml
def Rename():
Path = r'xml包'
filelist = os.listdir(Path)
for files in filelist:
Olddir = os.path.join(Path, files)
if os.path.isdir(Olddir): # 判断是否是文件夹,是文件夹,跳过
continue
filename = os.path.splitext(files)[0]
Newdir = os.path.join(Path, filename '.xml') # 只要修改后缀名就可以更改成任意想要的格式
os.rename(Olddir, Newdir)
#将临时文件夹中xml文件移动到指定文件中保存
def xmlmove(path,targetpath):
shutil.rmtree(file_dir "xml历史存放位置")
os.mkdir(file_dir "xml历史存放位置")
filelist=tools.GetExtNamesList(tools.all_path(path),'.xml')
for file in filelist:
targetname=file.replace('\','/').replace(targetpath,path)
if not os.path.exists(os.path.dirname(targetpath)):
os.makedirs(os.path.dirname(targetpath))
if os.path.exists(targetname):
shutil.move(file,targetpath) #这里除了移动文件也可以直接清除
return
2、Python向word中插入图表
代码语言:javascript复制from docx import Document
# 存储dataframe表格到word
def excel_to_doc(document, test_df):
# 添加一个表格--行数和列数,行数多加一行,需要将列名同时保存
t = document.add_table(test_df.shape[0] 1, test_df.shape[1])
# t.style = "Light Shading"
# 将每列列名保存到表格中
for j in range(test_df.shape[-1]):
t.cell(0, j).text = test_df.columns[j]
# 将每列数据保存到新建的表格中
for i in range(test_df.shape[0]):
for j in range(test_df.shape[-1]):
# 第一行保存的是列名,所以数据保存时,行数要加1
t.cell(i 1, j).text = str(test_df.values[i, j])
3、Python向解析xml包数据
代码语言:javascript复制def read_xml(xmlFileName):
with open(xmlFileName, 'r') as xml_file:
# 读取数据,以树结构存储
tree = ET.parse(xml_file)
# 提取树根节点
root = tree.getroot()
L = iter_records(root)
# 返回DataFrame数据
return pd.DataFrame(L).T
def iter_records(root): # 生成器方法,每次调用返回一对值,直到循环结束
'''
解析所有记录
'''
for data in root.iter(tag='data'):
# 保存字典
temp_dict = {}
# 遍历所有字段
for meter in data:
for function in meter:
temp_dict[function.attrib['name']] = function.text
# 生成值
yield temp_dict
4、Python模板字段字典准备
代码语言:javascript复制import json
dic={# 建筑类型
'buildFunction' :{
576: "办公建筑",
577: "商场建筑",
1251: "文化教育建筑",
1252: "医疗卫生建筑",
1253: "体育建筑",
1254: "综合建筑",
1255: "其他建筑",
1451: "宾馆饭店建筑"},
# 建筑空调形式
'airform' :{
0: "集中式全空气系统",
1: "风机盘管 新风系统",
2: "分体空调或VRV局部式机组系统",
3: "其他"},
# 建筑结构形式
'structureform':{...},
....
}
js = json.dumps(dic)
file = open(dir_ 'config.txt', 'w')
file.write(js)
file.close()
模板中字段占位标识如下,这里仅演示:
建筑类型 {{BUILDFUNCTION}} 、建筑空调形式{{AIRFORM}}
二、业务函数
1、数据准备
数据包数据提取
代码语言:javascript复制# 提取数据
def Data_Extraction(BuildID):
# 打开数据库连接
db = pymssql.connect(
"IP",
"USERNAME",
"PASSWORD",
"databasename",
charset="utf8")
if db:
print("连接成功!")
cursor = db.cursor()
sql = """select * from T_DaqData
where F_UBuildID = '{0}' and F_CreateTime > convert(varchar(100),GETDATE(),23)
order by F_DaqDatetime asc
""".format(BuildID)
cursor.execute(sql) # 执行查询语句,选择表中所有数据
result = cursor.fetchall() # 获取所有记录
#提取所需字段数据
df = pd.DataFrame(
result,
columns=[
"F_UBuildID",
"F_DaqDatetime",
"F_DaqData",
"F_CreateTime"])
#获取最新数据
DATA = df[df.F_DaqDatetime == df[["F_DaqDatetime"]].max()[0]]["F_DaqData"]
cursor.close()
return DATA
基本信息提取
代码语言:javascript复制#未保留数据处理细节
def build_info_Extraction(build_id):
from sqlalchemy import create_engine
import pandas as pd
import cx_Oracle
pd.set_option('expand_frame_repr', False)
# 避免编码问题带来的乱码
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
# 连接oracle
engine = cx_Oracle.connect(
'username',
'password',
'ip:1521/database')
sql_0 = "select * from b_build_info where buildcode ='{0}'".format(
build_id)
# df1:基本信息dataframe表格数据
df1 = pd.read_sql_query(sql_0, engine)
# df3:基本信息字典
df3 = df1.to_dict(orient='list')
buildid = df3["ID"][0]
sql_1 = """select * from t_device_info where buildid = '{0}'""".format(
buildid)
# #查询获取数据-设备配置表
df2 = pd.read_sql_query(sql_1, engine)
#df4:设备配置信息
df4 = pd.DataFrame(df2,columns=["BUILDID","NAME","CODE","ITEM","COMP"])
# 月度检查输出表
df5 = pd.DataFrame(df2, columns=["CODE", "NAME", "MOD", "COMP"])
engine.close()
return df3, df4, df1, df2, df5
2、数据处理
代码语言:javascript复制def data_factory(BuildID,task):
# 获取数据库建筑能耗数据包
try:
# 获取数据库建筑能耗数据包部分
DATA_1 = Data_Extraction(BuildID)
with open(dir_ "xml包%s" % "{0}".format(BuildID), encoding="gbk",
mode="w") as f:
f.write(DATA_1.values[0])
f.close()
# 修改后缀为xml
tools.Rename()
# XML数据源路径
r_filenameXML = dir_ 'xml包{0}.xml'.format(
BuildID)
# 解析数据
xml_read = xml_extract.read_xml(r_filenameXML)
data = pd.DataFrame(xml_read)
#数据处理,省略。。。。
filter_data= filter_process(data)
# 准备基础配置表部分
b_build_info = build_info_Extraction(BuildID)
# 建筑基础信息---字典形式存储
build_basis_info = b_build_info[0]
# 表格形式存储
build_basis_info=b_build_info[2]
# 建筑配置表
build_Config_info = b_build_info[1]
# 建筑配置表(月度检查模板)
build_Config_info2 = b_build_info[4]
# 合并数据部分
hb_data = pd.merge(
build_Config_info,
filter_data,
on="CODE",
how="left")
# 月度检查模板
hb_data2 = pd.merge(
build_Config_info2,
filter_data,
on="CODE",
how="left")
# 监测设备数量
L_NUMBER = hb_data2.shape[0]
BUILDNAME = build_basis_info["BUILDNAME"][0]
hb_data["BUILDID"] = ["{0}".format(BUILDNAME)] * hb_data.shape[0]
#数据库取出基本信息
data_dic = {key: value[0] for key, value in build_basis_info.items()}
#准备数据字典
data_dic2 = data_dic.copy()
with open('config.txt', 'r') as file:
import json
dic = json.loads(file.read())
file.close()
data_dic2['BUILDFUNCTION'] = dic.get("buildFunction").get(
data_dic["BUILDFUNCTION"])
data_dic2['AIRFORM'] = dic.get("airform").get(
data_dic["AIRFORM"])
#--------省略字典生成过程-------#
#更新其他计算指标,汇总数据
now_time = datetime.now()
date_time = now_time.strftime('%Y-%m-%d %H:%M')
data_dic2["now_time"] = date_time
data_dic2.update(model_dict)
if task=='报表模板1':
#加载模板-1
doc1 = DocxTemplate(dir_ 'tamplate_1.docx')
doc1.render(data_dic2) # 填充数据
doc1.save(dir_ '报表位置{0}报表{1}.docx'.format(BUILDNAME,date_time))
# 保存目标文件
elif task=="报表模板2":
#加载模板-2
doc2 = DocxTemplate(dir_ 'tamplate_2.docx')
doc2.render(data_dic2) #填充数据
data_to_doc.excel_to_doc(doc2,hb_data)
doc2.save(dir_ '报表位置{0}报表{1}.docx'.format(BUILDNAME,date_time))
# 保存目标文件
elif task=="报表模板3":
# 加载模板-3
doc3 =DocxTemplate(dir_ "tamplate_3.docx")
#加载模板3文件
doc3.render(data_dic2) #填充数据
test_df = pd.DataFrame(hb_data2)
data_to_doc.excel_to_doc(doc3,test_df)
doc3.save(dir_ '报表位置{0}报表{1}.docx'.format(BUILDNAME,date_time))
# 保存目标文件
else:
pass
print("导出完成")
except Exception as e:
print("包含非法符号", BuildID)
最后运行主函数即可完成数据报表的生成
代码语言:javascript复制#encoding=utf-8
import xml.etree.ElementTree as ET
import os
import sys
import pymssql
import pandas as pd
from docxtpl import DocxTemplate
import shutil
import common.file_process as tools # 导入工具模块并设置别名为tools
import common.xml_extract as xml_extract # 导入工具模块并设置别名为xml_extract
import common.excel_to_doc as data_to_doc # 导入工具模块并设置别名为data_to_doc
from datetime import datetime
if __name__ == '__main__':
# 输入待查询建筑ID清单列表
BuildID =['xxxx','xxxxx']
task=["报表模板1","报表模板2","报表模板3"]
#这里手动切换报表模板
for i in range(len(BuildID)):
data_factory(BuildID[i],task=task[1]) #选择需要导出的表格模板
# 先删除文件夹中内容--清空临时文件内容
tools.xmlmove(dir_ "xml包",dir_ "xml历史存放位置")
shutil.rmtree(dir_ "xml包")
os.mkdir(dir_ "xml包")
这里只是简单实例介绍一般报表开发流程和思路,希望起到抛转引玉的作用。实际数据分析报告涉及多项指标、图表和数据处理过程,本质大同小异。