1、实时语音识别、实时监控状态。许多新手在处理离线数据时得心应手,在面对实时的数据进行数据处理时会不知所措,而时序在智能制造领域是个非常重要的指标,在线实时检测是优势与趋势。今天分享下python如何处理在线数据。简单说下(实际情况要比这复杂的多的多),利用queue、threading多线程处理。拿到数据如何进行处理要根据实际情况。
2、代码。
代码语言:javascript复制import time
import sqlite3
import queue
import threading
import pandas as pd
# 连接数据库操作
conn = sqlite3.connect("datebase.db", check_same_thread=False)
cn = conn.cursor()
class GetDataThread(threading.Thread):
def __init__(self):
super(GetDataThread, self).__init__()
# 用csv数据模拟实时获取到的数据,实际情况是根据通讯或者其他方式获取到数据。
self.df = pd.read_csv("test.csv",encoding="gbk",header=None)
self.data = self.df[0] # 第一列数据
# 调用两个类的线程
self.thread_one = WriteThread()
self.thread_two = DataOperationThread()
self.queue_one = self.thread_one.queue
self.queue_two = self.thread_two.queue
def run(self):
# 开启线程
self.thread_one.start()
self.thread_two.start()
# 把我们数据分别put到两个线程的队列里
for rows in self.data:
self.thread_two.queue.put(rows)
# 保存的数据进入当前时间
data = [time.time(), rows]
self.thread_one.queue.put(data)
# 把获取到的数据存进数据库,写入数据线程类WriteThread。
class WriteThread(threading.Thread):
def __init__(self):
super(WriteThread, self).__init__()
self.queue = queue.Queue() # 定义一个队列
def run(self):
while True:
list_data = self.queue.get() # 实时获取数据
print(list_data[0], list_data[1])
# 插入数据库
cn.execute("insert into tableone(time,data) values('{}', '{}')".format(list_data[0],list_data[1]))
conn.commit()
class DataOperationThread(threading.Thread):
def __init__(self):
super(DataOperationThread, self).__init__()
self.queue = queue.Queue()
self.data_list = []
# self.list_index = []
# 对实时获取到的数据进行处理
def run(self):
while True:
data = self.queue.get()
self.data_list.append(data)
# 后面就根据实际数据,实际情况进行处理。
if __name__ == '__main__':
GetDataThread().start()