Python开发---基于HJ 212协议的简单接收程序

2022-04-01 16:51:25 浏览数 (1)

HJ 212-2017是污染物在线监控(监测)系统数据传输标准的一种。

本标准适用于污染物在线监控(监测)系统、污染物排放过程(工况)自动监控系统与监控中心之间的数据传输,规定了传输的过程及参数命令、交互命令、数据命令和控制命令的格式,给出了代码定义,本标准允许扩展,但扩展内容时不得与本标准中所使用或保留的控制命令相冲突。 本标准还规定了在线监控(监测)仪器仪表和数据采集传输仪之间的数据传输格式,同时给出了代码定义。

传感器设备通过TCP连接,使用HJ 212协议向服务器发送报文数据。

服务器接收程序运行如下:

设备作为TCP Client上传数据,所以服务端接收程序是一个TCP Server程序,接收到报文,解析并存储。代码如下:

代码语言:python代码运行次数:0复制
# -*-coding:utf-8 -*-
import socket
import threading
from datetime import datetime
from hjt212 import *

def deleteConnection(item):
    global connectionList
    del connectionList['connection' item]



class WebSocket(threading.Thread):#继承Thread

    #def __init__(self,conn,index,name,remote,path='/'):
    def __init__(self,conn,name,remote,path='/'):
        threading.Thread.__init__(self)#初始化父类Thread
        self.conn=conn
        #self.index=index
        self.name=name
        
        self.remote=remote
        self.path=path
        
        self.buffer=''
        self.buffer_utf8=''
        self.length_buffer=0

    def run(self):#重载Thread的run
        #print ('Socket %s Start!' % self.index)
        print ('Socket %s Start!'% (self.remote[0] ":" str(self.remote[1])))
        headers={}
        self.handshaken=False

        while True:
            print(self.name)
            #print(self.handshaken)
                      
            if self.handshaken==False:
                message=self.conn.recv(1024)
                if(len(message)!=0):
		    
                    sourceIP=self.remote[0] ":" str(self.remote[1])
                    print(u"IP::" sourceIP u"n报文::" message.hex() u"n时间::" str(datetime.now()))
                    nowStr=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    t=threading.Thread(target=savePack,args=(sourceIP,message,nowStr,self))
                    t.run()
                    
                    #print('Socket %s Start Handshaken with %s!' % (self.index,self.remote))
                    print('Socket  Start Handshaken with %s!' % (self.remote,))
                    #print('Socket %s Handshaken with %s success!' %(self.index,self.remote))
                    
                
                     
                   
                else:
                    self.handshaken = True
            else:
                
                #deleteConnection(str(self.index))
                self.conn.close()
                break #退出线程

class WebSocketServer(object):
    def __init__(self):
        self.socket=None

    def begin(self):
        print ('WebSocketServer Start!')
        self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        self.socket.bind(('0.0.0.0',8091))
        self.socket.listen(50)

        #global connectionlist

        #i=0
    
        while True:
            connection,address=self.socket.accept()
            
            ip=address[0]    
            
            newSocket=WebSocket(connection,ip,address)
            newSocket.start() #开始线程,执行run函数
            #global connectionList
            #connectionList['connection' str(i)]=connection
            #i=i 1


if __name__=='__main__':
    #global connectionList
    #connectionList = {}
    server=WebSocketServer()
    server.begin()

hjt212.py文件中定义了hj212协议数据包的解析和存储:包括CRC校验,报文解析成json字典,最后根据CN编号(2051表示为分钟数据,2061表示为小时数据)分别保存在MongoDB的不同表中。

代码语言:python代码运行次数:0复制
# -*-coding:utf-8 -*-

