linux mqtt客户端

2022-11-01 16:29:20 浏览数 (2)

大家好,又见面了,我是你们的朋友全栈君。

实现功能:

(1)定时30s发送心跳包;

(2)接收 mqtt 数据包,解析函数是 user_recv_handle_cb;

(3)每隔 PERIOD_TIME 向自身订阅的主题发布一条消息,即每个周期发出的消息都会被自己收到(回环测试)。

说明:

(1)主要基于庆科 MiCO_A_v3.2.0/demos/net/mqtt_client 示例(STM32 + FreeRTOS),将其移植到 Linux 平台。

(2)实现方式:select、queue 、pthread。

核心源码:

代码语言:C
/*************************************** 描述***********************
作者: lee
日期: 2019/7/2
文件名:mqtt_client.c
功能描述:
1.定时30s发送心跳包
2.接收 mqtt 数据包,解析函数是user_recv_handle_cb
3.定时  PERIOD_TIME   发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥
**********************************************************************/
#include "./libraries/protocols/mqtt/MQTTClient.h"
#include "/usr/local/include/uv.h"
#include "pthread.h"
#include "sys/select.h"
#include "sys/queue.h"
/*********************************
*              Macros
***********************************************/
// Logging shortcuts: both forward to custom_log() with a fixed module tag.
// `##__VA_ARGS__` is the GNU extension that drops the comma when no varargs are given.
#define app_log(M, ...) custom_log("APP", M, ##__VA_ARGS__)
#define mqtt_log(M, ...) custom_log("MQTT", M, ##__VA_ARGS__)
// Command timeout handed to MQTTClientInit().
#define MQTT_CMD_TIMEOUT 5000 // 5s
// Capacity of the topic / payload buffers in s_MQTT_Data_Packet_Info.
#define MAX_MQTT_TOPIC_SIZE  (256)
#define MAX_MQTT_DATA_SIZE   (1024)
// Broker address and port passed to NewNetwork().
#define MQTT_SERVER "127.0.0.1"
//#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_SERVER_PORT 1883
// Interval between two periodic publishes issued by work_thread().
#define PERIOD_TIME   2000  // 2s
/***********************************************
*              Constants
***********************************************/
// Values copied into the MQTT CONNECT options in mqtt_client_thread().
#define MQTT_CLIENT_ID  "MiCO_MQTT_Client"
#define MQTT_CLIENT_USERNAME NULL
#define MQTT_CLIENT_PASSWORD NULL
#define MQTT_CLIENT_KEEPALIVE 30
// Subscribe and publish topics are identical on purpose, so every published
// message is delivered back to this client.
#define MQTT_CLIENT_SUB_TOPIC "mico/test/send" // loop msg
#define MQTT_CLIENT_PUB_TOPIC "mico/test/send"
// Timeout used for both select() and MQTTYield() in the receive loop.
// NOTE(review): "TMIE" is a typo for "TIME"; the name is kept because other
// code in this file references it.
#define MQTT_YIELD_TMIE 5000 // 5s
// Payload of the periodic test publish.
#define MQTT_CLIENT_PUB_MSG "mico_mqtt_client_test_data_1234567890"
/***********************************************
*              Structures
***********************************************/
/* One MQTT message (received or about to be published) together with the
 * metadata needed to hand it from one thread to another. */
typedef struct {
    char     topic[MAX_MQTT_TOPIC_SIZE]; /* topic name */
    char     qos;                        /* QoS level of the message */
    char     retained;                   /* retained flag of the message */
    uint8_t  data[MAX_MQTT_DATA_SIZE];   /* payload bytes */
    uint32_t datalen;                    /* number of payload bytes in data[] */
} s_MQTT_Data_Packet_Info;
/* One deferred call in the singly-linked tail queue: run fp(data). */
struct node {
    STAILQ_ENTRY(node) next;   /* link to the following queue element */
    void (*fp)(void *);        /* callback to invoke */
    void *data;                /* argument handed to fp */
};
/***********************************************
*              Function Declarations
***********************************************/
/* Queue callback that publishes the packet passed in `data`; defined further
 * down in this file. */
void user_send_cb(void *data);
/***********************************************
*              Variables Definitions
***********************************************/
/* libuv request handles; the only visible reference is in commented-out
 * uv_queue_work code, so they appear to be leftovers of an earlier design. */
