【保姆级教程】Python定制化开发生成数据报表

2022-04-08 12:42:34 浏览数 (1)

大家好,我是Brook!

宝,今天做表了没,什么表,定制化表!

皮一下~

今日主题:如何开发自动化生成数据分析报表

数据分析开发过程中,数据报表开发是常见的需求,利用Python开发定制化分析报表。业务数据实时刷新,自动生成各类报表,告别重复做表,大大提升工作效率。

背景:本文主要对楼宇监测设备的实时数据报表开发

如何定制化开发数据报表生成工具?

1、将分散的多个数据源统一处理汇总

2、定制好数据展示模板(Word、Excel、Html),将指定报表任务数据源更新到对应的模板中呈现。

具体思路:

一、工具类common文件:公共模块

1)file_process类--相关文件处理函数

2)excel_to_doc函数--表格插入函数

3)xml_extract 解析函数--对数据包内容解析

4)tample模板文件--docx、excel、html

注:模板准备,结合报表数据指标特点,准备报表模板占位标志和样式模板

这里主要介绍如何通过Word呈现数据,html网页分享后期分享,Excel模板已推送过相关文章见:准工业级代码分享:Python用于自动生成EXCEL周期报告

二、编写业务函数:提取报表数据

1、数据准备

提取数据---根据业务特点生成所需表数据

包括业务数据及配置数据--一般是固定的变量字段或数据分析相关变量指标

2、数据处理--根据业务组织数据,完成报表

注:可切换报表类型、定时刷新更新模板

一、工具类common文件夹

实际项目文件存放更加细分,我这里为省事把这些公共模块全放在一块啦。

1、Python操作文件相关处理函数

这里主要包含读取文件夹文件路径、读取指定类型文件、修改文件后缀、文件移动清除操作。

代码语言:javascript复制
import os
import shutil

# 也可以将文件路径写死:
file_dir = r'.AutoOps_platformxml位置'
#得到指定文件夹路径下所有文件路径
def all_path(dirname):
    result = []
    for maindir, subdir, file_name_list in os.walk(dirname):
        for filename in file_name_list:
            apath = os.path.join(maindir, filename)
            result.append(apath)
    return result

#读取指定后缀文件
def GetExtNamesList(fileslist,ext):
    filenames=[]
    for file in fileslist:
        fileinfo=os.path.splitext(file)
        if fileinfo[1]==ext:
            filenames.append(file)
    return filenames


# 批量修改一个文件下的文件后缀为xml
def Rename():
    Path = r'xml包'
    filelist = os.listdir(Path)
    for files in filelist:
        Olddir = os.path.join(Path, files)
        if os.path.isdir(Olddir):  # 判断是否是文件夹,是文件夹,跳过
            continue
        filename = os.path.splitext(files)[0]
        Newdir = os.path.join(Path, filename   '.xml')  # 只要修改后缀名就可以更改成任意想要的格式
        os.rename(Olddir, Newdir)
        
