物联网MQTT协议报文解析(简单的c语音客户端实现)

2020-08-04 10:27:03 浏览数 (1)

MQTT(Message Queuing Telemetry Transport),是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。MQTT是专门针对物联网开发的轻量级传输协议。MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化,使得其能适应各种物联网应用场景。

如今很多第三方推送平台都采用了MQTT来实现,消息中间件ActiveMQ的订阅/发布模块也是基于MQTT实现。

以下为MQTT的 会话,订阅,发布的几个报文的解析:

先看下这张图,为整体的报文结构。大致分为 固定头(Control PacketLength) 可变头 载体

PS E:testmqtt> .test.exe test socket: ->connect ok!

//会话,CONNECT ->send: 102900044D51545404C2003C000A0102303330343035303600067075626C696300097061677075626C6963 <-recv: 20020000 <-recv ok!len = 4 //订阅主题 ->send: 821C000100176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C00 <-recv: 9003000100 <-recv ok!len = 5 //订阅主题 . ->send: 821D000200186A6B2F636F6D6D616E642F72656164706172616D6574657200 <-recv: 9003000200 //发布消息 ->send: 302200166A6B2F72657475726E2F7265616C79636F6E74726F6C61626331323938370000

//收到订阅的主题 <-recv: 301C00176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C998866 <-recv ok!len = 30 请按任意键继续. . . PS E:testmqtt>

报文解析:

//CONNECT 报文

->send:

102900044D51545404C2003C000A0102303330343035303600067075626C696300097061677075626C6963

解析:

//固定报文头

10 //固定报文头 byte1

29 //固定报文头之 byte2

//可变报文头

0004//长度

4D515454 //内容(MQTT)

04 //协议级别

C2//连接标志位

003C //保持连接时长(60秒)

//载体部分

000A//长度

01023033303430353036//用户名

0006

7075626C6963

0009

7061677075626C6963 //密码

<-recv:

20020000

解析:

20 //固定头

02//固定头 byte2 长度

00 //连接确认标志

00 //连接返回码

//订阅主题报文

->send:

821C000100176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C00

解析:

82 //固定报文头 byte1

1C //固定报文头 byte2 (剩余长度)

//可变报文头

00//消息标识符byte1

01//消息标识符byte2

//载荷

0017//主题长度

6A6B2F636F6D6D616E642F7265616C79636F6E74726F6C // 内容为 : jk/command/realycontrol

00 //服务质量要求Qos

<-recv:

9003000100

应答解析:

90//固定报文头

03 //固定报文头 (长度)

0001 //报文标识符

00

<-recv ok!len = 5

//订阅主题报文

->send:

821D000200186A6B2F636F6D6D616E642F72656164706172616D6574657200

<-recv:

301C00176A6B2F636F6D6D616E642F7265616C79636F6E74726F6C9988669003000200

<-recv ok!len = 35

//发送消息报文(固定头 可变头(主题长度 主题内容 标识符(可选) 载体内容))

->send:

302200166A6B2F72657475726E2F7265616C79636F6E74726F6C61626331323938370000

30 //固定头 (控制字)

22 //固定头(长度)

//可变头

0016

6A6B2F72657475726E2F7265616C79636F6E74726F6C //主题名内容为: jk/return/realycontrol

61626331323938370000 // 载体内容为: abc12987

只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中

