【IoT迷你赛】完成结果视频

2019-08-27 10:27:57 浏览数 (1)

首先放视频

视频内容

【提示】下面图片代码略多,注意观看~

初次连接到腾讯云物联网开发平台(IoT Explorer)

一开始拿到板子就使用demo连接到腾讯云物联网开发平台(IoT Explorer):

image.pngimage.png
image.pngimage.png

完美完成通信。

开发自己的demo

方案构思如下:https://cloud.tencent.com/developer/article/1492173

方案构思方案构思

完成情况:

微信小程序开发微信小程序开发
pc上位机开发pc上位机开发

更多功能详见视频简介。

使用TencentOS tiny移植lwip

lwip工程demolwip工程demo

移植的接口文件

移植的部分接口文件如下:

代码语言:txt复制
#include "debug.h"

#include <lwip/opt.h>
#include <lwip/arch.h>

#include "tcpip.h"
#include "lwip/init.h"
#include "lwip/netif.h"
#include "lwip/sio.h"
#include "ethernetif.h"

#if !NO_SYS
#include "sys_arch.h"
#endif
#include <lwip/stats.h>
#include <lwip/debug.h>
#include <lwip/sys.h>
#include "lwip/dhcp.h"
#include <string.h>

int errno;


u32_t lwip_sys_now;

struct sys_timeouts {
  struct sys_timeo *next;
};

struct timeoutlist
{
	struct sys_timeouts timeouts;
	k_task_t *pid;
};

#define SYS_THREAD_MAX 4

static struct timeoutlist s_timeoutlist[SYS_THREAD_MAX];

static u16_t s_nextthread = 0;

u32_t
sys_jiffies(void)
{
  lwip_sys_now = tos_systick_get();
  return lwip_sys_now;
}

u32_t
sys_now(void)
{
  lwip_sys_now = tos_systick_get();
  return lwip_sys_now;
}

void
sys_init(void)
{
	int i;
	// Initialize the the per-thread sys_timeouts structures
	// make sure there are no valid pids in the list
	for(i = 0; i < SYS_THREAD_MAX; i  )
	{
		s_timeoutlist[i].pid = 0;
		s_timeoutlist[i].timeouts.next = NULL;
	}
	// keep track of how many threads have been created
	s_nextthread = 0;
}

struct sys_timeouts *sys_arch_timeouts(void)
{
	int i;
	k_task_t *pid;
	struct timeoutlist *tl;
	pid = k_curr_task;
	for(i = 0; i < s_nextthread; i  )
	{
		tl = &(s_timeoutlist[i]);
		if(tl->pid == pid)
		{
			return &(tl->timeouts);
		}
	}
	return NULL;
}

sys_prot_t sys_arch_protect(void)
{
    TOS_CPU_CPSR_ALLOC();
    TOS_CPU_INT_DISABLE();
    return cpu_cpsr;
}

void sys_arch_unprotect(sys_prot_t cpu_cpsr)
{
	TOS_CPU_INT_ENABLE();
}

#if !NO_SYS


err_t
sys_sem_new(sys_sem_t *sem, u8_t count)
{
    /* 创建 sem */
    tos_sem_create(sem,count);

#if SYS_STATS
	  lwip_stats.sys.sem.used;
 	if (lwip_stats.sys.sem.max < lwip_stats.sys.sem.used) {
		lwip_stats.sys.sem.max = lwip_stats.sys.sem.used;
	}
#endif /* SYS_STATS */
  
  if(sem != SYS_SEM_NULL)
    return ERR_OK;
  else
  {
#if SYS_STATS
      lwip_stats.sys.sem.err;
#endif /* SYS_STATS */
    printf("[sys_arch]:new sem fail!n");
    return ERR_MEM;
  }
}

void
sys_sem_free(sys_sem_t *sem)
{
#if SYS_STATS
   --lwip_stats.sys.sem.used;
#endif /* SYS_STATS */
  /* 删除 sem */
  tos_sem_destroy(sem);
  sem = SYS_SEM_NULL;
}


int sys_sem_valid(sys_sem_t *sem)                                               
{
  return (sem->pend_obj.type != NULL);                                    
}


void
sys_sem_set_invalid(sys_sem_t *sem)
{
    sem = SYS_SEM_NULL;
    (void)sem;
}

/* 
 如果timeout参数不为零,则返回值为
 等待信号量所花费的毫秒数。如果
 信号量未在指定时间内发出信号,返回值为
 SYS_ARCH_TIMEOUT。如果线程不必等待信号量
 该函数返回零。 */
u32_t
sys_arch_sem_wait(sys_sem_t *sem, u32_t timeout)
{
  k_tick_t wait_tick = 0;
  k_tick_t start_tick = 0 ;
  
  //看看信号量是否有效
  if(sem == SYS_SEM_NULL)
    return SYS_ARCH_TIMEOUT;
  
  //首先获取开始等待信号量的时钟节拍
  start_tick = sys_now();
  
  //timeout != 0,需要将ms换成系统的时钟节拍
  if(timeout != 0)
  {
    //将ms转换成时钟节拍
    wait_tick = timeout / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
    if (wait_tick == 0)
      wait_tick = 1;
  }
  else
    wait_tick = TOS_TIME_FOREVER;  //一直阻塞
  
  //等待成功,计算等待的时间,否则就表示等待超时
  if(tos_sem_pend(sem, wait_tick) == K_ERR_NONE)
    return ((sys_now()-start_tick) * (1000/TOS_CFG_CPU_TICK_PER_SECOND));
  else
    return SYS_ARCH_TIMEOUT;
}