#将临时文件夹中xml文件移动到指定文件中保存
def xmlmove(path,targetpath):
    shutil.rmtree(file_dir "xml历史存放位置")
    os.mkdir(file_dir "xml历史存放位置")
    filelist=tools.GetExtNamesList(tools.all_path(path),'.xml')

    for file in filelist:
        targetname=file.replace('\','/').replace(targetpath,path)
        if not os.path.exists(os.path.dirname(targetpath)):
            os.makedirs(os.path.dirname(targetpath))
        if os.path.exists(targetname):
            shutil.move(file,targetpath) #这里除了移动文件也可以直接清除
    return

2、Python向word中插入图表

代码语言:javascript复制
from docx import Document
# 存储dataframe表格到word
def excel_to_doc(document, test_df):
    # 添加一个表格--行数和列数,行数多加一行,需要将列名同时保存
    t = document.add_table(test_df.shape[0]   1, test_df.shape[1])
    #     t.style = "Light Shading"
    # 将每列列名保存到表格中
    for j in range(test_df.shape[-1]):
        t.cell(0, j).text = test_df.columns[j]

    # 将每列数据保存到新建的表格中
    for i in range(test_df.shape[0]):
        for j in range(test_df.shape[-1]):
            # 第一行保存的是列名,所以数据保存时,行数要加1
            t.cell(i   1, j).text = str(test_df.values[i, j])

3、Python向解析xml包数据

代码语言:javascript复制
def read_xml(xmlFileName):
    with open(xmlFileName, 'r') as xml_file:
        # 读取数据,以树结构存储
        tree = ET.parse(xml_file)

        # 提取树根节点
        root = tree.getroot()

        L = iter_records(root)

        # 返回DataFrame数据
        return pd.DataFrame(L).T


def iter_records(root):  # 生成器方法,每次调用返回一对值,直到循环结束
    '''
        解析所有记录
    '''
    for data in root.iter(tag='data'):

        # 保存字典
        temp_dict = {}
        # 遍历所有字段
        for meter in data:

            for function in meter:
                temp_dict[function.attrib['name']] = function.text

        # 生成值
        yield temp_dict

4、Python模板字段字典准备

代码语言:javascript复制
import json

dic={# 建筑类型
    'buildFunction' :{
    576: "办公建筑",
    577: "商场建筑",
    1251: "文化教育建筑",
    1252: "医疗卫生建筑",
    1253: "体育建筑",
    1254: "综合建筑",
    1255: "其他建筑",
    1451: "宾馆饭店建筑"},
    # 建筑空调形式
    'airform' :{
    0: "集中式全空气系统",
    1: "风机盘管 新风系统",
    2: "分体空调或VRV局部式机组系统",
    3: "其他"},
    # 建筑结构形式
    'structureform':{...},
     ....
    }
js = json.dumps(dic)
file = open(dir_ 'config.txt', 'w')
file.write(js)
file.close()

模板中字段占位标识如下,这里仅演示:

建筑类型 {{BUILDFUNCTION}} 、建筑空调形式{{AIRFORM}}

二、业务函数

1、数据准备

数据包数据提取

代码语言:javascript复制
# 提取数据
def Data_Extraction(BuildID):
       # 打开数据库连接
    db = pymssql.connect(
        "IP",
        "USERNAME",
        "PASSWORD",
        "databasename",
        charset="utf8")
    if db:
        print("连接成功!")
    cursor = db.cursor()
    
    sql = """select * from T_DaqData

    where F_UBuildID = '{0}' and F_CreateTime > convert(varchar(100),GETDATE(),23)

    order by F_DaqDatetime asc

    """.format(BuildID)

    cursor.execute(sql)  # 执行查询语句,选择表中所有数据

    result = cursor.fetchall()  # 获取所有记录

    #提取所需字段数据
    df = pd.DataFrame(
        result,
        columns=[
            "F_UBuildID",
            "F_DaqDatetime",
            "F_DaqData",
            "F_CreateTime"])
            
    #获取最新数据        
    DATA = df[df.F_DaqDatetime == df[["F_DaqDatetime"]].max()[0]]["F_DaqData"]

    cursor.close()

    return DATA

基本信息提取

代码语言:javascript复制
#未保留数据处理细节
def build_info_Extraction(build_id):
    from sqlalchemy import create_engine
    import pandas as pd
    import cx_Oracle

    pd.set_option('expand_frame_repr', False)

    # 避免编码问题带来的乱码
    import os
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
    # 连接oracle
    engine = cx_Oracle.connect(
        'username',
        'password',
        'ip:1521/database')

    sql_0 = "select * from b_build_info where buildcode ='{0}'".format(
        build_id)

    # df1:基本信息dataframe表格数据

    df1 = pd.read_sql_query(sql_0, engine)

    # df3:基本信息字典

    df3 = df1.to_dict(orient='list')

    buildid = df3["ID"][0]

    sql_1 = """select * from t_device_info where buildid = '{0}'""".format(
        buildid)

    # #查询获取数据-设备配置表

    df2 = pd.read_sql_query(sql_1, engine)
    
    #df4:设备配置信息
    df4 = pd.DataFrame(df2,columns=["BUILDID","NAME","CODE","ITEM","COMP"])

    # 月度检查输出表
    df5 = pd.DataFrame(df2, columns=["CODE", "NAME", "MOD", "COMP"])

    engine.close()

    return df3, df4, df1, df2, df5

2、数据处理

代码语言:javascript复制
def data_factory(BuildID,task):
    # 获取数据库建筑能耗数据包
    try:
        # 获取数据库建筑能耗数据包部分
        DATA_1 = Data_Extraction(BuildID)

        with open(dir_ "xml包%s" % "{0}".format(BuildID), encoding="gbk",
                  mode="w") as f:
            f.write(DATA_1.values[0])
            f.close()
        # 修改后缀为xml
        tools.Rename()
        
        # XML数据源路径
        r_filenameXML = dir_ 'xml包{0}.xml'.format(
            BuildID)
            
        # 解析数据
        xml_read = xml_extract.read_xml(r_filenameXML)
        data = pd.DataFrame(xml_read)
        
        #数据处理,省略。。。。
        filter_data= filter_process(data)

        # 准备基础配置表部分
        b_build_info = build_info_Extraction(BuildID)

        # 建筑基础信息---字典形式存储
        build_basis_info = b_build_info[0]

        # 表格形式存储
        build_basis_info=b_build_info[2]

        # 建筑配置表
        build_Config_info = b_build_info[1]

        # 建筑配置表(月度检查模板)
        build_Config_info2 = b_build_info[4]

        # 合并数据部分
        hb_data = pd.merge(
            build_Config_info,
            filter_data,
            on="CODE",
            how="left")

        # 月度检查模板
        hb_data2 = pd.merge(
            build_Config_info2,
            filter_data,
            on="CODE",
            how="left")

        # 监测设备数量
        L_NUMBER = hb_data2.shape[0]
        BUILDNAME = build_basis_info["BUILDNAME"][0]
        hb_data["BUILDID"] = ["{0}".format(BUILDNAME)] * hb_data.shape[0]
        #数据库取出基本信息
        data_dic = {key: value[0] for key, value in build_basis_info.items()}
        #准备数据字典
        data_dic2 = data_dic.copy()
        
        with open('config.txt', 'r') as file:
            import json
            dic = json.loads(file.read())
            file.close()

        data_dic2['BUILDFUNCTION'] = dic.get("buildFunction").get(
            data_dic["BUILDFUNCTION"])

        data_dic2['AIRFORM'] = dic.get("airform").get(
            data_dic["AIRFORM"])
            
        #--------省略字典生成过程-------#

        #更新其他计算指标,汇总数据
        now_time = datetime.now()
        date_time = now_time.strftime('%Y-%m-%d %H:%M')
        data_dic2["now_time"] = date_time
        data_dic2.update(model_dict)

        if task=='报表模板1':
            #加载模板-1
            doc1 = DocxTemplate(dir_ 'tamplate_1.docx')  
            doc1.render(data_dic2)  # 填充数据
            doc1.save(dir_ '报表位置{0}报表{1}.docx'.format(BUILDNAME,date_time)) 
            # 保存目标文件
            
        elif task=="报表模板2":
            #加载模板-2
            doc2 = DocxTemplate(dir_ 'tamplate_2.docx') 
            doc2.render(data_dic2) #填充数据
            data_to_doc.excel_to_doc(doc2,hb_data)
            doc2.save(dir_ '报表位置{0}报表{1}.docx'.format(BUILDNAME,date_time))
            # 保存目标文件
            
        elif task=="报表模板3":
            # 加载模板-3
            doc3 =DocxTemplate(dir_ "tamplate_3.docx")
            #加载模板3文件
            doc3.render(data_dic2) #填充数据
            test_df = pd.DataFrame(hb_data2)
            data_to_doc.excel_to_doc(doc3,test_df)
            doc3.save(dir_ '报表位置{0}报表{1}.docx'.format(BUILDNAME,date_time))
            # 保存目标文件
            
        else:
            pass
        print("导出完成")

    except Exception as e:
        print("包含非法符号", BuildID)

最后运行主函数即可完成数据报表的生成

代码语言:javascript复制
#encoding=utf-8
import xml.etree.ElementTree as ET
import os
import sys
import pymssql
import pandas as pd
from docxtpl import DocxTemplate
import shutil
import common.file_process as tools  # 导入工具模块并设置别名为tools
import common.xml_extract as xml_extract  # 导入工具模块并设置别名为xml_extract
import common.excel_to_doc as data_to_doc  # 导入工具模块并设置别名为data_to_doc
from datetime import datetime

if __name__ == '__main__':
    # 输入待查询建筑ID清单列表
    BuildID =['xxxx','xxxxx']
    task=["报表模板1","报表模板2","报表模板3"]
    #这里手动切换报表模板
    for i in range(len(BuildID)):
        data_factory(BuildID[i],task=task[1])  #选择需要导出的表格模板

    # 先删除文件夹中内容--清空临时文件内容
    tools.xmlmove(dir_ "xml包",dir_ "xml历史存放位置")
    shutil.rmtree(dir_ "xml包")
    os.mkdir(dir_ "xml包")

这里只是简单实例介绍一般报表开发流程和思路,希望起到抛转引玉的作用。实际数据分析报告涉及多项指标、图表和数据处理过程,本质大同小异。

0 人点赞