大数情况下,数据保存在数据库中,使用SQL来从数据库中查询数据,但相对于直接从内存中取数据前者显得比较慢和笨重。下面介绍基于csv文件目录存储数据,使用Tornado来作为Web服务器,使用Pandas来高性能查询数据。
效果如下:
看一下数据在CSV中的存储结构
tornado作为web服务器,index路由对应查询页面,devs路由对应取得所有传感器列表(每个传感器由设备ID和传感器类型唯一决定),data路由根据设备ID和传感器类型,以及日期范围来查询数据。当web服务启动时,同时将数据加载到全局变量保存在内存中。
代码语言:python代码运行次数:0复制#-*- coding:utf-8 -*-
import os
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define,options
import json
import datetime
from tools import *
define('port',default=8096,help='run on the given port',type=int)
class cross_originAllowed_Handler(tornado.web.RequestHandler):
def initialize(self):
self.set_header('Access-Control-Allow-Origin','*')
self.set_header('Access-Control-Allow-Methods','POST,GET')
self.set_header('Access-Control-Max-Age',1000)
self.set_header('Access-Control-Allow-Headers','*')
class indexHandler(tornado.web.RequestHandler):
def get(self):
self.render('index.html')
class devsHandler(cross_originAllowed_Handler):
def get(self):
self.write({'result':devs.to_json()})
class dataHandler(cross_originAllowed_Handler):
def get(self):
dev=self.get_argument('dev',None)
startDate=self.get_argument('startDate',None)
endDate=self.get_argument('endDate',None)
if not (dev and startDate and endDate):
self.write({'result':'{}'})
return
data=getByDev2DateRange(int(dev),startDate,endDate)
self.write({'result':data})
class Application(tornado.web.Application):
def __init__(self, handlers, **settings):
global df
global devs
df,devs=initDf()
tornado.web.Application.__init__(self, handlers, **settings)
if __name__=='__main__':
tornado.options.parse_command_line()
app=Application(
handlers=[
(r'/',indexHandler),
(r'/index',indexHandler),
(r'/devs',devsHandler),
(r'/data',dataHandler),
(r"/assets/(.*)", tornado.web.StaticFileHandler, {"path": "static/assets"}),
(r"/dist/(.*)", tornado.web.StaticFileHandler, {"path": "static/dist"}),
(r"/css/(.*)", tornado.web.StaticFileHandler, {"path": "static/css"}),
(r"/js/(.*)", tornado.web.StaticFileHandler, {"path": "static/js"}),
],
template_path=os.path.join(os.path.curdir,'templates'),static_path=os.path.join(os.path.curdir,'static'),cookie_secret='miaojiyue',debug=True )
http_server=tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
使用pandas将数据加载到dataframe中如下:
下面看一下使用Pandas数据分析工具的具体实现
代码语言:python代码运行次数:0复制#-*-coding:utf-8 -*-
import os
import numpy as np
import pandas as pd
def initData():
dataDir='D:wcdata'
csvs=os.listdir(dataDir)
dfs=[]
names=['ID', 'DevID', 'DevData', 'DevUnit', 'voltage',
'DevAlarm', 'RecDateTime', 'StrInfo', 'ErrorCode',
'dflag', 'AddTime']
#遍历文件夹中所有csv文件,将数据拼合到一个dataframe中
for csv in csvs:
#由于csv中首行没有存储列名,指定数据对应的列名称
df0=pd.read_csv(os.path.join(dataDir,csv),names=names)
print(csv)
print(df0)
dfs.append(df0)
df=pd.concat(dfs)
df['devId']=df['DevID']
df['devUnit']=df['DevUnit']
#根据小数点分隔字符串,将时间格式化到整数秒,并将字符串转为时间格式。
df.RecDateTime=df.RecDateTime.apply(lambda x:x[:x.find('.')])
df.RecDateTime=pd.to_datetime(df.RecDateTime)
df['recDateTime']=df['RecDateTime']
#为dataframe指定设备ID和接收时间作为索引并根据索引排序
df=df.set_index(['DevID','RecDateTime']).drop(columns=['ID']).sort_index()
return df
def initDf():
print('begin to read data from pkl!')
global df
df=pd.read_pickle('data.pkl')
print('read complete!')
global df2
print('begin to read devs!')
#devs=df.index.map(lambda x:x[0]).unique().to_list()
#devs=np.load('data.npz')['devs'].tolist()
#df2=pd.read_excel('data.xls',names=["devId","devName","devUnit"])
df2=pd.read_pickle('data2.pkl')
print('devs complete!')
return df,df2
def getByDev2DateRange(dev,startDate,endDate):
print(dev)
print(startDate)
print(endDate)
if dev not in df2['devId']:
return '{}'
dates=pd.date_range(start=startDate, end=endDate)
dates=dates.map(lambda x:x.strftime('%Y-%m-%d'))
#使用传感器ID索引过滤数据
d=df.loc[dev]
#取得该传感器所有数据的时间列表
dates2=d.index.map(lambda x:x.strftime('%Y-%m-%d'))
#用是否包含布尔索引来过滤查询时间范围的数据,也可以使用dates.contains(date2)
data=d[dates2.isin(dates)]
#只返回'recDateTime','DevData'两列
data=data.loc[:,['recDateTime','DevData']]
data.recDateTime=data.recDateTime.apply(lambda x:x.strftime('%Y-%m-%d %H:%M:%S'))
data=data.reset_index()
data=data.drop(columns=['RecDateTime'])
return data.to_json()
这里有两个dataframe,一个保存传感器基本信息(包含DevID和DevUnit),一个保存历史数据。其中初始化它们时有两种方式,一种是从csv文件中加载,一种是预先将从csv中加载的dataframe使用to_pickle保存到pkl文件中,然后从pkl文件直接加载,后者文件更小而且加载速度更快。
下文将介绍查询数据使用echarts展示的前端代码。