void
sys_sem_signal(sys_sem_t *sem)
{
  if(tos_sem_post( sem ) != K_ERR_NONE)
    printf("[sys_arch]:sem signal fail!n");
}

err_t
sys_mutex_new(sys_mutex_t *mutex)
{
  /* 创建 sem */   
  tos_mutex_create(mutex);
  if(mutex != SYS_MRTEX_NULL)
    return ERR_OK;
  else
  {
    printf("[sys_arch]:new mutex fail!n");
    return ERR_MEM;
  }
}

void
sys_mutex_free(sys_mutex_t *mutex)
{
  tos_mutex_destroy(mutex);
}

void
sys_mutex_set_invalid(sys_mutex_t *mutex)
{
    mutex = SYS_MRTEX_NULL;
    (void)mutex;
}

void
sys_mutex_lock(sys_mutex_t *mutex)
{
  tos_mutex_pend_timed(mutex,/* 互斥量句柄 */
                       TOS_TIME_FOREVER); /* 等待时间 */
}

void
sys_mutex_unlock(sys_mutex_t *mutex)
{
  tos_mutex_post( mutex );//给出互斥量
}


sys_thread_t
sys_thread_new(const char *name, lwip_thread_fn function, void *arg, int stacksize, int prio)
{
    k_err_t err;
    sys_thread_t task;
    k_stack_t *task_stack;
    
    task = tos_mmheap_alloc(sizeof(k_task_t));
    task_stack = tos_mmheap_alloc(stacksize);
    /* 创建MidPriority_Task任务 */
    err = tos_task_create(task, 
                          (char*)name, 
                          function,
                          arg, 
                          prio, 
                          task_stack,
                          stacksize,
                          20);
    if(err != K_ERR_NONE)
        printf("TencentOS Create task fail! code : %d rn",err);
  if(task == K_NULL)
  {
    printf("[sys_arch]:create task fail!n");
    return NULL;
  }
  return task;
}

err_t
sys_mbox_new(sys_mbox_t *mbox, int size)
{
    k_err_t err;
    /* 创建Test_Queue */
    err = tos_queue_create(mbox);
    if(err != K_ERR_NONE)
    {
        printf("TencentOS Create mbox fail! code : %d rn",err);
        return ERR_MEM;
    }
    return ERR_OK;
}

void
sys_mbox_free(sys_mbox_t *mbox)
{
    tos_queue_destroy(mbox);
}

int sys_mbox_valid(sys_mbox_t *mbox)          
{      
  return (mbox->pend_obj.type != NULL);
}   

void
sys_mbox_set_invalid(sys_mbox_t *mbox)
{
    mbox = SYS_MBOX_NULL; 
    (void)mbox;
}

void
sys_mbox_post(sys_mbox_t *q, void *msg)
{
    tos_queue_post( q, /* 消息队列的句柄 */
                    msg,/* 发送的消息内容 */
                    sizeof(void *));
//    while(tos_queue_post( q, /* 消息队列的句柄 */
//                    msg,/* 发送的消息内容 */
//                    sizeof(void *)) != K_ERR_NONE);
}

err_t
sys_mbox_trypost(sys_mbox_t *q, void *msg)
{
  if(tos_queue_post(q,msg,sizeof(void *)) == K_ERR_NONE)  
    return ERR_OK;
  else
    return ERR_MEM;
}

err_t
sys_mbox_trypost_fromisr(sys_mbox_t *q, void *msg)
{
  return sys_mbox_trypost(q, msg);
}

u32_t
sys_arch_mbox_fetch(sys_mbox_t *q, void **msg, u32_t timeout)
{
  void *dummyptr;
  k_tick_t wait_tick = 0;
  k_tick_t start_tick = 0 ;
  size_t size;
  size = sizeof(void *);
    
  if (msg == NULL )  //看看存储消息的地方是否有效
		msg = &dummyptr;
  
  //首先获取开始等待信号量的时钟节拍
  start_tick = sys_now();
  
  //timeout != 0,需要将ms换成系统的时钟节拍
  if(timeout != 0)
  {
    //将ms转换成时钟节拍
    wait_tick = timeout / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
    if (wait_tick == 0)
      wait_tick = 1;
  }
  //一直阻塞
  else
    wait_tick = TOS_TIME_FOREVER;
  
  //等待成功,计算等待的时间,否则就表示等待超时
  if(tos_queue_pend(q,&(*msg),&size, wait_tick) == K_ERR_NONE)
    return ((sys_now() - start_tick)*(1000/TOS_CFG_CPU_TICK_PER_SECOND));
  else
  {
    *msg = NULL;
    return SYS_ARCH_TIMEOUT;
  }
}

u32_t
sys_arch_mbox_tryfetch(sys_mbox_t *q, void **msg)
{
    size_t size;
    size = sizeof(void *);
	void *dummyptr;
	if ( msg == NULL )
		msg = &dummyptr;
  
  //等待成功,计算等待的时间
  if(tos_queue_pend(q,&(*msg),&size, 0) == K_ERR_NONE)
    return ERR_OK;
  else
    return SYS_MBOX_EMPTY;
}

#if LWIP_NETCONN_SEM_PER_THREAD
#error LWIP_NETCONN_SEM_PER_THREAD==1 not supported
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

#endif /* !NO_SYS */

/* Variables Initialization */
struct netif gnetif;
ip4_addr_t ipaddr;
ip4_addr_t netmask;
ip4_addr_t gw;
uint8_t IP_ADDRESS[4];
uint8_t NETMASK_ADDRESS[4];
uint8_t GATEWAY_ADDRESS[4];