代码语言:javascript复制
/*******************************************************************************
函数名称:MqttConnectPacket
函数功能:按照MQTT协议发送建立连接数据包
输入参数:*packet,连接数据包缓存指针,*id:用户ID号指针,*username:用户名指针 *password:用户密码指针
输出参数:无
备注说明:用户ID是必需的,用户名和密码可以为空
********************************************************************************/
u16 MqttConnectPacket(u8 *mqtt_message,char *client_id,char *username,char *password)
{
	u16 client_id_length = strlen(client_id);
	u16 username_length = strlen(username);
	u16 password_length = strlen(password);
	u16 packetLen;
	u16 i,baseIndex;
	
	packetLen = 12   2   client_id_length;
	if(username_length > 0)
		packetLen = packetLen   2   username_length;
	if(password_length > 0)
		packetLen = packetLen  2   password_length;
	
	mqtt_message[0] = 16;				//0x10 // MQTT Message Type CONNECT
	mqtt_message[1] = packetLen - 2;	//剩余长度,不包括固定头
	baseIndex = 2;
	if(packetLen > 127)
	{
		mqtt_message[2] = 1;			//packetLen/127;    
		baseIndex = 3;
	}
	mqtt_message[baseIndex  ] = 0;		// Protocol Name Length MSB    
	mqtt_message[baseIndex  ] = 4;		// Protocol Name Length LSB    
	mqtt_message[baseIndex  ] = 77;		// ASCII Code for M    
	mqtt_message[baseIndex  ] = 81;		// ASCII Code for Q    
	mqtt_message[baseIndex  ] = 84;		// ASCII Code for T    
	mqtt_message[baseIndex  ] = 84;		// ASCII Code for T    
	mqtt_message[baseIndex  ] = 4;		// MQTT Protocol version = 4    
	mqtt_message[baseIndex  ] = 0xC2;		// conn flags 需要用户名和密码认证
	mqtt_message[baseIndex  ] = 0;		// Keep-alive Time Length MSB    
	mqtt_message[baseIndex  ] = 60;		// Keep-alive Time Length LSB    
	mqtt_message[baseIndex  ] = (0xff00&client_id_length)>>8;// Client ID length MSB    
	mqtt_message[baseIndex  ] = 0xff&client_id_length;	// Client ID length LSB    
	
	// Client ID
	for(i = 0; i < client_id_length; i  )
	{
		mqtt_message[baseIndex   i] = client_id[i];    
	}
	baseIndex = baseIndex client_id_length;
		
	if(username_length > 0)
	{
		//username    
		mqtt_message[baseIndex  ] = (0xff00&username_length)>>8;//username length MSB    
		mqtt_message[baseIndex  ] = 0xff&username_length;	//username length LSB    
		for(i = 0; i < username_length ; i  )
		{
			mqtt_message[baseIndex   i] = username[i];    
		}
		baseIndex = baseIndex   username_length;
	}
		
	if(password_length > 0)
	{
		//password    
		mqtt_message[baseIndex  ] = (0xff00&password_length)>>8;//password length MSB    
		mqtt_message[baseIndex  ] = 0xff&password_length;	//password length LSB    
		for(i = 0; i < password_length ; i  )
		{
			mqtt_message[baseIndex   i] = password[i];    
		}
		baseIndex  = password_length; 
	}	
	
	return baseIndex;    
}

/*******************************************************************************
函数名称:MqttPublishPacket
函数功能:按照MQTT协议构建MQTT发布消息包
输入参数:*mqtt_message,连接数据包缓存指针,*topic;消息主题  message:消息内容  message_ln:消息内容长度 qos:服务质量0、1、2
输出参数:无
备注说明:
********************************************************************************/
u16 MqttPublishPacket(u8 *mqtt_message, char * topic,char * message,u16 message_ln, u8 qos)
{  
	u16 topic_length = strlen(topic);    
	u16 message_length = message_ln;//strlen(message);  
	u16 i,index=0;	
	u16	ln = 0;
	static u16 id=0;
	
	mqtt_message[index  ] = 48;	//0x30 // MQTT Message Type PUBLISH    
//////////////////////////20180927///////////////////////////////////////////////////	
//	if(qos)
//		mqtt_message[index  ] = 2   topic_length   2   message_length;
//	else
//		mqtt_message[index  ] = 2   topic_length   message_length;   // Remaining length   
	///////////////////////以上是老代码,以下是新代码。老代码只能发送小于127字节的数据长度////////
	//新代码可以发送的长度为16383,即15K//////////////////
	ln = 2   topic_length   message_length;
	if(ln <128)
	  mqtt_message[index  ] = ln;
	else if(ln < 16384)
	{
	  	mqtt_message[index  ] = (0x80 |(ln8));	
		mqtt_message[index  ] = ln/128;
	}
//////////////////////////20180927///////////////////////////////////////////////////  
	mqtt_message[index  ] = (0xff00&topic_length)>>8;
	mqtt_message[index  ] = 0xff&topic_length;
		
	// Topic    
	for(i = 0; i < topic_length; i  )
	{
		mqtt_message[index   i] = topic[i];
	}
	index  = topic_length;
		
	if(qos)
	{
		mqtt_message[index  ] = (0xff00&id)>>8;
		mqtt_message[index  ] = 0xff&id;
		id  ;
	}
	
	// Message
	for(i = 0; i < message_length; i  )
	{
		mqtt_message[index   i] = message[i];
	}
	index  = message_length;
		
	return index;
}