uv_req_t mqtt_client_recv_handle;
uv_req_t mqtt_client_send_handle;
/* true while no MQTT traffic has been seen in the current receive-loop round;
 * cleared by the receive loop and by user_send_cb(). */
static volatile bool no_mqtt_msg_exchange = true;
/* MQTT client object shared by the whole file. */
Client c;
/* Socket/network object used by the MQTT client. */
Network n;
/* Head type and head pointer of the deferred-call queue (allocated in main). */
STAILQ_HEAD(head, node);
struct head *lhead = NULL;
void user_recv_handle_cb(void* data){
s_MQTT_Data_Packet_Info *p_recv_msg = (s_MQTT_Data_Packet_Info *)data;
if (p_recv_msg)
{
实际工程中替换
app_log("tttttuser get data success! from_topic=[%s], msg=[%ld][%s].rn", p_recv_msg->topic, p_recv_msg->datalen, p_recv_msg->data);  
free(p_recv_msg);
p_recv_msg = NULL;
}          
}
// call back, msg received from mqtt server
static void messageArrived(MessageData* md){
s_MQTT_Data_Packet_Info* p_recv_msg = NULL; 
MQTTMessage* pMessage = md->message;
p_recv_msg = (s_MQTT_Data_Packet_Info *)calloc(1, sizeof(s_MQTT_Data_Packet_Info));
if (p_recv_msg == NULL)  
{
mqtt_log("malloc内存分配不足");
return;
}
p_recv_msg->datalen = pMessage->payloadlen;
p_recv_msg->qos = pMessage->qos;
p_recv_msg->retained = pMessage->retained;
strncpy(p_recv_msg->topic, md->topicName->lenstring.data, md->topicName->lenstring.len);
memcpy(p_recv_msg->data, pMessage->payload, p_recv_msg->datalen   1);  // lee:  !!!!!!!!!!!!!!!!!!!!   p_recv_msg->datalen   1  不加1,会出现段错误 
// mqtt_client_recv_handle.data = &p_recv_msg;
// uv_queue_work(loop, &mqtt_client_recv_handle, user_recv_handle_cb, NULL);    //  lee !!!!!!!!!!!!!!!!!!!! 发现  libuv中的工作队列不能和select混用
struct node process_func;
process_func.data = p_recv_msg;
process_func.fp = user_recv_handle_cb;
STAILQ_INSERT_TAIL(lhead, &process_func, next); 
p_recv_msg = NULL; 
}
static OSStatus mqtt_client_release(Client *c, Network *n){
OSStatus err = kNoErr;
if (c->isconnected) MQTTDisconnect(c);
n->disconnect(n);// close connection
if (MQTT_SUCCESS != MQTTClientDeinit(c)){
app_log("MQTTClientDeinit failed!");
err = kDeletedErr;
}
return err;
}
void* work_thread(void* arg){
s_MQTT_Data_Packet_Info* p_send_msg = NULL;
Timer period_timer;
InitTimer(&period_timer);
countdown_ms(&period_timer, PERIOD_TIME);
while (1)
{
if (!STAILQ_EMPTY(lhead)){
struct node* pfirst_node = STAILQ_FIRST(lhead);
pfirst_node->fp(pfirst_node->data);
STAILQ_REMOVE_HEAD(lhead, next);
} 
while (expired(&period_timer))
{
if (c.isconnected){
p_send_msg = (s_MQTT_Data_Packet_Info*) calloc(1, sizeof(s_MQTT_Data_Packet_Info));                
if (p_send_msg == NULL) {
mqtt_log("没有内存可用");
continue;
}
p_send_msg->qos = 0;
p_send_msg->retained = 0;
p_send_msg->datalen = strlen(MQTT_CLIENT_PUB_MSG);
memcpy(p_send_msg->data, MQTT_CLIENT_PUB_MSG, p_send_msg->datalen);
strncpy(p_send_msg->topic, MQTT_CLIENT_PUB_TOPIC, MAX_MQTT_TOPIC_SIZE);
struct node process_func;
process_func.data = p_send_msg;
process_func.fp = user_send_cb;
STAILQ_INSERT_TAIL(lhead, &process_func, next); 
p_send_msg = NULL;
}
else
{
mqtt_log("MQTT client does not init ok");
}
countdown_ms(&period_timer, PERIOD_TIME);   
} 
}
pthread_exit(NULL);   
}
// MQTT thread: connects to the broker, subscribes to MQTT_CLIENT_SUB_TOPIC and
// then loops, waiting on the socket with select(), feeding incoming data to
// MQTTYield() and calling keepalive() on rounds without traffic. Any failure
// releases the client and restarts the whole sequence after 5 s.
void* mqtt_client_thread(void* arg){
    OSStatus err = kUnknownErr;
    int rc = -1;
    fd_set readfds;
    struct timeval t;
    ssl_opts ssl_settings;
    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

    (void)arg;
    memset(&c, 0, sizeof(c));
    memset(&n, 0, sizeof(n));
    // Only ssl_enable is set below; zero the rest so NewNetwork() never sees
    // indeterminate fields.
    memset(&ssl_settings, 0, sizeof(ssl_settings));
MQTT_start:
    // 1.create network connection
    ssl_settings.ssl_enable = false;
    mqtt_log("enter into mqtt client thread.");
    while (1){
        rc = NewNetwork(&n, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings);
        if (rc == MQTT_SUCCESS) break;
        mqtt_log("ERROR: MQTT network connection err=%d, reconnect after 3s...", rc);
        sleep(3);
    }
    mqtt_log("MQTT network connection success!");
    // 2.init mqtt client
    rc = MQTTClientInit(&c, &n, MQTT_CMD_TIMEOUT);
    require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err.");
    mqtt_log("MQTT client init success!");
    // 3.create mqtt client connection
    connectData.willFlag = 0;
    connectData.MQTTVersion = 4; // 3: 3.1, 4: v3.1.1
    connectData.clientID.cstring = MQTT_CLIENT_ID;
    connectData.username.cstring = MQTT_CLIENT_USERNAME;
    connectData.password.cstring = MQTT_CLIENT_PASSWORD;
    connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE;
    connectData.cleansession = 1;
    rc = MQTTConnect(&c, &connectData);
    require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client connect err.");
    mqtt_log("MQTT client connect success!");
    // 4.mqtt client subscribe
    rc = MQTTSubscribe(&c, MQTT_CLIENT_SUB_TOPIC, QOS0, messageArrived);
    require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client subscribe err.");
    mqtt_log("MQTT client subscribe success! recv_topic=[%s].", MQTT_CLIENT_SUB_TOPIC);
    // 5. client loop for recv msg && keepalive
    while (1){
        int nready;

        no_mqtt_msg_exchange = true;
        FD_ZERO(&readfds);
        FD_SET(c.ipstack->my_socket, &readfds);
        // Re-arm the timeout every round: select() may overwrite it with the
        // time left, and tv_usec must stay below one second.
        t.tv_sec = MQTT_YIELD_TMIE / 1000;
        t.tv_usec = (MQTT_YIELD_TMIE % 1000) * 1000;
        nready = select(c.ipstack->my_socket + 1, &readfds, NULL, NULL, &t);
        if (nready < 0){
            // readfds is unspecified after a failure; rebuild the connection.
            rc = nready;
            mqtt_log("ERROR: select on mqtt socket failed");
            goto MQTT_reconnect;
        }
        // recv msg from server
        if (nready > 0 && FD_ISSET(c.ipstack->my_socket, &readfds)){
            rc = MQTTYield(&c, (int)MQTT_YIELD_TMIE);
            require_noerr(rc, MQTT_reconnect);
            no_mqtt_msg_exchange = false;
        }
        //if no msg exchange, we need to check ping msg to keep alive
        if (no_mqtt_msg_exchange){
            rc = keepalive(&c);
            require_noerr_string(rc, MQTT_reconnect, "ERROR: keep alive err");
        }
    }
MQTT_reconnect:
    mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, err = %d", rc, err);
    mqtt_client_release(&c, &n);
    sleep(5);
    goto MQTT_start;
    // Currently unreachable: nothing in this function jumps to `exit`.
exit:
    mqtt_log("EXIT: MQTT client exit with err = %d.", err);
    mqtt_client_release(&c, &n);
    pthread_exit(NULL);
}
// Publish `msg_len` bytes of `msg` on `topic` through client `c` with the given
// QoS and retained flag.
// Returns kNoErr when MQTTPublish() reports MQTT_SUCCESS; kUnknownErr when it
// fails or when topic/msg is NULL or msg_len is 0.
static OSStatus mqtt_msg_publish(Client *c, const char* topic, char qos, char retained,
                                 const unsigned char* msg,
                                 uint32_t msg_len){
    MQTTMessage outgoing = MQTTMessage_publishData_initializer;
    OSStatus status = kUnknownErr;
    int publish_rc = 0;

    // Reject missing topic, missing payload or empty payload up front.
    require(topic && msg_len && msg, exit);

    outgoing.payloadlen = msg_len;
    outgoing.payload = (void*)msg;
    outgoing.retained = retained;
    outgoing.qos = qos;

    publish_rc = MQTTPublish(c, topic, &outgoing);
    status = (MQTT_SUCCESS == publish_rc) ? kNoErr : kUnknownErr;
exit:
    return status;
}
/*
 * Queue callback that publishes one packet.
 *
 * data: heap-allocated s_MQTT_Data_Packet_Info (may be NULL). Ownership is
 *       transferred to this function, which frees it on every path.
 *
 * NOTE(review): this publishes on the global client `c` from whichever thread
 * runs the queue, while mqtt_client_thread() uses the same client; confirm
 * that the MQTT library tolerates concurrent use.
 */