void TCPIP_Init(void)
{
  tcpip_init(NULL, NULL);
  
  /* IP addresses initialization */
  /* USER CODE BEGIN 0 */
#if LWIP_DHCP
  ip_addr_set_zero_ip4(&ipaddr);
  ip_addr_set_zero_ip4(&netmask);
  ip_addr_set_zero_ip4(&gw);
#else
  IP4_ADDR(&ipaddr,IP_ADDR0,IP_ADDR1,IP_ADDR2,IP_ADDR3);
  IP4_ADDR(&netmask,NETMASK_ADDR0,NETMASK_ADDR1,NETMASK_ADDR2,NETMASK_ADDR3);
  IP4_ADDR(&gw,GW_ADDR0,GW_ADDR1,GW_ADDR2,GW_ADDR3);
#endif /* USE_DHCP */
  /* USER CODE END 0 */
  /* Initilialize the LwIP stack without RTOS */
  /* add the network interface (IPv4/IPv6) without RTOS */
  netif_add(&gnetif, &ipaddr, &netmask, &gw, NULL, &ethernetif_init, &tcpip_input);

  /* Registers the default network interface */
  netif_set_default(&gnetif);

  if (netif_is_link_up(&gnetif))
  {
    /* When the netif is fully configured this function must be called */
    netif_set_up(&gnetif);
  }
  else
  {
    /* When the netif link is down this function must be called */
    netif_set_down(&gnetif);
  }
  
#if LWIP_DHCP	   			//若使用了DHCP
  int err;
  /*  Creates a new DHCP client for this interface on the first call.
  Note: you must call dhcp_fine_tmr() and dhcp_coarse_tmr() at
  the predefined regular intervals after starting the client.
  You can peek in the netif->dhcp struct for the actual DHCP status.*/
  
  printf("本例程将使用DHCP动态分配IP地址,如果不需要则在lwipopts.h中将LWIP_DHCP定义为0nn");
  
  err = dhcp_start(&gnetif);      //开启dhcp
  if(err == ERR_OK)
    printf("lwip dhcp init success...nn");
  else
    printf("lwip dhcp init fail...nn");
  while(ip_addr_cmp(&(gnetif.ip_addr),&ipaddr))   //等待dhcp分配的ip有效
  {
    vTaskDelay(1);
  } 
#endif
  printf("本地IP地址是:%d.%d.%d.%dnn",  
        ((gnetif.ip_addr.addr)&0x000000ff),       
        (((gnetif.ip_addr.addr)&0x0000ff00)>>8),  
        (((gnetif.ip_addr.addr)&0x00ff0000)>>16), 
        ((gnetif.ip_addr.addr)&0xff000000)>>24);
}

iperf测速

测速信息(表示非常稳定,更多详情见附件):

lwip.zip
代码语言:txt复制
bin/iperf.exe -c 192.168.0.122 -P 1 -i 1 -p 5001 -f k -t 1000000000
------------------------------------------------------------
Client connecting to 192.168.0.122, TCP port 5001
TCP window size: 64.0 KByte (default)
------------------------------------------------------------
[300] local 192.168.0.195 port 56104 connected with 192.168.0.122 port 5001
[ ID] Interval       Transfer     Bandwidth
[300]  0.0- 1.0 sec  11640 KBytes  95355 Kbits/sec
[300]  1.0- 2.0 sec  11592 KBytes  94962 Kbits/sec
[300]  2.0- 3.0 sec  11576 KBytes  94831 Kbits/sec
[300]  3.0- 4.0 sec  11592 KBytes  94962 Kbits/sec
[300]  4.0- 5.0 sec  11592 KBytes  94962 Kbits/sec
[300]  5.0- 6.0 sec  11576 KBytes  94831 Kbits/sec
[300]  6.0- 7.0 sec  11592 KBytes  94962 Kbits/sec
[300]  7.0- 8.0 sec  11576 KBytes  94831 Kbits/sec
[300]  8.0- 9.0 sec  11592 KBytes  94962 Kbits/sec
[300]  9.0-10.0 sec  11576 KBytes  94831 Kbits/sec
[300] 10.0-11.0 sec  11552 KBytes  94634 Kbits/sec
[300] 11.0-12.0 sec  11592 KBytes  94962 Kbits/sec
[300] 12.0-13.0 sec  11568 KBytes  94765 Kbits/sec
[300] 13.0-14.0 sec  11592 KBytes  94962 Kbits/sec
[300] 14.0-15.0 sec  11576 KBytes  94831 Kbits/sec
[300] 15.0-16.0 sec  11584 KBytes  94896 Kbits/sec

移植emxgui

emxguiemxgui

部分接口文件如下(效果见视频):

代码语言:txt复制
#include	<stddef.h>
#include	"emXGUI_Arch.h"

#include "tos.h"

/*===================================================================================*/
/*
函数功能: 创建一个互斥(该互斥锁必须支持嵌套使用)
返回: 互斥对象句柄(唯一标识)
说明: 互斥对象句柄按实际OS所定,可以是指针,ID号等...
*/
GUI_MUTEX*	GUI_MutexCreate(void)
{
    GUI_MUTEX *mutex;
    mutex = tos_mmheap_alloc(sizeof(k_mutex_t));
	tos_mutex_create((k_mutex_t*)mutex);	
    return mutex;
}