/*******************************************************************************
函数名称:MqttPublishAckPacket
函数功能:按照MQTT协议构建MQTT发布消息确认包
输入参数:*mqtt_message,连接数据包缓存指针,*topic  message   qos:服务质量0、1、2
输出参数:无
备注说明://对QoS级别1的 PUBLISH 消息的回应当服务器发送 PUBLISH 消息给订阅者客户端,客户端回复 PUBACK 消息
********************************************************************************/
u8 MqttPublishAckPacket(u8 *mqtt_message)
{
	static u16 id=0;
	
	mqtt_message[0] = 64;				//0x40 //消息类型和标志 PUBACK
	mqtt_message[1] = 2;				//剩余长度(不包括固定头部)
	mqtt_message[2] = (0xff00&id)>>8;	//消息标识符
	mqtt_message[3] = 0xff&id;			//消息标识符
	id  ;
	
	return 4;
}

/*******************************************************************************
函数名称:MqtSubscribePacket
函数功能:按照MQTT协议构建MQTT订阅消息包
输入参数:*mqtt_message,连接数据包缓存指针,*topic;消息主题  message:消息内容   qos:服务质量0、1、2
输出参数:无
备注说明://whether=1,订阅; whether=0,取消订阅
********************************************************************************/ 
u16 MqtSubscribePacket(u8 *mqtt_message,char *topic,u8 qos,u8 whether)
{    
	u16 topic_len = strlen(topic);
	u16 i,index = 0;
	static u16 id=0;
	
	id  ;
	if(whether)
		mqtt_message[index  ] = 130;				//0x82 //消息类型和标志 SUBSCRIBE 订阅
	else
		mqtt_message[index  ] = 162;				//0xA2 取消订阅
	mqtt_message[index  ] = topic_len   5;			//剩余长度(不包括固定头部)
	mqtt_message[index  ] = (0xff00&id)>>8;			//消息标识符
	mqtt_message[index  ] = 0xff&id;				//消息标识符
	mqtt_message[index  ] = (0xff00&topic_len)>>8;	//主题长度(高位在前,低位在后)
	mqtt_message[index  ] = 0xff&topic_len;			//主题长度 
	
	for (i = 0;i < topic_len; i  )
	{
		mqtt_message[index   i] = topic[i];
	}
	index  = topic_len;
	
	if(whether)
	{
		mqtt_message[index] = qos;//QoS级别
		index  ;
	}
	return index;
}

//构建MQTT PING请求包
u8 MqttPingPacket(u8 *mqtt_message)
{ 
	mqtt_message[0] = 192;	//0xC0 //消息类型和标志 PING
	mqtt_message[1] = 0;	//剩余长度(不包括固定头部)
	
	return 2;
}

//构建MQTT断开连接包
u8 MqttDisconnectPacket(u8 *mqtt_message)
{ 
	mqtt_message[0] = 224;	//0xE0 //消息类型和标志 DISCONNECT
	mqtt_message[1] = 0;	//剩余长度(不包括固定头部)
	
	return 2;
} 

测试demo:

