首先放视频
【提示】下面图片代码略多,注意观看~
初次连接到腾讯云物联网开发平台(IoT Explorer)
一开始拿到板子就使用demo连接到腾讯云物联网开发平台(IoT Explorer):
完美完成通信。
开发自己的demo
方案构思如下:https://cloud.tencent.com/developer/article/1492173
完成情况:
更多功能详见视频简介。
使用TencentOS tiny移植lwip
移植的接口文件
移植的部分接口文件如下:
代码语言:txt复制#include "debug.h"
#include <lwip/opt.h>
#include <lwip/arch.h>
#include "tcpip.h"
#include "lwip/init.h"
#include "lwip/netif.h"
#include "lwip/sio.h"
#include "ethernetif.h"
#if !NO_SYS
#include "sys_arch.h"
#endif
#include <lwip/stats.h>
#include <lwip/debug.h>
#include <lwip/sys.h>
#include "lwip/dhcp.h"
#include <string.h>
int errno;
u32_t lwip_sys_now;
struct sys_timeouts {
struct sys_timeo *next;
};
struct timeoutlist
{
struct sys_timeouts timeouts;
k_task_t *pid;
};
#define SYS_THREAD_MAX 4
static struct timeoutlist s_timeoutlist[SYS_THREAD_MAX];
static u16_t s_nextthread = 0;
u32_t
sys_jiffies(void)
{
lwip_sys_now = tos_systick_get();
return lwip_sys_now;
}
u32_t
sys_now(void)
{
lwip_sys_now = tos_systick_get();
return lwip_sys_now;
}
void
sys_init(void)
{
int i;
// Initialize the the per-thread sys_timeouts structures
// make sure there are no valid pids in the list
for(i = 0; i < SYS_THREAD_MAX; i )
{
s_timeoutlist[i].pid = 0;
s_timeoutlist[i].timeouts.next = NULL;
}
// keep track of how many threads have been created
s_nextthread = 0;
}
struct sys_timeouts *sys_arch_timeouts(void)
{
int i;
k_task_t *pid;
struct timeoutlist *tl;
pid = k_curr_task;
for(i = 0; i < s_nextthread; i )
{
tl = &(s_timeoutlist[i]);
if(tl->pid == pid)
{
return &(tl->timeouts);
}
}
return NULL;
}
sys_prot_t sys_arch_protect(void)
{
TOS_CPU_CPSR_ALLOC();
TOS_CPU_INT_DISABLE();
return cpu_cpsr;
}
void sys_arch_unprotect(sys_prot_t cpu_cpsr)
{
TOS_CPU_INT_ENABLE();
}
#if !NO_SYS
err_t
sys_sem_new(sys_sem_t *sem, u8_t count)
{
/* 创建 sem */
tos_sem_create(sem,count);
#if SYS_STATS
lwip_stats.sys.sem.used;
if (lwip_stats.sys.sem.max < lwip_stats.sys.sem.used) {
lwip_stats.sys.sem.max = lwip_stats.sys.sem.used;
}
#endif /* SYS_STATS */
if(sem != SYS_SEM_NULL)
return ERR_OK;
else
{
#if SYS_STATS
lwip_stats.sys.sem.err;
#endif /* SYS_STATS */
printf("[sys_arch]:new sem fail!n");
return ERR_MEM;
}
}
void
sys_sem_free(sys_sem_t *sem)
{
#if SYS_STATS
--lwip_stats.sys.sem.used;
#endif /* SYS_STATS */
/* 删除 sem */
tos_sem_destroy(sem);
sem = SYS_SEM_NULL;
}
int sys_sem_valid(sys_sem_t *sem)
{
return (sem->pend_obj.type != NULL);
}
void
sys_sem_set_invalid(sys_sem_t *sem)
{
sem = SYS_SEM_NULL;
(void)sem;
}
/*
如果timeout参数不为零,则返回值为
等待信号量所花费的毫秒数。如果
信号量未在指定时间内发出信号,返回值为
SYS_ARCH_TIMEOUT。如果线程不必等待信号量
该函数返回零。 */
u32_t
sys_arch_sem_wait(sys_sem_t *sem, u32_t timeout)
{
k_tick_t wait_tick = 0;
k_tick_t start_tick = 0 ;
//看看信号量是否有效
if(sem == SYS_SEM_NULL)
return SYS_ARCH_TIMEOUT;
//首先获取开始等待信号量的时钟节拍
start_tick = sys_now();
//timeout != 0,需要将ms换成系统的时钟节拍
if(timeout != 0)
{
//将ms转换成时钟节拍
wait_tick = timeout / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
if (wait_tick == 0)
wait_tick = 1;
}
else
wait_tick = TOS_TIME_FOREVER; //一直阻塞
//等待成功,计算等待的时间,否则就表示等待超时
if(tos_sem_pend(sem, wait_tick) == K_ERR_NONE)
return ((sys_now()-start_tick) * (1000/TOS_CFG_CPU_TICK_PER_SECOND));
else
return SYS_ARCH_TIMEOUT;
}
void
sys_sem_signal(sys_sem_t *sem)
{
if(tos_sem_post( sem ) != K_ERR_NONE)
printf("[sys_arch]:sem signal fail!n");
}
err_t
sys_mutex_new(sys_mutex_t *mutex)
{
/* 创建 sem */
tos_mutex_create(mutex);
if(mutex != SYS_MRTEX_NULL)
return ERR_OK;
else
{
printf("[sys_arch]:new mutex fail!n");
return ERR_MEM;
}
}
void
sys_mutex_free(sys_mutex_t *mutex)
{
tos_mutex_destroy(mutex);
}
void
sys_mutex_set_invalid(sys_mutex_t *mutex)
{
mutex = SYS_MRTEX_NULL;
(void)mutex;
}
void
sys_mutex_lock(sys_mutex_t *mutex)
{
tos_mutex_pend_timed(mutex,/* 互斥量句柄 */
TOS_TIME_FOREVER); /* 等待时间 */
}
void
sys_mutex_unlock(sys_mutex_t *mutex)
{
tos_mutex_post( mutex );//给出互斥量
}
sys_thread_t
sys_thread_new(const char *name, lwip_thread_fn function, void *arg, int stacksize, int prio)
{
k_err_t err;
sys_thread_t task;
k_stack_t *task_stack;
task = tos_mmheap_alloc(sizeof(k_task_t));
task_stack = tos_mmheap_alloc(stacksize);
/* 创建MidPriority_Task任务 */
err = tos_task_create(task,
(char*)name,
function,
arg,
prio,
task_stack,
stacksize,
20);
if(err != K_ERR_NONE)
printf("TencentOS Create task fail! code : %d rn",err);
if(task == K_NULL)
{
printf("[sys_arch]:create task fail!n");
return NULL;
}
return task;
}
err_t
sys_mbox_new(sys_mbox_t *mbox, int size)
{
k_err_t err;
/* 创建Test_Queue */
err = tos_queue_create(mbox);
if(err != K_ERR_NONE)
{
printf("TencentOS Create mbox fail! code : %d rn",err);
return ERR_MEM;
}
return ERR_OK;
}
void
sys_mbox_free(sys_mbox_t *mbox)
{
tos_queue_destroy(mbox);
}
int sys_mbox_valid(sys_mbox_t *mbox)
{
return (mbox->pend_obj.type != NULL);
}
void
sys_mbox_set_invalid(sys_mbox_t *mbox)
{
mbox = SYS_MBOX_NULL;
(void)mbox;
}
void
sys_mbox_post(sys_mbox_t *q, void *msg)
{
tos_queue_post( q, /* 消息队列的句柄 */
msg,/* 发送的消息内容 */
sizeof(void *));
// while(tos_queue_post( q, /* 消息队列的句柄 */
// msg,/* 发送的消息内容 */
// sizeof(void *)) != K_ERR_NONE);
}
err_t
sys_mbox_trypost(sys_mbox_t *q, void *msg)
{
if(tos_queue_post(q,msg,sizeof(void *)) == K_ERR_NONE)
return ERR_OK;
else
return ERR_MEM;
}
err_t
sys_mbox_trypost_fromisr(sys_mbox_t *q, void *msg)
{
return sys_mbox_trypost(q, msg);
}
u32_t
sys_arch_mbox_fetch(sys_mbox_t *q, void **msg, u32_t timeout)
{
void *dummyptr;
k_tick_t wait_tick = 0;
k_tick_t start_tick = 0 ;
size_t size;
size = sizeof(void *);
if (msg == NULL ) //看看存储消息的地方是否有效
msg = &dummyptr;
//首先获取开始等待信号量的时钟节拍
start_tick = sys_now();
//timeout != 0,需要将ms换成系统的时钟节拍
if(timeout != 0)
{
//将ms转换成时钟节拍
wait_tick = timeout / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
if (wait_tick == 0)
wait_tick = 1;
}
//一直阻塞
else
wait_tick = TOS_TIME_FOREVER;
//等待成功,计算等待的时间,否则就表示等待超时
if(tos_queue_pend(q,&(*msg),&size, wait_tick) == K_ERR_NONE)
return ((sys_now() - start_tick)*(1000/TOS_CFG_CPU_TICK_PER_SECOND));
else
{
*msg = NULL;
return SYS_ARCH_TIMEOUT;
}
}
u32_t
sys_arch_mbox_tryfetch(sys_mbox_t *q, void **msg)
{
size_t size;
size = sizeof(void *);
void *dummyptr;
if ( msg == NULL )
msg = &dummyptr;
//等待成功,计算等待的时间
if(tos_queue_pend(q,&(*msg),&size, 0) == K_ERR_NONE)
return ERR_OK;
else
return SYS_MBOX_EMPTY;
}
#if LWIP_NETCONN_SEM_PER_THREAD
#error LWIP_NETCONN_SEM_PER_THREAD==1 not supported
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
#endif /* !NO_SYS */
/* Variables Initialization */
struct netif gnetif;
ip4_addr_t ipaddr;
ip4_addr_t netmask;
ip4_addr_t gw;
uint8_t IP_ADDRESS[4];
uint8_t NETMASK_ADDRESS[4];
uint8_t GATEWAY_ADDRESS[4];
void TCPIP_Init(void)
{
tcpip_init(NULL, NULL);
/* IP addresses initialization */
/* USER CODE BEGIN 0 */
#if LWIP_DHCP
ip_addr_set_zero_ip4(&ipaddr);
ip_addr_set_zero_ip4(&netmask);
ip_addr_set_zero_ip4(&gw);
#else
IP4_ADDR(&ipaddr,IP_ADDR0,IP_ADDR1,IP_ADDR2,IP_ADDR3);
IP4_ADDR(&netmask,NETMASK_ADDR0,NETMASK_ADDR1,NETMASK_ADDR2,NETMASK_ADDR3);
IP4_ADDR(&gw,GW_ADDR0,GW_ADDR1,GW_ADDR2,GW_ADDR3);
#endif /* USE_DHCP */
/* USER CODE END 0 */
/* Initilialize the LwIP stack without RTOS */
/* add the network interface (IPv4/IPv6) without RTOS */
netif_add(&gnetif, &ipaddr, &netmask, &gw, NULL, ðernetif_init, &tcpip_input);
/* Registers the default network interface */
netif_set_default(&gnetif);
if (netif_is_link_up(&gnetif))
{
/* When the netif is fully configured this function must be called */
netif_set_up(&gnetif);
}
else
{
/* When the netif link is down this function must be called */
netif_set_down(&gnetif);
}
#if LWIP_DHCP //若使用了DHCP
int err;
/* Creates a new DHCP client for this interface on the first call.
Note: you must call dhcp_fine_tmr() and dhcp_coarse_tmr() at
the predefined regular intervals after starting the client.
You can peek in the netif->dhcp struct for the actual DHCP status.*/
printf("本例程将使用DHCP动态分配IP地址,如果不需要则在lwipopts.h中将LWIP_DHCP定义为0nn");
err = dhcp_start(&gnetif); //开启dhcp
if(err == ERR_OK)
printf("lwip dhcp init success...nn");
else
printf("lwip dhcp init fail...nn");
while(ip_addr_cmp(&(gnetif.ip_addr),&ipaddr)) //等待dhcp分配的ip有效
{
vTaskDelay(1);
}
#endif
printf("本地IP地址是:%d.%d.%d.%dnn",
((gnetif.ip_addr.addr)&0x000000ff),
(((gnetif.ip_addr.addr)&0x0000ff00)>>8),
(((gnetif.ip_addr.addr)&0x00ff0000)>>16),
((gnetif.ip_addr.addr)&0xff000000)>>24);
}
iperf测速
测速信息(表示非常稳定,更多详情见附件):
代码语言:txt复制bin/iperf.exe -c 192.168.0.122 -P 1 -i 1 -p 5001 -f k -t 1000000000
------------------------------------------------------------
Client connecting to 192.168.0.122, TCP port 5001
TCP window size: 64.0 KByte (default)
------------------------------------------------------------
[300] local 192.168.0.195 port 56104 connected with 192.168.0.122 port 5001
[ ID] Interval Transfer Bandwidth
[300] 0.0- 1.0 sec 11640 KBytes 95355 Kbits/sec
[300] 1.0- 2.0 sec 11592 KBytes 94962 Kbits/sec
[300] 2.0- 3.0 sec 11576 KBytes 94831 Kbits/sec
[300] 3.0- 4.0 sec 11592 KBytes 94962 Kbits/sec
[300] 4.0- 5.0 sec 11592 KBytes 94962 Kbits/sec
[300] 5.0- 6.0 sec 11576 KBytes 94831 Kbits/sec
[300] 6.0- 7.0 sec 11592 KBytes 94962 Kbits/sec
[300] 7.0- 8.0 sec 11576 KBytes 94831 Kbits/sec
[300] 8.0- 9.0 sec 11592 KBytes 94962 Kbits/sec
[300] 9.0-10.0 sec 11576 KBytes 94831 Kbits/sec
[300] 10.0-11.0 sec 11552 KBytes 94634 Kbits/sec
[300] 11.0-12.0 sec 11592 KBytes 94962 Kbits/sec
[300] 12.0-13.0 sec 11568 KBytes 94765 Kbits/sec
[300] 13.0-14.0 sec 11592 KBytes 94962 Kbits/sec
[300] 14.0-15.0 sec 11576 KBytes 94831 Kbits/sec
[300] 15.0-16.0 sec 11584 KBytes 94896 Kbits/sec
移植emxgui
部分接口文件如下(效果见视频):
代码语言:txt复制#include <stddef.h>
#include "emXGUI_Arch.h"
#include "tos.h"
/*===================================================================================*/
/*
函数功能: 创建一个互斥(该互斥锁必须支持嵌套使用)
返回: 互斥对象句柄(唯一标识)
说明: 互斥对象句柄按实际OS所定,可以是指针,ID号等...
*/
GUI_MUTEX* GUI_MutexCreate(void)
{
GUI_MUTEX *mutex;
mutex = tos_mmheap_alloc(sizeof(k_mutex_t));
tos_mutex_create((k_mutex_t*)mutex);
return mutex;
}
/*===================================================================================*/
/*
函数功能: 互斥锁定
参数: hMutex(由GUI_MutexCreate返回的句柄);
time 最长等待毫秒数,0立既返回,0xFFFFFFFF,一直等待
返回: TRUE:成功;FALSE:失败或超时
说明: .
*/
BOOL GUI_MutexLock(GUI_MUTEX *hMutex,U32 time)
{
k_tick_t wait_tick;
//timeout != 0,需要将ms换成系统的时钟节拍
if(time != 0)
{
//将ms转换成时钟节拍
wait_tick = time / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
if (wait_tick == 0)
wait_tick = 1;
}
else if(time == 0xFFFFFFFF)
wait_tick = TOS_TIME_FOREVER; //一直阻塞
if(tos_mutex_pend_timed((k_mutex_t*)hMutex, time) == K_ERR_NONE)
{
return TRUE;
}
return FALSE;
}
/*===================================================================================*/
/*
函数功能: 互斥解锁
参数: hMutex(由GUI_MutexCreate返回的句柄);
返回: 无
说明: .
*/
void GUI_MutexUnlock(GUI_MUTEX *hMutex)
{
tos_mutex_post((k_mutex_t*)hMutex);
}
/*===================================================================================*/
/*
函数功能: 互斥删除
参数: hMutex(由GUI_MutexCreate返回的句柄);
返回: 无
说明: .
*/
void GUI_MutexDelete(GUI_MUTEX *hMutex)
{
tos_mutex_destroy((k_mutex_t*)hMutex);
tos_mmheap_free(hMutex);
}
/*===================================================================================*/
/*
函数功能: 创建一个信号量
参数: init: 信号量初始值; max: 信号量最大值
返回: 信号量对象句柄(唯一标识)
说明: 信号量对象句柄按实际OS所定,可以是指针,ID号等...
*/
GUI_SEM* GUI_SemCreate(int init,int max)
{
GUI_SEM *sem;
sem = tos_mmheap_alloc(sizeof(k_sem_t));
tos_sem_create((k_sem_t*)sem,init);
return sem;
}
/*===================================================================================*/
/*
函数功能: 信号量等待
参数: hsem(由GUI_SemCreate返回的句柄);
time 最长等待毫秒数,0立既返回,0xFFFFFFFF,一直等待
返回: TRUE:成功;FALSE:失败或超时
说明: .
*/
BOOL GUI_SemWait(GUI_SEM *hsem,U32 time)
{
k_tick_t wait_tick;
//timeout != 0,需要将ms换成系统的时钟节拍
if(time != 0)
{
//将ms转换成时钟节拍
wait_tick = time / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
if (wait_tick == 0)
wait_tick = 1;
}
else if(time == 0xFFFFFFFF)
wait_tick = TOS_TIME_FOREVER; //一直阻塞
else
wait_tick = 0;
if(tos_sem_pend((k_sem_t*)hsem,time)== K_ERR_NONE)
{
return TRUE;
}
return FALSE;
}
/*===================================================================================*/
/*
函数功能: 信号量发送
参数: hsem(由GUI_SemCreate返回的句柄);
返回: 无
说明: .
*/
void GUI_SemPost(GUI_SEM *hsem)
{
tos_sem_post((k_sem_t*)hsem);
}
/*
函数功能: 信号量发送(受freertos管理的中断)
参数: hsem(由GUI_SemCreate返回的句柄);
返回: 无
说明: 若在受freertos管理的中断中调用GUI_SemPost,会导致port.c:425
*/
void GUI_SemPostISR(GUI_SEM *hsem)
{
tos_sem_post((k_sem_t*)hsem);
}
/*===================================================================================*/
/*
函数功能: 信号量删除
参数: hsem(由GUI_SemCreate返回的句柄);
返回: 无
说明: .
*/
void GUI_SemDelete(GUI_SEM *hsem)
{
tos_sem_destroy((k_sem_t*)hsem);
tos_mmheap_free(hsem);
}
/*===================================================================================*/
/*
函数功能: 获得当前线程句柄(唯一标识)
参数: 无
返回: 当前线程唯一标识,按实际OS所定,可以是指针,ID号等...
说明: .
*/
HANDLE GUI_GetCurThreadHandle(void)
{
return (HANDLE)k_curr_task;
}
/*===================================================================================*/
/*
函数功能: 获得当前系统时间(单位:毫秒)
参数: 无
返回: 当前系统时间
说明: .
*/
U32 GUI_GetTickCount(void)
{
U32 i;
i=tos_systick_get();
return (i*1000)/TOS_CFG_CPU_TICK_PER_SECOND;
}
/*===================================================================================*/
/*
函数功能: 最短时间内让出CPU
参数: 无
返回: 无
说明: 按具体OS情况而定,最简单的方法是:OS Delay 一个 tick 周期.
*/
void GUI_Yield(void)
{
tos_task_delay(2);
}
/*===================================================================================*/
/*
函数功能: 延时函数
参数: ms: 延时时间(单位:毫秒)
返回: 无
说明:
*/
void GUI_msleep(u32 ms)
{
tos_task_delay((ms/(1000/TOS_CFG_CPU_TICK_PER_SECOND)));
}
/*
* 函数功能: 创建线程
* @param name 线程名
* @param entry 线程入口函数
* @param parameter 线程参数
* @param stack_size 线程栈大小(单位字节,注意部分系统需要进行单位转换)
* @param priority 线程优先级
* @param tick FreeRTOS没这个功能,时间片(同优先级任务的时间片轮转)
* @return 是否创建成功
*/
BOOL GUI_Thread_Create(void (*entry)(void *parameter),
const char *name,
u32 stack_size,
void *parameter,
u32 priority,
u32 tick)
{
k_err_t err;
k_task_t *task;
k_stack_t *task_stack;
task = tos_mmheap_alloc(sizeof(k_task_t));
task_stack = tos_mmheap_alloc(stack_size);
/* 创建MidPriority_Task任务 */
err = tos_task_create(task,
(char*)name,
entry,
parameter,
priority,
task_stack,
stack_size,
tick);
if(err == K_ERR_NONE)
return TRUE;
else
{
GUI_ERROR("GUI Thread Create failed:%s",name);
return FALSE;
}
}
/**
* @breif: 删除线程,可通过GUI_GetCurThreadHandle获取当前任务句柄作为输入参数
* @return 无
*/
void GUI_Thread_Delete(HANDLE thread)
{
k_task_t* task = (k_task_t*)thread;
tos_task_destroy(task);
tos_mmheap_free(task->stk_base);
tos_mmheap_free(task);
}
#endif
使用iot hub平台
设备在线情况:
使用规则引擎:
开发云平台签名小工具:
MQTT.fx连接
stm32f429使用mqtt连接iot hub平台
部分mqtt代码:
代码语言:txt复制#include "mqttclient.h"
#include "transport.h"
#include "MQTTPacket.h"
#include "tos.h"
#include "string.h"
#include "sockets.h"
#include "lwip/opt.h"
#include "lwip/sys.h"
#include "lwip/api.h"
#include "lwip/sockets.h"
#include "cJSON_Process.h"
#include "bsp_dht11.h"
/******************************* 全局变量声明 ************************************/
/*
* 当我们在写应用程序的时候,可能需要用到一些全局变量。
*/
extern k_queue_t MQTT_Data_Queue;
//定义用户消息结构体
MQTT_USER_MSG mqtt_user_msg;
int32_t MQTT_Socket = 0;
void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg);
/************************************************************************
** 函数名称: MQTT_Connect
** 函数功能: 初始化客户端并登录服务器
** 入口参数: int32_t sock:网络描述符
** 出口参数: >=0:发送成功 <0:发送失败
** 备 注:
************************************************************************/
uint8_t MQTT_Connect(void)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
uint8_t buf[200];
int buflen = sizeof(buf);
int len = 0;
data.clientID.cstring = CLIENT_ID; //随机
data.keepAliveInterval = KEEPLIVE_TIME; //保持活跃
data.username.cstring = USER_NAME; //用户名
data.password.cstring = PASSWORD; //密钥
data.MQTTVersion = MQTT_VERSION; //3表示3.1版本,4表示3.11版本
data.cleansession = 1;
//组装消息
len = MQTTSerialize_connect((unsigned char *)buf, buflen, &data);
//发送消息
transport_sendPacketBuffer(buf, len);
/* 等待连接响应 */
if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
{
unsigned char sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
{
printf("无法连接,错误代码是: %d!n", connack_rc);
return Connect_NOK;
}
else
{
printf("用户名与密钥验证成功,MQTT连接成功!n");
return Connect_OK;
}
}
else
printf("MQTT连接无响应!n");
return Connect_NOTACK;
}
/************************************************************************
** 函数名称: MQTT_PingReq
** 函数功能: 发送MQTT心跳包
** 入口参数: 无
** 出口参数: >=0:发送成功 <0:发送失败
** 备 注:
************************************************************************/
int32_t MQTT_PingReq(int32_t sock)
{
int32_t len;
uint8_t buf[200];
int32_t buflen = sizeof(buf);
fd_set readfd;
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
FD_ZERO(&readfd);
FD_SET(sock,&readfd);
len = MQTTSerialize_pingreq(buf, buflen);
transport_sendPacketBuffer(buf, len);
//等待可读事件
if(select(sock 1,&readfd,NULL,NULL,&tv) == 0)
return -1;
//有可读事件
if(FD_ISSET(sock,&readfd) == 0)
return -2;
if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP)
return -3;
return 0;
}
/************************************************************************
** 函数名称: MQTTSubscribe
** 函数功能: 订阅消息
** 入口参数: int32_t sock:套接字
** int8_t *topic:主题
** enum QoS pos:消息质量
** 出口参数: >=0:发送成功 <0:发送失败
** 备 注:
************************************************************************/
int32_t MQTTSubscribe(int32_t sock,char *topic,enum QoS pos)
{
static uint32_t PacketID = 0;
uint16_t packetidbk = 0;
int32_t conutbk = 0;
uint8_t buf[100];
int32_t buflen = sizeof(buf);
MQTTString topicString = MQTTString_initializer;
int32_t len;
int32_t req_qos,qosbk;
fd_set readfd;
struct timeval tv;
tv.tv_sec = 2;
tv.tv_usec = 0;
FD_ZERO(&readfd);
FD_SET(sock,&readfd);
//复制主题
topicString.cstring = (char *)topic;
//订阅质量
req_qos = pos;
//串行化订阅消息
len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID , 1, &topicString, &req_qos);
//发送TCP数据
if(transport_sendPacketBuffer(buf, len) < 0)
return -1;
//等待可读事件--等待超时
if(select(sock 1,&readfd,NULL,NULL,&tv) == 0)
return -2;
//有可读事件--没有可读事件
if(FD_ISSET(sock,&readfd) == 0)
return -3;
//等待订阅返回--未收到订阅返回
if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK)
return -4;
//拆订阅回应包
if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1)
return -5;
//检测返回数据的正确性
if((qosbk == 0x80)||(packetidbk != (PacketID-1)))
return -6;
//订阅成功
return 0;
}
/************************************************************************
** 函数名称: UserMsgCtl
** 函数功能: 用户消息处理函数
** 入口参数: MQTT_USER_MSG *msg:消息结构体指针
** 出口参数: 无
** 备 注:
************************************************************************/
void UserMsgCtl(MQTT_USER_MSG *msg)
{
//这里处理数据只是打印,用户可以在这里添加自己的处理方式
printf("*****收到订阅的消息!******n");
//返回后处理消息
switch(msg->msgqos)
{
case 0:
printf("MQTT>>消息质量:QoS0n");
break;
case 1:
printf("MQTT>>消息质量:QoS1n");
break;
case 2:
printf("MQTT>>消息质量:QoS2n");
break;
default:
printf("MQTT>>错误的消息质量n");
break;
}
printf("MQTT>>消息主题:%sn",msg->topic);
printf("MQTT>>消息类容:%sn",msg->msg);
printf("MQTT>>消息长度:%dn",msg->msglenth);
Proscess(msg->msg);
//处理完后销毁数据
msg->valid = 0;
}
/************************************************************************
** 函数名称: GetNextPackID
** 函数功能: 产生下一个数据包ID
** 入口参数: 无
** 出口参数: uint16_t packetid:产生的ID
** 备 注:
************************************************************************/
uint16_t GetNextPackID(void)
{
static uint16_t pubpacketid = 0;
return pubpacketid ;
}
/************************************************************************
** 函数名称: mqtt_msg_publish
** 函数功能: 用户推送消息
** 入口参数: MQTT_USER_MSG *msg:消息结构体指针
** 出口参数: >=0:发送成功 <0:发送失败
** 备 注:
************************************************************************/
int32_t MQTTMsgPublish(int32_t sock, char *topic, int8_t qos, uint8_t* msg)
{
int8_t retained = 0; //保留标志位
uint32_t msg_len; //数据长度
uint8_t buf[MSG_MAX_LEN];
int32_t buflen = sizeof(buf),len;
MQTTString topicString = MQTTString_initializer;
uint16_t packid = 0,packetidbk;
//填充主题
topicString.cstring = (char *)topic;
//填充数据包ID
if((qos == QOS1)||(qos == QOS2))
{
packid = GetNextPackID();
}
else
{
qos = QOS0;
retained = 0;
packid = 0;
}
msg_len = strlen((char *)msg);
//推送消息
len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len);
if(len <= 0)
return -1;
if(transport_sendPacketBuffer(buf, len) < 0)
return -2;
//质量等级0,不需要返回
if(qos == QOS0)
{
return 0;
}
//等级1
if(qos == QOS1)
{
//等待PUBACK
if(WaitForPacket(sock,PUBACK,5) < 0)
return -3;
return 1;
}
//等级2
if(qos == QOS2)
{
//等待PUBREC
if(WaitForPacket(sock,PUBREC,5) < 0)
return -3;
//发送PUBREL
len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk);
if(len == 0)
return -4;
if(transport_sendPacketBuffer(buf, len) < 0)
return -6;
//等待PUBCOMP
if(WaitForPacket(sock,PUBREC,5) < 0)
return -7;
return 2;
}
//等级错误
return -8;
}
/************************************************************************
** 函数名称: ReadPacketTimeout
** 函数功能: 阻塞读取MQTT数据
** 入口参数: int32_t sock:网络描述符
** uint8_t *buf:数据缓存区
** int32_t buflen:缓冲区大小
** uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回
** 出口参数: -1:错误,其他--包类型
** 备 注:
************************************************************************/
int32_t ReadPacketTimeout(int32_t sock,uint8_t *buf,int32_t buflen,uint32_t timeout)
{
fd_set readfd;
struct timeval tv;
if(timeout != 0)
{
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(&readfd);
FD_SET(sock,&readfd);
//等待可读事件--等待超时
if(select(sock 1,&readfd,NULL,NULL,&tv) == 0)
return -1;
//有可读事件--没有可读事件
if(FD_ISSET(sock,&readfd) == 0)
return -1;
}
//读取TCP/IP事件
return MQTTPacket_read(buf, buflen, transport_getdata);
}
/************************************************************************
** 函数名称: deliverMessage
** 函数功能: 接受服务器发来的消息
** 入口参数: MQTTMessage *msg:MQTT消息结构体
** MQTT_USER_MSG *mqtt_user_msg:用户接受结构体
** MQTTString *TopicName:主题
** 出口参数: 无
** 备 注:
************************************************************************/
void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg)
{
//消息质量
mqtt_user_msg->msgqos = msg->qos;
//保存消息
memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen);
mqtt_user_msg->msg[msg->payloadlen] = 0;
//保存消息长度
mqtt_user_msg->msglenth = msg->payloadlen;
//消息主题
memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len);
mqtt_user_msg->topic[TopicName->lenstring.len] = 0;
//消息ID
mqtt_user_msg->packetid = msg->id;
//标明消息合法
mqtt_user_msg->valid = 1;
}
/************************************************************************
** 函数名称: mqtt_pktype_ctl
** 函数功能: 根据包类型进行处理
** 入口参数: uint8_t packtype:包类型
** 出口参数: 无
** 备 注:
************************************************************************/
void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen)
{
MQTTMessage msg;
int32_t rc;
MQTTString receivedTopic;
uint32_t len;
switch(packtype)
{
case PUBLISH:
//拆析PUBLISH消息
if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic,
(unsigned char **)&msg.payload, &msg.payloadlen, buf, buflen) != 1)
return;
//接受消息
deliverMessage(&receivedTopic,&msg,&mqtt_user_msg);
//消息质量不同,处理不同
if(msg.qos == QOS0)
{
//QOS0-不需要ACK
//直接处理数据
UserMsgCtl(&mqtt_user_msg);
return;
}
//发送PUBACK消息
if(msg.qos == QOS1)
{
len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid);
if(len == 0)
return;
//发送返回
if(transport_sendPacketBuffer(buf,len)<0)
return;
//返回后处理消息
UserMsgCtl(&mqtt_user_msg);
return;
}
//对于质量2,只需要发送PUBREC就可以了
if(msg.qos == QOS2)
{
len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid);
if(len == 0)
return;
//发送返回
transport_sendPacketBuffer(buf,len);
}
break;
case PUBREL:
//解析包数据,必须包ID相同才可以
rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen);
if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid))
return ;
//收到PUBREL,需要处理并抛弃数据
if(mqtt_user_msg.valid == 1)
{
//返回后处理消息
UserMsgCtl(&mqtt_user_msg);
}
//串行化PUBCMP消息
len = MQTTSerialize_pubcomp(buf,buflen,msg.id);
if(len == 0)
return;
//发送返回--PUBCOMP
transport_sendPacketBuffer(buf,len);
break;
case PUBACK://等级1客户端推送数据后,服务器返回
break;
case PUBREC://等级2客户端推送数据后,服务器返回
break;
case PUBCOMP://等级2客户端推送PUBREL后,服务器返回
break;
default:
break;
}
}
/************************************************************************
** 函数名称: WaitForPacket
** 函数功能: 等待特定的数据包
** 入口参数: int32_t sock:网络描述符
** uint8_t packettype:包类型
** uint8_t times:等待次数
** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包
** 备 注:
************************************************************************/
int32_t WaitForPacket(int32_t sock,uint8_t packettype,uint8_t times)
{
int32_t type;
uint8_t buf[MSG_MAX_LEN];
uint8_t n = 0;
int32_t buflen = sizeof(buf);
do
{
//读取数据包
type = ReadPacketTimeout(sock,buf,buflen,2);
if(type != -1)
mqtt_pktype_ctl(type,buf,buflen);
n ;
}while((type != packettype)&&(n < times));
//收到期望的包
if(type == packettype)
return 0;
else
return -1;
}
void Client_Connect(void)
{
char* host_ip;
#ifdef LWIP_DNS
ip4_addr_t dns_ip;
netconn_gethostbyname(HOST_NAME, &dns_ip);
host_ip = ip_ntoa(&dns_ip);
printf("host name : %s , host_ip : %sn",HOST_NAME,host_ip);
#else
host_ip = HOST_NAME;
#endif
MQTT_START:
//创建网络连接
printf("1.开始连接对应云平台的服务器...n");
printf("服务器IP地址:%s,端口号:
!n",host_ip,HOST_PORT);
while(1)
{
//连接服务器
MQTT_Socket = transport_open((int8_t*)host_ip,HOST_PORT);
//如果连接服务器成功
if(MQTT_Socket >= 0)
{
printf("连接云平台服务器成功!n");
break;
}
printf("连接云平台服务器失败,等待3秒再尝试重新连接!n");
//等待3秒
tos_task_delay(3000);
}
printf("2.MQTT用户名与密钥验证登录...n");
//MQTT用户名与密钥验证登录
if(MQTT_Connect() != Connect_OK)
{
//重连服务器
printf("MQTT用户名与密钥验证登录失败...n");
//关闭链接
transport_close();
goto MQTT_START;
}
//订阅消息
printf("3.开始订阅消息...n");
//订阅消息
if(MQTTSubscribe(MQTT_Socket,(char *)SUB_TOPIC,QOS1) < 0)
{
//重连服务器
printf("客户端订阅消息失败...n");
//关闭链接
transport_close();
goto MQTT_START;
}
//无限循环
printf("4.开始循环接收订阅的消息...n");
}
/************************************************************************
** 函数名称: mqtt_recv_thread
** 函数功能: MQTT任务
** 入口参数: void *pvParameters:任务参数
** 出口参数: 无
** 备 注: MQTT连云步骤:
** 1.连接对应云平台的服务器
** 2.MQTT用户与密钥验证登录
** 3.订阅指定主题
** 4.等待接收主题的数据与上报主题数据
************************************************************************/
void mqtt_recv_thread(void *pvParameters)
{
uint32_t curtick;
uint8_t no_mqtt_msg_exchange = 1;
uint8_t buf[MSG_MAX_LEN];
int32_t buflen = sizeof(buf);
int32_t type;
fd_set readfd;
struct timeval tv; //等待时间
tv.tv_sec = 0;
tv.tv_usec = 10;
MQTT_START:
//开始连接
Client_Connect();
//获取当前滴答,作为心跳包起始时间
curtick = sys_now();
while(1)
{
//表明无数据交换
no_mqtt_msg_exchange = 1;
FD_ZERO(&readfd);
FD_SET(MQTT_Socket,&readfd);
//等待可读事件
select(MQTT_Socket 1,&readfd,NULL,NULL,&tv);
//判断MQTT服务器是否有数据
if(FD_ISSET(MQTT_Socket,&readfd) != 0)
{
//读取数据包--注意这里参数为0,不阻塞
type = ReadPacketTimeout(MQTT_Socket,buf,buflen,0);
if(type != -1)
{
mqtt_pktype_ctl(type,buf,buflen);
//表明有数据交换
no_mqtt_msg_exchange = 0;
//获取当前滴答,作为心跳包起始时间
curtick = sys_now();
}
}
//这里主要目的是定时向服务器发送PING保活命令
if((sys_now() - curtick) >(KEEPLIVE_TIME/2*1000))
{
curtick = sys_now();
//判断是否有数据交换
if(no_mqtt_msg_exchange == 0)
{
//如果有数据交换,这次就不需要发送PING消息
continue;
}
if(MQTT_PingReq(MQTT_Socket) < 0)
{
//重连服务器
printf("发送保持活性ping失败....n");
goto CLOSE;
}
//心跳成功
printf("发送保持活性ping作为心跳成功....n");
//表明有数据交换
no_mqtt_msg_exchange = 0;
}
}
CLOSE:
//关闭链接
transport_close();
//重新链接服务器
goto MQTT_START;
}
void mqtt_send_thread(void *pvParameters)
{
printf("mqtt_send_threadn");
int32_t ret;
uint8_t no_mqtt_msg_exchange = 1;
uint32_t curtick;
uint8_t res;
/* 定义一个创建信息返回值,默认为pdTRUE */
k_err_t err;
/* 定义一个接收消息的变量 */
size_t msg_size;
DHT11_Data_TypeDef* recv_data;
//初始化json数据
cJSON* cJSON_Data = NULL;
cJSON_Data = cJSON_Data_Init();
double a,b,c;
MQTT_SEND_START:
while(1)
{
err = tos_queue_pend(&MQTT_Data_Queue,
(void*)&recv_data,
&msg_size,
3000);
// recv_data = r_data;
if(err == K_ERR_NONE)
{
a = recv_data->temperature;
b = recv_data->humidity;
c = sys_now();
printf("a = %f,b = %fn",a,b);
//更新数据
res = cJSON_Update(cJSON_Data,TIME_NUM,&c);
res = cJSON_Update(cJSON_Data,TEMP_NUM,&a);
res = cJSON_Update(cJSON_Data,HUM_NUM,&b);
if(UPDATE_SUCCESS == res)
{
//更新数据成功,
char* p = cJSON_Print(cJSON_Data);
//发布消息
ret = MQTTMsgPublish(MQTT_Socket,(char*)PUB_TOPIC,QOS0,(uint8_t*)p);
if(ret >= 0)
{
//表明有数据交换
no_mqtt_msg_exchange = 0;
//获取当前滴答,作为心跳包起始时间
curtick = sys_now();
}
tos_mmheap_free(p);
p = NULL;
}
else
printf("update failn");
}
//这里主要目的是定时向服务器发送PING保活命令
if((sys_now() - curtick) >(KEEPLIVE_TIME/2*1000))
{
curtick = sys_now();
//判断是否有数据交换
if(no_mqtt_msg_exchange == 0)
{
//如果有数据交换,这次就不需要发送PING消息
continue;
}
if(MQTT_PingReq(MQTT_Socket) < 0)
{
//重连服务器
printf("发送保持活性ping失败....n");
goto MQTT_SEND_CLOSE;
}
//心跳成功
printf("发送保持活性ping作为心跳成功....n");
//表明有数据交换
no_mqtt_msg_exchange = 0;
}
}
MQTT_SEND_CLOSE:
//关闭链接
transport_close();
//开始连接
Client_Connect();
goto MQTT_SEND_START;
}
void
mqtt_thread_init(void)
{
sys_thread_new("mqtt_recv_thread", mqtt_recv_thread, NULL, 2048*4, 4);
sys_thread_new("mqtt_send_thread", mqtt_send_thread, NULL, 2048*4, 3);
}
此外还自己基于腾讯云主机部署mqtt服务器
使用TencentOS连接成功并完成通讯