/*===================================================================================*/
/*
函数功能: 互斥锁定
参数: hMutex(由GUI_MutexCreate返回的句柄); 
      time 最长等待毫秒数,0立既返回,0xFFFFFFFF,一直等待
返回: TRUE:成功;FALSE:失败或超时
说明: .
*/
BOOL	GUI_MutexLock(GUI_MUTEX *hMutex,U32 time)
{
  k_tick_t wait_tick;
  //timeout != 0,需要将ms换成系统的时钟节拍
  if(time != 0)
  {
    //将ms转换成时钟节拍
    wait_tick = time / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
    if (wait_tick == 0)
      wait_tick = 1;
  }
  else if(time == 0xFFFFFFFF)
    wait_tick = TOS_TIME_FOREVER;  //一直阻塞
  
    if(tos_mutex_pend_timed((k_mutex_t*)hMutex, time) == K_ERR_NONE)
	{
		return TRUE;
	}
	return	FALSE;
}

/*===================================================================================*/
/*
函数功能: 互斥解锁
参数: hMutex(由GUI_MutexCreate返回的句柄);    
返回: 无
说明: .
*/
void	GUI_MutexUnlock(GUI_MUTEX *hMutex)
{
	 tos_mutex_post((k_mutex_t*)hMutex);
}

/*===================================================================================*/
/*
函数功能: 互斥删除
参数: hMutex(由GUI_MutexCreate返回的句柄);    
返回: 无
说明: .
*/
void	GUI_MutexDelete(GUI_MUTEX *hMutex)
{
	tos_mutex_destroy((k_mutex_t*)hMutex);
    tos_mmheap_free(hMutex);
}

/*===================================================================================*/
/*
函数功能: 创建一个信号量
参数: init: 信号量初始值; max: 信号量最大值
返回: 信号量对象句柄(唯一标识)
说明: 信号量对象句柄按实际OS所定,可以是指针,ID号等...
*/
GUI_SEM*	GUI_SemCreate(int init,int max)
{
    GUI_SEM *sem;
    sem = tos_mmheap_alloc(sizeof(k_sem_t));
	tos_sem_create((k_sem_t*)sem,init);	
    return sem;
}

/*===================================================================================*/
/*
函数功能: 信号量等待
参数: hsem(由GUI_SemCreate返回的句柄); 
      time 最长等待毫秒数,0立既返回,0xFFFFFFFF,一直等待
返回: TRUE:成功;FALSE:失败或超时
说明: .
*/
BOOL	GUI_SemWait(GUI_SEM *hsem,U32 time)
{
    k_tick_t wait_tick;
    //timeout != 0,需要将ms换成系统的时钟节拍
    if(time != 0)
    {
        //将ms转换成时钟节拍
        wait_tick = time / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
        if (wait_tick == 0)
            wait_tick = 1;
    }
    else if(time == 0xFFFFFFFF)
        wait_tick = TOS_TIME_FOREVER;  //一直阻塞
    else
        wait_tick = 0;
    
	if(tos_sem_pend((k_sem_t*)hsem,time)== K_ERR_NONE)
	{
		return TRUE;
	}
	return FALSE;
}

/*===================================================================================*/
/*
函数功能: 信号量发送
参数: hsem(由GUI_SemCreate返回的句柄);  
返回: 无
说明: .
*/
void	GUI_SemPost(GUI_SEM *hsem)
{
	tos_sem_post((k_sem_t*)hsem);
}
/*
函数功能: 信号量发送(受freertos管理的中断)
参数: hsem(由GUI_SemCreate返回的句柄);  
返回: 无
说明: 若在受freertos管理的中断中调用GUI_SemPost,会导致port.c:425
*/
void GUI_SemPostISR(GUI_SEM *hsem)
{ 
    tos_sem_post((k_sem_t*)hsem);
}
/*===================================================================================*/
/*
函数功能: 信号量删除
参数: hsem(由GUI_SemCreate返回的句柄);    
返回: 无
说明: .
*/
void	GUI_SemDelete(GUI_SEM *hsem)
{
	tos_sem_destroy((k_sem_t*)hsem);
    tos_mmheap_free(hsem);
}

/*===================================================================================*/
/*
函数功能: 获得当前线程句柄(唯一标识)
参数: 无  
返回: 当前线程唯一标识,按实际OS所定,可以是指针,ID号等...
说明: .
*/
HANDLE	GUI_GetCurThreadHandle(void)
{
	return	(HANDLE)k_curr_task;
}


/*===================================================================================*/
/*
函数功能: 获得当前系统时间(单位:毫秒)
参数: 无  
返回: 当前系统时间
说明: .
*/
U32	GUI_GetTickCount(void)
{
	U32 i;
	
	i=tos_systick_get();
	
	return (i*1000)/TOS_CFG_CPU_TICK_PER_SECOND;

}

/*===================================================================================*/
/*
函数功能: 最短时间内让出CPU
参数: 无  
返回: 无
说明: 按具体OS情况而定,最简单的方法是:OS Delay 一个 tick 周期.
*/
void	GUI_Yield(void)
{
	tos_task_delay(2);
}

/*===================================================================================*/
/*
函数功能: 延时函数
参数: ms: 延时时间(单位:毫秒) 
返回: 无
说明: 
*/
void	GUI_msleep(u32 ms)
{
	tos_task_delay((ms/(1000/TOS_CFG_CPU_TICK_PER_SECOND)));
}

