前言
- 实时语音识别、实时监控检测状态等等。你是不是在处理离线数据时得心应手,而在面对实时的数据处理的时候会不知所措?
- 时序在智能制造领域是个非常重要的指标,在线实时检测是优势与趋势。
- python如何处理在线数据?简单说下,利用queue、threading多线程处理。(实际情况要比这复杂多得多)
- 拿到数据后的处理要根据实际想情况而定,下面我会举个简单的例子说明。
一、实时数据输入
1.1、队列模拟实时数据
由于每个人在通讯过程中,实时获取的数据方式不同。下面构造数据模拟通讯数据实时输入。
代码语言:javascript复制import queue
list1 = [i for i in range(100 * 1000)] # 0到10w 模拟实时输入的数据
q = queue.Queue() # 构建一个队列
for data in list1:
q.put(data)
print(q.get())
1.2、获取实时数据类
代码语言:javascript复制import threading
class GetDataThread(threading.Thread):
def __init__(self):
super(GetDataThread, self).__init__()
self.data = [i for i in range(100 * 1000)]
def run(self):
pass
二、实时数据处理
2.1、保存实时数据
保存数据是为了离线进行分析。这里把数据保存到数据库,用sqlite3第三方库。当然如果只是简单分析,可以直接写入csv文件
代码语言:javascript复制import queue
import sqlite3
import threading
# 连接数据库操作
conn = sqlite3.connect("datebase.db", check_same_thread=False)
cn = conn.cursor()
# 把获取到的数据存进数据库,写入数据 线程类WriteThread。
class WriteThread(threading.Thread):
def __init__(self):
super(WriteThread, self).__init__()
self.queue = queue.Queue() # 定义一个自身队列
def run(self):
while True:
list_data = self.queue.get() # 实时获取数据
# 插入数据库操作。 如果只是简单分析,可以直接写入csv文件
cn.execute("insert into tableone(time,data) values('{}', '{}')"
.format(list_data[0], list_data[1]))
conn.commit() # 提交
2.2、实时处理数据
代码语言:javascript复制import queue
import numpy as np
import threading
class DataOperationThread(threading.Thread):
def __init__(self):
super(DataOperationThread, self).__init__()
self.queue = queue.Queue()
self.data_list = []
# 对实时获取到的数据进行处理
def run(self):
while True:
data = self.queue.get()
self.data_list.append(data)
"""
下面就根据实际数据,实际情况进行处理,实际情况可能较为复杂。
我这里举个简单例子。计算输出每1000条数据的平均值。
"""
if len(self.data_list) > 1000:
avg = np.average(self.data_list[:1000])
print(avg) # 输出每1000条数据平均值
self.data_list = self.data_list[1000:] # 删除计算过的数据,重置列表
三、完整代码
代码语言:javascript复制import time
import sqlite3
import queue
import threading
import numpy as np
# 连接数据库操作 数据库要提前构建完成
conn = sqlite3.connect("datebase.db", check_same_thread=False)
cn = conn.cursor()
class GetDataThread(threading.Thread):
def __init__(self):
super(GetDataThread, self).__init__()
# 用csv数据模拟实时获取到的数据,实际情况是根据通讯或者其他方式获取到数据。
self.data = [i for i in range(100 * 1000)]
# 调用两个类的线程
self.thread_one = WriteThread()
self.thread_two = DataOperationThread()
self.queue_one = self.thread_one.queue
self.queue_two = self.thread_two.queue
def run(self):
# 开启线程
self.thread_one.start()
self.thread_two.start()
# 把我们数据分别put到两个线程的队列里
for rows in self.data:
self.thread_two.queue.put(rows) # 实时处理的数据
data = [time.time(), rows]
self.thread_one.queue.put(data) # 保存到数据库里的数据 需要当前数据的时间信息
time.sleep(0.001) # !!!!注:这里模拟实际通讯延迟 实际过程中不需要这行代码
# 把获取到的数据存进数据库,写入数据线程类WriteThread。
class WriteThread(threading.Thread):
def __init__(self):
super(WriteThread, self).__init__()
self.queue = queue.Queue() # 定义自身队列
def run(self):
while True:
list_data = self.queue.get() # 实时获取数据
print(list_data[0], list_data[1])
# 插入数据库
cn.execute("insert into tableone(time,data) values('{}', '{}')".format(list_data[0], list_data[1]))
conn.commit()
class DataOperationThread(threading.Thread):
def __init__(self):
super(DataOperationThread, self).__init__()
self.queue = queue.Queue()
self.data_list = []
# 对实时获取到的数据进行处理
def run(self):
while True:
data = self.queue.get()
self.data_list.append(data)
"""
下面就根据实际数据,实际情况进行处理,实际情况可能较为复杂。
我这里举个简单例子。计算输出每1000条数据的平均值。
"""
if len(self.data_list) > 1000:
avg = np.average(self.data_list[:1000])
print(avg) # 输出每1000条数据平均值
self.data_list = self.data_list[1000:] # 删除计算过的数据,重置列表
if __name__ == '__main__':
GetDataThread().start()