void user_send_cb(void* data){
    OSStatus err = kNoErr;
    s_MQTT_Data_Packet_Info* p_send_msg = (s_MQTT_Data_Packet_Info*)data;
    size_t shown_len;

    /* The macro jumps to `exit` when its first argument is non-zero, i.e.
     * when no packet was supplied. */
    require_noerr_string((p_send_msg == NULL), exit, "没有内存可用");
    err = mqtt_msg_publish(&c, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained,
                           p_send_msg->data,
                           p_send_msg->datalen);
    require_noerr_string(err, exit, "publish失败");

    /* Bound both strings with %.*s and widen the uint32_t length for %lu. */
    shown_len = p_send_msg->datalen;
    if (shown_len > sizeof p_send_msg->data) {
        shown_len = sizeof p_send_msg->data;
    }
    mqtt_log("MQTT publish data success! send_topic=[%.*s], msg=[%lu][%.*s].\r\n",
             (int)sizeof p_send_msg->topic, p_send_msg->topic,
             (unsigned long)p_send_msg->datalen,
             (int)shown_len, (const char *)p_send_msg->data);
    /* Unsynchronized on purpose: one keep-alive ping more or less is harmless,
     * so the original author chose not to take a mutex here. */
    no_mqtt_msg_exchange = false;
exit:
    free(p_send_msg); /* free(NULL) is a no-op */
}
int main(void){
// void *rval;
OSStatus err = kNoErr;
lhead = (struct head*)malloc(sizeof(struct head));
STAILQ_INIT(lhead);
pthread_t mqtt_client_handle, work_thread_Handle/*, timer_thread_Handle*/;
// 默认堆栈大小为8M, 嵌入式里太大,重新设置
pthread_attr_t attr;
err = pthread_attr_init(&attr);
require_noerr_string(err, exit, "ERROR: Unable to init thread attr.");
err = pthread_attr_setstacksize(&attr, 16384);// 堆栈大小不能小于16384Byte 
require_noerr_string(err, exit, "ERROR: Unable to set thread size.");
err = pthread_create(&mqtt_client_handle, &attr, mqtt_client_thread, NULL);
require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread.");
err = pthread_create(&work_thread_Handle, &attr, work_thread, NULL);
require_noerr_string(err, exit, "ERROR: Unable to start the work thread.");
// err = pthread_create(&timer_thread_Handle, NULL, timer_thread, NULL);
// require_noerr_string(err, exit, "ERROR: Unable to start the timer thread.");
// loop = uv_default_loop();
// uv_timer_t period_timer;
// uv_timer_init(loop, &period_timer);
// uv_timer_start(&period_timer, user_send_handler, 0, 2000);
// err = uv_run(loop, UV_RUN_DEFAULT); 
// require_noerr_string(err, exit, "ERROR: Unable to run uv loop.");
// struct timerval interval;
// struct itimerval timer;
pthread_join(work_thread_Handle, NULL);
pthread_join(mqtt_client_handle, NULL);   重点,当主线程没有其他可执行的循环时,一定要加此句
pthread_attr_destroy(&attr);
exit :
if (err != kNoErr){
app_log("ERROR: app thread exit err: %d", err);   
}
free(lhead);
lhead  = NULL;
return err;
}

整个工程源码:

链接: https://pan.baidu.com/s/10w8a9X_7prtYyHsmMUj7Sw

提取码: 48aa

参考资料:

linux c MQTT客户端实现

https://www.jianshu.com/p/d309de966379

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

0 人点赞