HJ 212-2017是污染物在线监控(监测)系统数据传输标准的一种。
本标准适用于污染物在线监控(监测)系统、污染物排放过程(工况)自动监控系统与监控中心之间的数据传输,规定了传输的过程及参数命令、交互命令、数据命令和控制命令的格式,给出了代码定义,本标准允许扩展,但扩展内容时不得与本标准中所使用或保留的控制命令相冲突。 本标准还规定了在线监控(监测)仪器仪表和数据采集传输仪之间的数据传输格式,同时给出了代码定义。
传感器设备通过TCP连接,使用HJ 212协议向服务器发送报文数据。
服务器接收程序运行如下:
设备作为TCP Client上传数据,所以服务端接收程序是一个TCP Server程序,接收到报文,解析并存储。代码如下:
代码语言:python代码运行次数:0复制# -*-coding:utf-8 -*-
import socket
import threading
from datetime import datetime
from hjt212 import *
def deleteConnection(item):
global connectionList
del connectionList['connection' item]
class WebSocket(threading.Thread):#继承Thread
#def __init__(self,conn,index,name,remote,path='/'):
def __init__(self,conn,name,remote,path='/'):
threading.Thread.__init__(self)#初始化父类Thread
self.conn=conn
#self.index=index
self.name=name
self.remote=remote
self.path=path
self.buffer=''
self.buffer_utf8=''
self.length_buffer=0
def run(self):#重载Thread的run
#print ('Socket %s Start!' % self.index)
print ('Socket %s Start!'% (self.remote[0] ":" str(self.remote[1])))
headers={}
self.handshaken=False
while True:
print(self.name)
#print(self.handshaken)
if self.handshaken==False:
message=self.conn.recv(1024)
if(len(message)!=0):
sourceIP=self.remote[0] ":" str(self.remote[1])
print(u"IP::" sourceIP u"n报文::" message.hex() u"n时间::" str(datetime.now()))
nowStr=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
t=threading.Thread(target=savePack,args=(sourceIP,message,nowStr,self))
t.run()
#print('Socket %s Start Handshaken with %s!' % (self.index,self.remote))
print('Socket Start Handshaken with %s!' % (self.remote,))
#print('Socket %s Handshaken with %s success!' %(self.index,self.remote))
else:
self.handshaken = True
else:
#deleteConnection(str(self.index))
self.conn.close()
break #退出线程
class WebSocketServer(object):
def __init__(self):
self.socket=None
def begin(self):
print ('WebSocketServer Start!')
self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
self.socket.bind(('0.0.0.0',8091))
self.socket.listen(50)
#global connectionlist
#i=0
while True:
connection,address=self.socket.accept()
ip=address[0]
newSocket=WebSocket(connection,ip,address)
newSocket.start() #开始线程,执行run函数
#global connectionList
#connectionList['connection' str(i)]=connection
#i=i 1
if __name__=='__main__':
#global connectionList
#connectionList = {}
server=WebSocketServer()
server.begin()
hjt212.py文件中定义了hj212协议数据包的解析和存储:包括CRC校验,报文解析成json字典,最后根据CN编号(2051表示为分钟数据,2061表示为小时数据)分别保存在MongoDB的不同表中。
代码语言:python代码运行次数:0复制# -*-coding:utf-8 -*-
"""
{ $and: [ { "CN": "1" },equals
{ "CN": { $ne: "2" } },doesn't equal
{ "CN": /.*abc.*/i },contains
{ "CN": { $not: /.*abc.*/i } },doesn't contain
{ "CN": /^abc.*/i },starts with
{ "CN": { $not: /^abc.*/i } },doesn't start with
{ "CN": /.*abc$/i },ends with
{ "CN": { $not: /.*abc$/i } },doesn't end with
{ "CN": { $exists: true } }, exists
{ "CN": { $exists: false } },doesn't exist
{ "CN": { $in: [abc] } },in
{ "CN": { $nin: [abc] } },not in
{ "CN": { $all: [abc] } },array contains all
{ "CN": { $gt: "1" } },>
{ "CN": { $gte: "1" } },>=
{ "CN": { $lt: "1" } },<
{ "CN": { $lte: "1" } },<=
{ "CN": { $gte: "1", $lte: "3" } },<=...<
{ "CN": { $gt: "1", $lte: "3" } },<..<=
{ "CN": { $gte: "1", $lt: "3" } },<..<
{ "CN": { $type: 16 } }, has type
{ "CN": { $not: { $type: 16 } } ,doesn't have type
{ $text: { $search: "abc", $language: "zhs", $caseSensitive: true } } text index search
{ $or: [ { "CN": "1" }, { "MN": { $gt: "1" } } ] }
{ $or: [ { "CN": "1" }, { "MN": { $gt: "1" } }, { $and: [ { "CN": "1" } ] } ] }
{ $or: [ { "CN": "1" }, { $and: [ { "CN": "1" } ] }, { $and: [ { "CN": "1" } ] } ] }
{ $nor: [ { "CN": "1" }, { $and: [ { "CN": "1" } ] }, { $or: [ { "CN": "1" } ] } ] }
{ $nor: [ { "CN": "1" }, { "CP": { $elemMatch: {"CN": "1" } } }, { $or: [ { "CN": "1" } ] } ] }
{ $nor: [ { "CN": "1" }, { "CP": { $not: { $elemMatch: {"CN": "1" } } } }, { $or: [ { "CN": "1" } ] } ] }
{ "PW": 1, "MN": 0}
{ "PW": 1, "Flag": -1}
"""
dictFactor1={'w01001': 'pH值', 'w01009': '溶解氧', 'w01010': '水温', 'w01014': '电导率', 'w01012': '悬浮物', 'w01018': '化学需氧量', 'w21003': '氨氮', 'w20122': '总铜', 'w21016': '氰化物'}
dictFactor2={'w01001': 'pH值', 'w01009': '溶解氧', 'w01010': '水温', 'w01014': '电导率', 'w01012': '悬浮物', 'w01018': '化学需氧量', 'w21003': '氨氮', 'w20122': '总铜', 'w21016': '氰化物'}
from datetime import datetime
global bs
global data
import binascii
import pymongo
import logging
logger=logging.getLogger('HJT212')
fh=logging.FileHandler('log.txt')
logFormatter=logging.Formatter('%(asctime)s -%(name)s -%(message)s')
fh.setFormatter(logFormatter)
logger.addHandler(fh)
logger.setLevel(logging.ERROR)
mongo=pymongo.MongoClient()
t1=mongo.water.minute
t2=mongo.water.hour
def crc32asii(v):
return '0x%8x' % (binascii.crc32(v) & 0xffffffff)
def crc2hex(crc):
return 'x' % (binascii.crc32(binascii.a2b_hex(crc)) & 0xffffffff)
def crc16(x, invert):
a = 0xFFFF
b = 0xA001
for byte in x:
a ^= ord(byte)
for i in range(8):
last = a % 2
a >>= 1
if last == 1:
a ^= b
s = hex(a).upper()
return s[4:6] s[2:4] if invert == True else s[2:4] s[4:6]
def calc_crc(string):
data = bytearray.fromhex(string)
crc = 0xFFFF
for pos in data:
crc ^= pos
for i in range(8):
if ((crc & 1) != 0):
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return hex(((crc & 0xff) << 8) (crc >> 8))
"""
unsigned int CRC16_Checkout ( unsigned char *puchMsg, unsigned int usDataLen )
{
unsigned int i,j,crc_reg,check;
crc_reg = 0xFFFF;
for(i=0;i<usDataLen;i )
{
crc_reg = (crc_reg>>8) ^ puchMsg[i];
for(j=0;j<8;j )
{
check = crc_reg & 0x0001;
crc_reg >>= 1;
if(check==0x0001)
{
crc_reg ^= 0xA001;
}
}
}
return crc_reg;
}
"""
def crc(string):
data=bytearray.fromhex(string)
crc_reg=0xFFFF
for x in data:
crc_reg=(crc_reg>>8)^x
for i in range(8):
check=crc_reg&0x0001
crc_reg=(crc_reg>>1)
if(check==0x0001):
crc_reg=(crc_reg^0xA001)
return crc_reg
def parsePack(ip,bs,receiveTime):
#global bs
#bs=text.strip().split(' ')
#packHead=bytes.fromhex(''.join(bs[:2]))
text=bs.hex()
packHead=bytes.fromhex(text[:4])
packHead=packHead.decode('ascii')
print(u"包头:%s"%packHead)
#dataLength=bytes.fromhex(''.join(bs[2:6]))
dataLength=bytes.fromhex(text[4:12])
#dataLength=int(dataLength)
dataLength=int(dataLength)*2
print(u"数据段长度:%s"�taLength)
global data
#data=bs[6:dataLength 6]
data=text[12:dataLength 12]
#crc1=crc(''.join(data))
crc1=crc(data)
print("计算CRC为:%s"%crc1)
print(text[dataLength 12:dataLength 12 8])
#crc2=int(bytes.fromhex(''.join(bs[dataLength 6:dataLength 6 4])),16)
crc2=int(bytes.fromhex(text[dataLength 12:dataLength 12 8]),16)
print("数据CRC为:%s"%crc2)
print(u"CRC比对成功"if crc1==crc2 else u"CRC比对失败")
print(u'数据段为:n%s'%''.join(data))
#print(u'数据为:n%s'%bytes.fromhex(''.join(data)).decode('ascii'))
print(u'数据为:n%s'%bytes.fromhex(data).decode('ascii'))
#dataStr=bytes.fromhex(''.join(data)).decode('ascii')
dataStr=bytes.fromhex(data).decode('ascii')
sepNum=dataStr.find('DataTime')
dataStr1=dataStr[:sepNum]
dataDict1=dict(map(lambda x:(x.split('=')[0],x.split('=')[1]),dataStr1.split(';')))
print(dataDict1)
dataStr2=dataStr[sepNum:]
dateText=dataStr2.split(';')[0].split('=')[1]
date=datetime.strptime(dateText,'%Y%m%d%H%M%S')
print(u"数据时间为:%s"�te.strftime('%Y-%m-%d %H:%M:%S'))
dateStr=date.strftime('%Y-%m-%d %H:%M:%S')
list2=list(map(lambda x:x.split(','),dataStr2.split(';')[1:]))
print(list2)
dictValues=dict(map(lambda x:[x[0].split('=')[0],x[0].split('=')[1].strip('&')],list2))
dictFlags=dict(map(lambda x:[x[1].split('=')[0],x[1].split('=')[1].strip('&')],list2))
dictValues0=dict(map(lambda x:[dictFactor1.get(x[0].split('-')[0]),x[1]],dictValues.items()))
dictFlags0=dict(map(lambda x:[dictFactor1.get(x[0].split('-')[0]),x[1]],dictFlags.items()))
print(dictValues0)
print(dictFlags0)
#dataFoot=bytes.fromhex(''.join(bs[-2:]))
dataFoot=bytes.fromhex(text[-4:])
print(u'包尾为:%s'�taFoot.decode('ascii'))
qn=datetime.strptime(dataDict1['QN'][:-3],'%Y%m%d%H%M%S')
qnStr=qn.strftime('%Y-%m-%d %H:%M:%S')
dataDict1.update({'ip':ip.split(':')[0],'port':ip.split(':')[1],'DataTime':dateText,'dateStr':dateStr,'qnStr':qnStr,'receiveTime':receiveTime})
return crc1==crc2,dataDict1,dictValues,dictFlags
def savePack(ip,bs,receiveTime,self):
try:
#if True:
success,heads,values,flags=parsePack(ip,bs,receiveTime)
except Exception as e:
print("error occurred")
logger.error(e)
#self.handshaken = True
return
if(success):
#mongo.insert_one()
print(heads)
print(values)
print(flags)
values.update(heads)
values.update(flags)
CN=int(heads['CN'])
if(CN==2051):
insId=t1.insert_one(values)
print(u'ID:%s,已经保存到分钟数据表'%str(insId.inserted_id))
elif(CN==2061):
insId=t2.insert_one(values)
print(u'ID:%s,已经保存到小时数据表'%str(insId.inserted_id))
if __name__=='__main__':
pass