高质量编码--使用Pandas和Tornado构建高性能数据查询服务

2019-07-15 11:05:54 浏览数 (1)

大数情况下,数据保存在数据库中,使用SQL来从数据库中查询数据,但相对于直接从内存中取数据前者显得比较慢和笨重。下面介绍基于csv文件目录存储数据,使用Tornado来作为Web服务器,使用Pandas来高性能查询数据。

效果如下:

看一下数据在CSV中的存储结构

tornado作为web服务器,index路由对应查询页面,devs路由对应取得所有传感器列表(每个传感器由设备ID和传感器类型唯一决定),data路由根据设备ID和传感器类型,以及日期范围来查询数据。当web服务启动时,同时将数据加载到全局变量保存在内存中。

代码语言:python代码运行次数:0复制
#-*- coding:utf-8 -*-
import os
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define,options
import json
import datetime
from tools import *

define('port',default=8096,help='run on the given port',type=int)

class cross_originAllowed_Handler(tornado.web.RequestHandler):
    
    def initialize(self):
       
        self.set_header('Access-Control-Allow-Origin','*')
        self.set_header('Access-Control-Allow-Methods','POST,GET')
        self.set_header('Access-Control-Max-Age',1000)
        self.set_header('Access-Control-Allow-Headers','*')

class indexHandler(tornado.web.RequestHandler):
    def get(self):      
        self.render('index.html')   

class devsHandler(cross_originAllowed_Handler):
    def get(self):
        self.write({'result':devs.to_json()})  
              
class dataHandler(cross_originAllowed_Handler):
    def get(self):
        dev=self.get_argument('dev',None)
        startDate=self.get_argument('startDate',None)
        endDate=self.get_argument('endDate',None)
        if not (dev and startDate and endDate):
            self.write({'result':'{}'})
            return     
        data=getByDev2DateRange(int(dev),startDate,endDate)
        self.write({'result':data})

class Application(tornado.web.Application):
    def __init__(self, handlers, **settings):
        global df
        global devs
        df,devs=initDf()
        tornado.web.Application.__init__(self, handlers, **settings)    
        
if __name__=='__main__':
    tornado.options.parse_command_line()
    app=Application(
        handlers=[
                 
                (r'/',indexHandler),
                (r'/index',indexHandler),
                
                 (r'/devs',devsHandler),
                (r'/data',dataHandler),
                 (r"/assets/(.*)", tornado.web.StaticFileHandler, {"path": "static/assets"}),

                (r"/dist/(.*)", tornado.web.StaticFileHandler, {"path": "static/dist"}),
                (r"/css/(.*)", tornado.web.StaticFileHandler, {"path": "static/css"}),
                (r"/js/(.*)", tornado.web.StaticFileHandler, {"path": "static/js"}),
                 
                  ],

        template_path=os.path.join(os.path.curdir,'templates'),static_path=os.path.join(os.path.curdir,'static'),cookie_secret='miaojiyue',debug=True )
         
    http_server=tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
    

使用pandas将数据加载到dataframe中如下:

下面看一下使用Pandas数据分析工具的具体实现

代码语言:python代码运行次数:0复制
#-*-coding:utf-8 -*-
import os
import numpy as np
import pandas as pd
def initData():
    dataDir='D:wcdata'
    csvs=os.listdir(dataDir)
    dfs=[]
    names=['ID', 'DevID', 'DevData', 'DevUnit', 'voltage',
       'DevAlarm', 'RecDateTime', 'StrInfo', 'ErrorCode',
       'dflag', 'AddTime']

    #遍历文件夹中所有csv文件,将数据拼合到一个dataframe中
    for csv in csvs:
        #由于csv中首行没有存储列名,指定数据对应的列名称
        df0=pd.read_csv(os.path.join(dataDir,csv),names=names)
        print(csv)
        print(df0)
        dfs.append(df0)
    df=pd.concat(dfs)
    df['devId']=df['DevID']
    df['devUnit']=df['DevUnit']
    #根据小数点分隔字符串,将时间格式化到整数秒,并将字符串转为时间格式。
    df.RecDateTime=df.RecDateTime.apply(lambda x:x[:x.find('.')])
    df.RecDateTime=pd.to_datetime(df.RecDateTime)
    df['recDateTime']=df['RecDateTime']
    #为dataframe指定设备ID和接收时间作为索引并根据索引排序
    df=df.set_index(['DevID','RecDateTime']).drop(columns=['ID']).sort_index()
    return df

def initDf():
    print('begin to read data from pkl!')
    global df
    df=pd.read_pickle('data.pkl')
    print('read complete!')
    global df2
    print('begin to read devs!')
    #devs=df.index.map(lambda x:x[0]).unique().to_list()
    #devs=np.load('data.npz')['devs'].tolist()
    #df2=pd.read_excel('data.xls',names=["devId","devName","devUnit"])
    df2=pd.read_pickle('data2.pkl')
    
    print('devs complete!')
    return df,df2

def getByDev2DateRange(dev,startDate,endDate):
    print(dev)
    print(startDate)
    print(endDate)
    if dev not in df2['devId']:
        return '{}'
    dates=pd.date_range(start=startDate, end=endDate)
    dates=dates.map(lambda x:x.strftime('%Y-%m-%d'))
    #使用传感器ID索引过滤数据
    d=df.loc[dev]
    #取得该传感器所有数据的时间列表
    dates2=d.index.map(lambda x:x.strftime('%Y-%m-%d'))
    #用是否包含布尔索引来过滤查询时间范围的数据,也可以使用dates.contains(date2)
    data=d[dates2.isin(dates)]
    #只返回'recDateTime','DevData'两列
    data=data.loc[:,['recDateTime','DevData']]
    data.recDateTime=data.recDateTime.apply(lambda x:x.strftime('%Y-%m-%d %H:%M:%S'))
    data=data.reset_index()
    data=data.drop(columns=['RecDateTime'])
    return data.to_json()

这里有两个dataframe,一个保存传感器基本信息(包含DevID和DevUnit),一个保存历史数据。其中初始化它们时有两种方式,一种是从csv文件中加载,一种是预先将从csv中加载的dataframe使用to_pickle保存到pkl文件中,然后从pkl文件直接加载,后者文件更小而且加载速度更快。

下文将介绍查询数据使用echarts展示的前端代码。

0 人点赞