"""
{ $and: [ { "CN": "1" },equals
{ "CN": { $ne: "2" } },doesn't equal
{ "CN": /.*abc.*/i },contains
{ "CN": { $not: /.*abc.*/i } },doesn't contain
{ "CN": /^abc.*/i },starts with
{ "CN": { $not: /^abc.*/i } },doesn't start with
{ "CN": /.*abc$/i },ends with
{ "CN": { $not: /.*abc$/i } },doesn't end with
{ "CN": { $exists: true } }, exists
{ "CN": { $exists: false } },doesn't exist
{ "CN": { $in: [abc] } },in 
{ "CN": { $nin: [abc] } },not in
{ "CN": { $all: [abc] } },array contains all
{ "CN": { $gt: "1" } },>
{ "CN": { $gte: "1" } },>=
{ "CN": { $lt: "1" } },<
{ "CN": { $lte: "1" } },<=
{ "CN": { $gte: "1", $lte: "3" } },<=...<
{ "CN": { $gt: "1", $lte: "3" } },<..<=
{ "CN": { $gte: "1", $lt: "3" } },<..<
{ "CN": { $type: 16 } }, has type 
{ "CN": { $not: { $type: 16 } } ,doesn't have type
{ $text: { $search: "abc", $language: "zhs", $caseSensitive: true } } text index search

{ $or: [ { "CN": "1" }, { "MN": { $gt: "1" } } ] }

{ $or: [ { "CN": "1" }, { "MN": { $gt: "1" } }, { $and: [ { "CN": "1" } ] } ] }

{ $or: [ { "CN": "1" }, { $and: [ { "CN": "1" } ] }, { $and: [ { "CN": "1" } ] } ] }

{ $nor: [ { "CN": "1" }, { $and: [ { "CN": "1" } ] }, { $or: [ { "CN": "1" } ] } ] }

{ $nor: [ { "CN": "1" }, { "CP": { $elemMatch: {"CN": "1" } } }, { $or: [ { "CN": "1" } ] } ] }

{ $nor: [ { "CN": "1" }, { "CP": { $not: { $elemMatch: {"CN": "1" } } } }, { $or: [ { "CN": "1" } ] } ] }


{ "PW": 1, "MN": 0}

{ "PW": 1, "Flag": -1}
"""


dictFactor1={'w01001': 'pH值', 'w01009': '溶解氧', 'w01010': '水温', 'w01014': '电导率', 'w01012': '悬浮物', 'w01018': '化学需氧量', 'w21003': '氨氮', 'w20122': '总铜', 'w21016': '氰化物'}

dictFactor2={'w01001': 'pH值', 'w01009': '溶解氧', 'w01010': '水温', 'w01014': '电导率', 'w01012': '悬浮物', 'w01018': '化学需氧量', 'w21003': '氨氮', 'w20122': '总铜', 'w21016': '氰化物'}
from datetime import datetime
global bs
global data
import binascii
import pymongo
import logging
logger=logging.getLogger('HJT212')
fh=logging.FileHandler('log.txt')
logFormatter=logging.Formatter('%(asctime)s -%(name)s -%(message)s')
fh.setFormatter(logFormatter)
logger.addHandler(fh)
logger.setLevel(logging.ERROR)

mongo=pymongo.MongoClient()
t1=mongo.water.minute
t2=mongo.water.hour
def crc32asii(v):
    return '0x%8x' % (binascii.crc32(v) & 0xffffffff)


def crc2hex(crc): 
    return 'x' % (binascii.crc32(binascii.a2b_hex(crc)) & 0xffffffff)

def crc16(x, invert):
    a = 0xFFFF
    b = 0xA001
    for byte in x:
        a ^= ord(byte)
        for i in range(8):
            last = a % 2
            a >>= 1
            if last == 1:
                a ^= b
    s = hex(a).upper()
 
    return s[4:6] s[2:4] if invert == True else s[2:4] s[4:6]
def calc_crc(string):
    data = bytearray.fromhex(string)
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for i in range(8):
            if ((crc & 1) != 0):
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return hex(((crc & 0xff) << 8)   (crc >> 8))


"""
unsigned int CRC16_Checkout ( unsigned char *puchMsg, unsigned int usDataLen ) 
{ 
unsigned int i,j,crc_reg,check; 
crc_reg = 0xFFFF; 
for(i=0;i<usDataLen;i  ) 
{ 
crc_reg = (crc_reg>>8) ^ puchMsg[i]; 
 for(j=0;j<8;j  ) 
{ 
 check = crc_reg & 0x0001; 
 crc_reg >>= 1; 
 if(check==0x0001) 
{ 
 crc_reg ^= 0xA001; 
 } 
 } 
} 
return crc_reg; 
}
"""
 
def crc(string):
    data=bytearray.fromhex(string)
    crc_reg=0xFFFF
    for x in data:
        crc_reg=(crc_reg>>8)^x
        for i in range(8):
            check=crc_reg&0x0001
            crc_reg=(crc_reg>>1)
            if(check==0x0001):
                crc_reg=(crc_reg^0xA001)
    return crc_reg