/*
 * 函数功能: 创建线程
 * @param name 线程名
 * @param entry 线程入口函数
 * @param parameter 线程参数
 * @param stack_size 线程栈大小(单位字节,注意部分系统需要进行单位转换)
 * @param priority 线程优先级
 * @param tick FreeRTOS没这个功能,时间片(同优先级任务的时间片轮转)
 * @return 是否创建成功
*/
BOOL GUI_Thread_Create(void (*entry)(void *parameter),
                         const char *name,
                         u32  stack_size,
                         void *parameter,
                         u32  priority,
                         u32  tick)
{
    k_err_t err;
    k_task_t *task;
    k_stack_t *task_stack;
    
    task = tos_mmheap_alloc(sizeof(k_task_t));
    task_stack = tos_mmheap_alloc(stack_size);
    /* 创建MidPriority_Task任务 */
    err = tos_task_create(task, 
                          (char*)name, 
                          entry,
                          parameter, 
                          priority, 
                          task_stack,
                          stack_size,
                          tick);

  if(err == K_ERR_NONE)
    return TRUE;
  else
  {
    GUI_ERROR("GUI Thread Create failed:%s",name);
    return FALSE; 
  }    
}

/**
 * @breif: 删除线程,可通过GUI_GetCurThreadHandle获取当前任务句柄作为输入参数
 * @return 无
*/
void GUI_Thread_Delete(HANDLE thread)
{
    k_task_t* task = (k_task_t*)thread;
    tos_task_destroy(task);
    tos_mmheap_free(task->stk_base);
    tos_mmheap_free(task);
    
}
#endif

使用iot hub平台

设备在线情况:

image.pngimage.png

使用规则引擎:

image.pngimage.png

开发云平台签名小工具:

image.pngimage.png

MQTT.fx连接

image.pngimage.png

stm32f429使用mqtt连接iot hub平台

image.pngimage.png

部分mqtt代码:

代码语言:txt复制
#include "mqttclient.h"
#include "transport.h"
#include "MQTTPacket.h"
#include "tos.h"

#include "string.h"
#include "sockets.h"

#include "lwip/opt.h"

#include "lwip/sys.h"
#include "lwip/api.h"

#include "lwip/sockets.h"

#include "cJSON_Process.h"
#include "bsp_dht11.h"

/******************************* 全局变量声明 ************************************/
/*
 * 当我们在写应用程序的时候,可能需要用到一些全局变量。
 */
extern k_queue_t MQTT_Data_Queue;


//定义用户消息结构体
MQTT_USER_MSG  mqtt_user_msg;

int32_t MQTT_Socket = 0;


void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg);

/************************************************************************
** 函数名称: MQTT_Connect								
** 函数功能: 初始化客户端并登录服务器
** 入口参数: int32_t sock:网络描述符
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
uint8_t MQTT_Connect(void)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    uint8_t buf[200];
    int buflen = sizeof(buf);
    int len = 0;
    data.clientID.cstring = CLIENT_ID;                   //随机
    data.keepAliveInterval = KEEPLIVE_TIME;         //保持活跃
    data.username.cstring = USER_NAME;              //用户名
    data.password.cstring = PASSWORD;               //密钥
    data.MQTTVersion = MQTT_VERSION;                //3表示3.1版本,4表示3.11版本
    data.cleansession = 1;
    //组装消息
    len = MQTTSerialize_connect((unsigned char *)buf, buflen, &data);
    //发送消息
    transport_sendPacketBuffer(buf, len);

    /* 等待连接响应 */
    if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
    {
        unsigned char sessionPresent, connack_rc;
        if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
        {
          printf("无法连接,错误代码是: %d!n", connack_rc);
            return Connect_NOK;
        }
        else 
        {
            printf("用户名与密钥验证成功,MQTT连接成功!n");
            return Connect_OK;
        }
    }
    else
        printf("MQTT连接无响应!n");
        return Connect_NOTACK;
}


/************************************************************************
** 函数名称: MQTT_PingReq								
** 函数功能: 发送MQTT心跳包
** 入口参数: 无
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int32_t MQTT_PingReq(int32_t sock)
{
	  int32_t len;
		uint8_t buf[200];
		int32_t buflen = sizeof(buf);	 
		fd_set readfd;
	  struct timeval tv;
	  tv.tv_sec = 5;
	  tv.tv_usec = 0;
	
	  FD_ZERO(&readfd);
	  FD_SET(sock,&readfd);			
	
		len = MQTTSerialize_pingreq(buf, buflen);
		transport_sendPacketBuffer(buf, len);
	
		//等待可读事件
		if(select(sock 1,&readfd,NULL,NULL,&tv) == 0)
			return -1;
		
	  //有可读事件
		if(FD_ISSET(sock,&readfd) == 0)
			return -2;
		
		if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP)
			return -3;
		
		return 0;
	
}


/************************************************************************
** 函数名称: MQTTSubscribe								
** 函数功能: 订阅消息
** 入口参数: int32_t sock:套接字
**           int8_t *topic:主题
**           enum QoS pos:消息质量
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int32_t MQTTSubscribe(int32_t sock,char *topic,enum QoS pos)
{
	  static uint32_t PacketID = 0;
	  uint16_t packetidbk = 0;
	  int32_t conutbk = 0;
		uint8_t buf[100];
		int32_t buflen = sizeof(buf);
	  MQTTString topicString = MQTTString_initializer;  
		int32_t len;
	  int32_t req_qos,qosbk;
	
		fd_set readfd;
	  struct timeval tv;
	  tv.tv_sec = 2;
	  tv.tv_usec = 0;
	
	  FD_ZERO(&readfd);
	  FD_SET(sock,&readfd);		
	
	  //复制主题
    topicString.cstring = (char *)topic;
		//订阅质量
	  req_qos = pos;
	
	  //串行化订阅消息
    len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID  , 1, &topicString, &req_qos);
		//发送TCP数据
	  if(transport_sendPacketBuffer(buf, len) < 0)
				return -1;
	  
    //等待可读事件--等待超时
		if(select(sock 1,&readfd,NULL,NULL,&tv) == 0)
				return -2;
		//有可读事件--没有可读事件
		if(FD_ISSET(sock,&readfd) == 0)
				return -3;

		//等待订阅返回--未收到订阅返回
		if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK)
				return -4;	
		
		//拆订阅回应包
		if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1)
				return -5;
		
		//检测返回数据的正确性
		if((qosbk == 0x80)||(packetidbk != (PacketID-1)))
				return -6;
		
    //订阅成功
		return 0;
}


/************************************************************************
** 函数名称: UserMsgCtl						
** 函数功能: 用户消息处理函数
** 入口参数: MQTT_USER_MSG  *msg:消息结构体指针
** 出口参数: 无
** 备    注: 
************************************************************************/
void UserMsgCtl(MQTT_USER_MSG  *msg)
{
		//这里处理数据只是打印,用户可以在这里添加自己的处理方式
	  printf("*****收到订阅的消息!******n");
		//返回后处理消息
	  switch(msg->msgqos)
		{
			case 0:
				    printf("MQTT>>消息质量:QoS0n");
				    break;
			case 1:
				    printf("MQTT>>消息质量:QoS1n");
				    break;
			case 2:
				    printf("MQTT>>消息质量:QoS2n");
				    break;
			default:
				    printf("MQTT>>错误的消息质量n");
				    break;
		}
		printf("MQTT>>消息主题:%sn",msg->topic);	
		printf("MQTT>>消息类容:%sn",msg->msg);	
		printf("MQTT>>消息长度:%dn",msg->msglenth);	 
        Proscess(msg->msg);
	  //处理完后销毁数据
	  msg->valid  = 0;
}