代码语言:javascript复制
int main()
{
    	int i,rcode,num = 0;
	    U32 rxlen=0;
        U08 ip[8]= {47,95,221,93};
        U16 port = 1883;
        U08 txbuf[1024];
        U08 rxbuf[1024];
		
		u8	*packet;
		u16	ln = 0;
		char	str[100];
		u8 deviceID[6] = {1,2,3,4,5,6};
		u8 userID[6] = {0x31,0x32,0x33,0x34,0x35,0x36};
		u8 companyID[6] ={0x31,0x32,0x33,0x34,0x35,0x36};
		
		u8 txbuf1[20]={'a','b','c',0x31,0x32,0x39,0x38,0x37};
		
		memset(txbuf,0,1024);
        memset(rxbuf,0,1024);
        printf("test socket:rn");
        rcode = Com_Dev_Connect(ip,port,0,0);
        if(rcode == 0)
        {
	        printf("->connect ok!rn");	
        }
		
		RefreshTopicParam(Mqtt.TopicParam,(char*)&userID,(char*)&companyID,(char*)&deviceID);
		
		packet = &Mqtt.TxBuf[0];	   
		DeiveidTostr((char*)deviceID,str);
		ln = MqttConnectPacket(packet,str,"publ","pag");
		
        rcode = Com_Dev_TxData(packet,ln,0,0);
		sleep(1); 
		rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
		if(rcode == 0)
		{
		   printf("<-recv ok!len = %drn",rxlen);	
		}
		system("pause");
		//订阅各种消息
		
		strcpy((char*)str,"jk/command/realycontrol");//订阅继电器控制命令主题
		//strcat((char *)str,(char const *)Mqtt.TopicParam);	
		packet = &Mqtt.TxBuf[0];
		ln = MqtSubscribePacket(packet,(char*)&str,0,1);
		
		rcode = Com_Dev_TxData(packet,ln,0,0);
		sleep(1); 
		rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
		if(rcode == 0)
		{
		   printf("<-recv ok!len = %drn",rxlen);	
		}
		system("pause");
		strcpy((char*)str,"jk/command/readparameter");//订阅读参数主题
		//strcat((char *)str,(char const *)Mqtt.TopicParam);	
		packet = &Mqtt.TxBuf[0];
		ln = MqtSubscribePacket(packet,(char*)&str,0,1);
		
		rcode = Com_Dev_TxData(packet,ln,0,0);
		sleep(1); 
		rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
		if(rcode == 0)
		{
		   printf("<-recv ok!len = %drn",rxlen);	
		}
		system("pause");
		//发布消息
		strcpy((char*)str,"jk/return/realycontrol");//
		//strcat((char *)str,(char const *)Mqtt.TopicParam);		
		num = 10;
		ln = MqttPublishPacket((u8*)&Mqtt.TxBuf[0],(char*)&str,(char*)&txbuf1,num,0);//继电器应答帧长度为10
		
		rcode = Com_Dev_TxData(Mqtt.TxBuf,ln,0,0);
		sleep(1); 
		rcode = Com_Dev_RxData(rxbuf,&rxlen,1024,0,0);
		if(rcode == 0)
		{
		   printf("<-recv ok!len = %drn",rxlen);	
		}
		system("pause");
	return 0;
}

附带makefile:

代码语言:javascript复制
########################################
#makefile
########################################

#编译主程序
BINARY  := test
OBJ_DIR := ./
CC= gcc
LD= ld
CFLAGS= -std=c99 -Wall -g
LDSCRIPT= -lws2_32
LDFLAGS= 

SRC  = $(wildcard *.c)
DIR  = $(notdir $(SRC))
OBJS = $(patsubst %.c,$(OBJ_DIR)%.o,$(DIR))

.PHONY: clean 
all:  prebuild  $(BINARY).exe

prebuild:
	@echo Building app...

$(BINARY).exe : $(OBJS)
	@echo Generating ...
	$(CC) -o $(BINARY).exe $(OBJS) $(LDFLAGS) $(LDSCRIPT) 
	@echo OK!

$(OBJ_DIR)%.o : %.c
	$(CC) -c $(CFLAGS) $< -o  $@	
clean:
	rm -f $(OBJ_DIR)*.o
	@echo Removed!

0 人点赞