def parsePack(ip,bs,receiveTime):
    #global bs
    #bs=text.strip().split(' ')
    #packHead=bytes.fromhex(''.join(bs[:2]))
    text=bs.hex()
    packHead=bytes.fromhex(text[:4])
    packHead=packHead.decode('ascii')
    print(u"包头:%s"%packHead)
    #dataLength=bytes.fromhex(''.join(bs[2:6]))
    dataLength=bytes.fromhex(text[4:12])
    #dataLength=int(dataLength)
    dataLength=int(dataLength)*2
    print(u"数据段长度:%s"�taLength)
    global data
    #data=bs[6:dataLength 6]
    data=text[12:dataLength 12]
    #crc1=crc(''.join(data))
    crc1=crc(data)
    print("计算CRC为:%s"%crc1)
    print(text[dataLength 12:dataLength 12 8])
    #crc2=int(bytes.fromhex(''.join(bs[dataLength 6:dataLength 6 4])),16)
    crc2=int(bytes.fromhex(text[dataLength 12:dataLength 12 8]),16)
    print("数据CRC为:%s"%crc2)
    print(u"CRC比对成功"if crc1==crc2 else u"CRC比对失败")
    print(u'数据段为:n%s'%''.join(data))
    #print(u'数据为:n%s'%bytes.fromhex(''.join(data)).decode('ascii'))
    print(u'数据为:n%s'%bytes.fromhex(data).decode('ascii'))
    #dataStr=bytes.fromhex(''.join(data)).decode('ascii')
    dataStr=bytes.fromhex(data).decode('ascii')
    sepNum=dataStr.find('DataTime')
    dataStr1=dataStr[:sepNum]
    
    dataDict1=dict(map(lambda x:(x.split('=')[0],x.split('=')[1]),dataStr1.split(';')))
    print(dataDict1)
    dataStr2=dataStr[sepNum:]
    dateText=dataStr2.split(';')[0].split('=')[1]
    date=datetime.strptime(dateText,'%Y%m%d%H%M%S')
    
    print(u"数据时间为:%s"�te.strftime('%Y-%m-%d %H:%M:%S'))
    dateStr=date.strftime('%Y-%m-%d %H:%M:%S')
    list2=list(map(lambda x:x.split(','),dataStr2.split(';')[1:]))
    print(list2)
    dictValues=dict(map(lambda x:[x[0].split('=')[0],x[0].split('=')[1].strip('&')],list2))
    dictFlags=dict(map(lambda x:[x[1].split('=')[0],x[1].split('=')[1].strip('&')],list2))
    
    dictValues0=dict(map(lambda x:[dictFactor1.get(x[0].split('-')[0]),x[1]],dictValues.items()))
    dictFlags0=dict(map(lambda x:[dictFactor1.get(x[0].split('-')[0]),x[1]],dictFlags.items()))
    print(dictValues0)
    print(dictFlags0)
    #dataFoot=bytes.fromhex(''.join(bs[-2:]))
    dataFoot=bytes.fromhex(text[-4:])
    print(u'包尾为:%s'�taFoot.decode('ascii'))
    qn=datetime.strptime(dataDict1['QN'][:-3],'%Y%m%d%H%M%S')
    qnStr=qn.strftime('%Y-%m-%d %H:%M:%S')
    dataDict1.update({'ip':ip.split(':')[0],'port':ip.split(':')[1],'DataTime':dateText,'dateStr':dateStr,'qnStr':qnStr,'receiveTime':receiveTime})
    return crc1==crc2,dataDict1,dictValues,dictFlags
    
def savePack(ip,bs,receiveTime,self):
    try:
    #if True:
        success,heads,values,flags=parsePack(ip,bs,receiveTime)
    except Exception as e:
        print("error occurred")
        logger.error(e)
        #self.handshaken = True
        return 
         

    if(success):
        #mongo.insert_one()
        print(heads)
        print(values)
        print(flags)
        values.update(heads)
        values.update(flags)
        CN=int(heads['CN'])
        if(CN==2051):
            insId=t1.insert_one(values)
            print(u'ID:%s,已经保存到分钟数据表'%str(insId.inserted_id))
        elif(CN==2061):
            insId=t2.insert_one(values)
            print(u'ID:%s,已经保存到小时数据表'%str(insId.inserted_id))
        
if __name__=='__main__':
    pass
    

0 人点赞