/************************************************************************
** 函数名称: GetNextPackID						
** 函数功能: 产生下一个数据包ID
** 入口参数: 无
** 出口参数: uint16_t packetid:产生的ID
** 备    注: 
************************************************************************/
uint16_t GetNextPackID(void)
{
	 static uint16_t pubpacketid = 0;
	 return pubpacketid  ;
}

/************************************************************************
** 函数名称: mqtt_msg_publish						
** 函数功能: 用户推送消息
** 入口参数: MQTT_USER_MSG  *msg:消息结构体指针
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int32_t MQTTMsgPublish(int32_t sock, char *topic, int8_t qos, uint8_t* msg)
{
    int8_t retained = 0;      //保留标志位
    uint32_t msg_len;         //数据长度
		uint8_t buf[MSG_MAX_LEN];
		int32_t buflen = sizeof(buf),len;
		MQTTString topicString = MQTTString_initializer;
	  uint16_t packid = 0,packetidbk;
	
		//填充主题
	  topicString.cstring = (char *)topic;

	  //填充数据包ID
	  if((qos == QOS1)||(qos == QOS2))
		{ 
			packid = GetNextPackID();
		}
		else
		{
			  qos = QOS0;
			  retained = 0;
			  packid = 0;
		}
     
    msg_len = strlen((char *)msg);
    
		//推送消息
		len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len);
		if(len <= 0)
				return -1;
		if(transport_sendPacketBuffer(buf, len) < 0)	
				return -2;	
		
		//质量等级0,不需要返回
		if(qos == QOS0)
		{
				return 0;
		}
		
		//等级1
		if(qos == QOS1)
		{
				//等待PUBACK
			  if(WaitForPacket(sock,PUBACK,5) < 0)
					 return -3;
				return 1;
			  
		}
		//等级2
		if(qos == QOS2)	
		{
			  //等待PUBREC
			  if(WaitForPacket(sock,PUBREC,5) < 0)
					 return -3;
			  //发送PUBREL
        len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk);
				if(len == 0)
					return -4;
				if(transport_sendPacketBuffer(buf, len) < 0)	
					return -6;			
			  //等待PUBCOMP
			  if(WaitForPacket(sock,PUBREC,5) < 0)
					 return -7;
				return 2;
		}
		//等级错误
		return -8;
}

/************************************************************************
** 函数名称: ReadPacketTimeout					
** 函数功能: 阻塞读取MQTT数据
** 入口参数: int32_t sock:网络描述符
**           uint8_t *buf:数据缓存区
**           int32_t buflen:缓冲区大小
**           uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回
** 出口参数: -1:错误,其他--包类型
** 备    注: 
************************************************************************/
int32_t ReadPacketTimeout(int32_t sock,uint8_t *buf,int32_t buflen,uint32_t timeout)
{
		fd_set readfd;
	  struct timeval tv;
	  if(timeout != 0)
		{
				tv.tv_sec = timeout;
				tv.tv_usec = 0;
				FD_ZERO(&readfd);
				FD_SET(sock,&readfd); 

				//等待可读事件--等待超时
				if(select(sock 1,&readfd,NULL,NULL,&tv) == 0)
						return -1;
				//有可读事件--没有可读事件
				if(FD_ISSET(sock,&readfd) == 0)
						return -1;
	  }
		//读取TCP/IP事件
		return MQTTPacket_read(buf, buflen, transport_getdata);
}


/************************************************************************
** 函数名称: deliverMessage						
** 函数功能: 接受服务器发来的消息
** 入口参数: MQTTMessage *msg:MQTT消息结构体
**           MQTT_USER_MSG *mqtt_user_msg:用户接受结构体
**           MQTTString  *TopicName:主题
** 出口参数: 无
** 备    注: 
************************************************************************/
void deliverMessage(MQTTString  *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg)
{
		//消息质量
		mqtt_user_msg->msgqos = msg->qos;
		//保存消息
		memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen);
		mqtt_user_msg->msg[msg->payloadlen] = 0;
		//保存消息长度
		mqtt_user_msg->msglenth = msg->payloadlen;
		//消息主题
		memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len);
		mqtt_user_msg->topic[TopicName->lenstring.len] = 0;
		//消息ID
		mqtt_user_msg->packetid = msg->id;
		//标明消息合法
		mqtt_user_msg->valid = 1;		
}


/************************************************************************
** 函数名称: mqtt_pktype_ctl						
** 函数功能: 根据包类型进行处理
** 入口参数: uint8_t packtype:包类型
** 出口参数: 无
** 备    注: 
************************************************************************/
void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen)
{
	  MQTTMessage msg;
		int32_t rc;
	  MQTTString receivedTopic;
	  uint32_t len;
		switch(packtype)
		{
			case PUBLISH:
        //拆析PUBLISH消息
        if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic,
          (unsigned char **)&msg.payload, &msg.payloadlen, buf, buflen) != 1)
            return;	
        //接受消息
        deliverMessage(&receivedTopic,&msg,&mqtt_user_msg);
        
        //消息质量不同,处理不同
        if(msg.qos == QOS0)
        {
           //QOS0-不需要ACK
           //直接处理数据
           UserMsgCtl(&mqtt_user_msg);
           return;
        }
        //发送PUBACK消息
        if(msg.qos == QOS1)
        {
            len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid);
            if(len == 0)
              return;
            //发送返回
            if(transport_sendPacketBuffer(buf,len)<0)
               return;	
            //返回后处理消息
            UserMsgCtl(&mqtt_user_msg); 
            return;												
        }

        //对于质量2,只需要发送PUBREC就可以了
        if(msg.qos == QOS2)
        {
           len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid);			                
           if(len == 0)
             return;
           //发送返回
           transport_sendPacketBuffer(buf,len);	
        }		
        break;
			case  PUBREL:				           
        //解析包数据,必须包ID相同才可以
        rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen);
        if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid))
          return ;
        //收到PUBREL,需要处理并抛弃数据
        if(mqtt_user_msg.valid == 1)
        {
           //返回后处理消息
           UserMsgCtl(&mqtt_user_msg);
        }      
        //串行化PUBCMP消息
        len = MQTTSerialize_pubcomp(buf,buflen,msg.id);	                   	
        if(len == 0)
          return;									
        //发送返回--PUBCOMP
        transport_sendPacketBuffer(buf,len);										
        break;
			case   PUBACK://等级1客户端推送数据后,服务器返回
				break;
			case   PUBREC://等级2客户端推送数据后,服务器返回
				break;
			case   PUBCOMP://等级2客户端推送PUBREL后,服务器返回
        break;
			default:
				break;
		}
}

/************************************************************************
** 函数名称: WaitForPacket					
** 函数功能: 等待特定的数据包
** 入口参数: int32_t sock:网络描述符
**           uint8_t packettype:包类型
**           uint8_t times:等待次数
** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包
** 备    注: 
************************************************************************/
int32_t WaitForPacket(int32_t sock,uint8_t packettype,uint8_t times)
{
	  int32_t type;
		uint8_t buf[MSG_MAX_LEN];
	  uint8_t n = 0;
		int32_t buflen = sizeof(buf);
		do
		{
				//读取数据包
				type = ReadPacketTimeout(sock,buf,buflen,2);
			  if(type != -1)
					mqtt_pktype_ctl(type,buf,buflen);
				n  ;
		}while((type != packettype)&&(n < times));
		//收到期望的包
		if(type == packettype)
			 return 0;
		else 
			 return -1;		
}



void Client_Connect(void)
{
    char* host_ip;
  
#ifdef  LWIP_DNS
    ip4_addr_t dns_ip;
    netconn_gethostbyname(HOST_NAME, &dns_ip);
    host_ip = ip_ntoa(&dns_ip);
    printf("host name : %s , host_ip : %sn",HOST_NAME,host_ip);
#else
    host_ip = HOST_NAME;
#endif  
MQTT_START: 
  
		//创建网络连接
		printf("1.开始连接对应云平台的服务器...n");
    printf("服务器IP地址:%s,端口号:
!n",host_ip,HOST_PORT);
		while(1)
		{
				//连接服务器
				MQTT_Socket = transport_open((int8_t*)host_ip,HOST_PORT);
				//如果连接服务器成功
				if(MQTT_Socket >= 0)
				{
						printf("连接云平台服务器成功!n");
						break;
				}
				printf("连接云平台服务器失败,等待3秒再尝试重新连接!n");
				//等待3秒
				tos_task_delay(3000);
		}
    
    printf("2.MQTT用户名与密钥验证登录...n");
    //MQTT用户名与密钥验证登录
    if(MQTT_Connect() != Connect_OK)
    {
         //重连服务器
         printf("MQTT用户名与密钥验证登录失败...n");
          //关闭链接
         transport_close();
         goto MQTT_START;	 
    }
    
		//订阅消息
		printf("3.开始订阅消息...n");
    //订阅消息
    if(MQTTSubscribe(MQTT_Socket,(char *)SUB_TOPIC,QOS1) < 0)
    {
         //重连服务器
         printf("客户端订阅消息失败...n");
          //关闭链接
         transport_close();
         goto MQTT_START;	   
    }	

		//无限循环
		printf("4.开始循环接收订阅的消息...n");

}





/************************************************************************
** 函数名称: mqtt_recv_thread								
** 函数功能: MQTT任务
** 入口参数: void *pvParameters:任务参数
** 出口参数: 无
** 备    注: MQTT连云步骤:
**           1.连接对应云平台的服务器
**           2.MQTT用户与密钥验证登录
**           3.订阅指定主题
**           4.等待接收主题的数据与上报主题数据
************************************************************************/
void mqtt_recv_thread(void *pvParameters)
{
	  uint32_t curtick;
		uint8_t no_mqtt_msg_exchange = 1;
		uint8_t buf[MSG_MAX_LEN];
		int32_t buflen = sizeof(buf);
    int32_t type;
    fd_set readfd;
	  struct timeval tv;      //等待时间
	  tv.tv_sec = 0;
	  tv.tv_usec = 10;

  
MQTT_START: 
    //开始连接
    Client_Connect();
    //获取当前滴答,作为心跳包起始时间
		curtick = sys_now();
		while(1)
		{
				//表明无数据交换
				no_mqtt_msg_exchange = 1;
			
				FD_ZERO(&readfd);
				FD_SET(MQTT_Socket,&readfd);						  

				//等待可读事件
				select(MQTT_Socket 1,&readfd,NULL,NULL,&tv);
				
				//判断MQTT服务器是否有数据
				if(FD_ISSET(MQTT_Socket,&readfd) != 0)
				{
						//读取数据包--注意这里参数为0,不阻塞
						type = ReadPacketTimeout(MQTT_Socket,buf,buflen,0);
						if(type != -1)
						{
								mqtt_pktype_ctl(type,buf,buflen);
								//表明有数据交换
								no_mqtt_msg_exchange = 0;
								//获取当前滴答,作为心跳包起始时间
								curtick = sys_now();
						}
				}
        
        //这里主要目的是定时向服务器发送PING保活命令
        if((sys_now() - curtick) >(KEEPLIVE_TIME/2*1000))
        {
            curtick = sys_now();
            //判断是否有数据交换
            if(no_mqtt_msg_exchange == 0)
            {
               //如果有数据交换,这次就不需要发送PING消息
               continue;
            }
            
            if(MQTT_PingReq(MQTT_Socket) < 0)
            {
               //重连服务器
               printf("发送保持活性ping失败....n");
               goto CLOSE;	 
            }
            
            //心跳成功
            printf("发送保持活性ping作为心跳成功....n");
            //表明有数据交换
            no_mqtt_msg_exchange = 0;
        }   
		}

CLOSE:
	 //关闭链接
	 transport_close();
	 //重新链接服务器
	 goto MQTT_START;	
}

void mqtt_send_thread(void *pvParameters)
{
    printf("mqtt_send_threadn");
    int32_t ret;
    uint8_t no_mqtt_msg_exchange = 1;
    uint32_t curtick;
    uint8_t res;
    /* 定义一个创建信息返回值,默认为pdTRUE */
    k_err_t err;
    /* 定义一个接收消息的变量 */
    size_t msg_size;
    DHT11_Data_TypeDef* recv_data;
    //初始化json数据
    cJSON* cJSON_Data = NULL;
    cJSON_Data = cJSON_Data_Init();
    double a,b,c;
MQTT_SEND_START:
  
    while(1)
    {
        
       err = tos_queue_pend(&MQTT_Data_Queue,    
                         (void*)&recv_data,
                         &msg_size,
                         3000); 
//       recv_data = r_data;
       if(err == K_ERR_NONE)
       {
        a = recv_data->temperature;
        b = recv_data->humidity;
        c = sys_now();
        printf("a = %f,b = %fn",a,b);
        //更新数据      
        res = cJSON_Update(cJSON_Data,TIME_NUM,&c);
        res = cJSON_Update(cJSON_Data,TEMP_NUM,&a);
        res = cJSON_Update(cJSON_Data,HUM_NUM,&b);
        
        if(UPDATE_SUCCESS == res)
        {
            //更新数据成功,
            char* p = cJSON_Print(cJSON_Data);
            //发布消息
            ret = MQTTMsgPublish(MQTT_Socket,(char*)PUB_TOPIC,QOS0,(uint8_t*)p);
            if(ret >= 0)
            {
                //表明有数据交换
                no_mqtt_msg_exchange = 0;
                //获取当前滴答,作为心跳包起始时间
                curtick = sys_now();				
            }
            tos_mmheap_free(p);
            p = NULL;
        }
        else
          printf("update failn");
      }
      //这里主要目的是定时向服务器发送PING保活命令
      if((sys_now() - curtick) >(KEEPLIVE_TIME/2*1000))
      {
          curtick = sys_now();
          //判断是否有数据交换
          if(no_mqtt_msg_exchange == 0)
          {
             //如果有数据交换,这次就不需要发送PING消息
             continue;
          }
          
          if(MQTT_PingReq(MQTT_Socket) < 0)
          {
             //重连服务器
             printf("发送保持活性ping失败....n");
             goto MQTT_SEND_CLOSE;	 
          }
          
          //心跳成功
          printf("发送保持活性ping作为心跳成功....n");
          //表明有数据交换
          no_mqtt_msg_exchange = 0;
      } 
  }
MQTT_SEND_CLOSE:
	 //关闭链接
	 transport_close(); 
   //开始连接
   Client_Connect();
   goto MQTT_SEND_START;
}

void
mqtt_thread_init(void)
{
  sys_thread_new("mqtt_recv_thread", mqtt_recv_thread, NULL, 2048*4, 4);
  sys_thread_new("mqtt_send_thread", mqtt_send_thread, NULL, 2048*4, 3);
}

此外还自己基于腾讯云主机部署mqtt服务器

使用TencentOS连接成功并完成通讯

image.pngimage.png

0 人点赞