linux mqtt客户端

linux mqtt客户端实现功能:(1)定时30s发送心跳包;(2)接收mqtt数据包,解析函数是user_recv_handle_cb;(3)定时PERIOD_TIME发布自身订阅的主题信息,即循环PERIOD_TIME发啥收啥。说明:(1)主要根据庆科的MiCO_A_v3.2.0/demos/net/mqtt_client的stm32freeRTOS移植到li…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

实现功能:

(1)定时30s发送心跳包;

(2)接收 mqtt 数据包,解析函数是 user_recv_handle_cb;

(3)定时  PERIOD_TIME 发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥。

说明:

(1)主要根据  庆科的MiCO_A_v3.2.0/demos/net/mqtt_client 的 stm32  freeRTOS  移植到 linux 平台。

(2)实现方式:select、queue 、pthread。

核心源码:

/*************************************** 描述***********************
作者: lee
日期: 2019/7/2
文件名:mqtt_client.c
功能描述:
1.定时30s发送心跳包
2.接收 mqtt 数据包,解析函数是user_recv_handle_cb
3.定时  PERIOD_TIME   发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥
**********************************************************************/
#include "./libraries/protocols/mqtt/MQTTClient.h"
#include "/usr/local/include/uv.h"
#include "pthread.h"
#include "sys/select.h"
#include "sys/queue.h"
/*********************************
*              Macros
***********************************************/
#define app_log(M, ...) custom_log("APP", M, ##__VA_ARGS__)
#define mqtt_log(M, ...) custom_log("MQTT", M, ##__VA_ARGS__)
#define MQTT_CMD_TIMEOUT 5000 // 5s
#define MAX_MQTT_TOPIC_SIZE  (256)
#define MAX_MQTT_DATA_SIZE   (1024)
#define MQTT_SERVER "127.0.0.1"
//#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_SERVER_PORT 1883
#define PERIOD_TIME   2000  // 2s
/***********************************************
*              Constants
***********************************************/
#define MQTT_CLIENT_ID  "MiCO_MQTT_Client"
#define MQTT_CLIENT_USERNAME NULL
#define MQTT_CLIENT_PASSWORD NULL
#define MQTT_CLIENT_KEEPALIVE 30
#define MQTT_CLIENT_SUB_TOPIC "mico/test/send" // loop msg
#define MQTT_CLIENT_PUB_TOPIC "mico/test/send"
#define MQTT_YIELD_TMIE 5000 // 5s
#define MQTT_CLIENT_PUB_MSG "mico_mqtt_client_test_data_1234567890"
/***********************************************
*              Structures
***********************************************/
typedef struct {   
char topic[MAX_MQTT_TOPIC_SIZE];
char qos;
char retained;
uint8_t data[MAX_MQTT_DATA_SIZE];
uint32_t datalen;
} s_MQTT_Data_Packet_Info;
struct node{
STAILQ_ENTRY(node) next;
void (*fp) (void*);  
void *data;
};
/***********************************************
*              Function Declarations
***********************************************/
void user_send_cb(void* data);
/***********************************************
*              Variables Definitions
***********************************************/
uv_req_t mqtt_client_recv_handle, mqtt_client_send_handle;
volatile static bool no_mqtt_msg_exchange = true;
Client c; // mqtt client object
Network n; // socket network for mqtt client
STAILQ_HEAD(head, node);
struct head *lhead = 0;
void user_recv_handle_cb(void* data){
s_MQTT_Data_Packet_Info *p_recv_msg = (s_MQTT_Data_Packet_Info *)data;
if (p_recv_msg)
{
实际工程中替换
app_log("\t\t\t\t\tuser get data success! from_topic=[%s], msg=[%ld][%s].\r\n", p_recv_msg->topic, p_recv_msg->datalen, p_recv_msg->data);  
free(p_recv_msg);
p_recv_msg = NULL;
}          
}
// call back, msg received from mqtt server
static void messageArrived(MessageData* md){
s_MQTT_Data_Packet_Info* p_recv_msg = NULL; 
MQTTMessage* pMessage = md->message;
p_recv_msg = (s_MQTT_Data_Packet_Info *)calloc(1, sizeof(s_MQTT_Data_Packet_Info));
if (p_recv_msg == NULL)  
{
mqtt_log("malloc内存分配不足");
return;
}
p_recv_msg->datalen = pMessage->payloadlen;
p_recv_msg->qos = pMessage->qos;
p_recv_msg->retained = pMessage->retained;
strncpy(p_recv_msg->topic, md->topicName->lenstring.data, md->topicName->lenstring.len);
memcpy(p_recv_msg->data, pMessage->payload, p_recv_msg->datalen + 1);  // lee:  !!!!!!!!!!!!!!!!!!!!   p_recv_msg->datalen + 1  不加1,会出现段错误 
// mqtt_client_recv_handle.data = &p_recv_msg;
// uv_queue_work(loop, &mqtt_client_recv_handle, user_recv_handle_cb, NULL);    //  lee !!!!!!!!!!!!!!!!!!!! 发现  libuv中的工作队列不能和select混用
struct node process_func;
process_func.data = p_recv_msg;
process_func.fp = user_recv_handle_cb;
STAILQ_INSERT_TAIL(lhead, &process_func, next); 
p_recv_msg = NULL; 
}
static OSStatus mqtt_client_release(Client *c, Network *n){
OSStatus err = kNoErr;
if (c->isconnected) MQTTDisconnect(c);
n->disconnect(n);// close connection
if (MQTT_SUCCESS != MQTTClientDeinit(c)){
app_log("MQTTClientDeinit failed!");
err = kDeletedErr;
}
return err;
}
void* work_thread(void* arg){
s_MQTT_Data_Packet_Info* p_send_msg = NULL;
Timer period_timer;
InitTimer(&period_timer);
countdown_ms(&period_timer, PERIOD_TIME);
while (1)
{
if (!STAILQ_EMPTY(lhead)){
struct node* pfirst_node = STAILQ_FIRST(lhead);
pfirst_node->fp(pfirst_node->data);
STAILQ_REMOVE_HEAD(lhead, next);
} 
while (expired(&period_timer))
{
if (c.isconnected){
p_send_msg = (s_MQTT_Data_Packet_Info*) calloc(1, sizeof(s_MQTT_Data_Packet_Info));                
if (p_send_msg == NULL) {
mqtt_log("没有内存可用");
continue;
}
p_send_msg->qos = 0;
p_send_msg->retained = 0;
p_send_msg->datalen = strlen(MQTT_CLIENT_PUB_MSG);
memcpy(p_send_msg->data, MQTT_CLIENT_PUB_MSG, p_send_msg->datalen);
strncpy(p_send_msg->topic, MQTT_CLIENT_PUB_TOPIC, MAX_MQTT_TOPIC_SIZE);
struct node process_func;
process_func.data = p_send_msg;
process_func.fp = user_send_cb;
STAILQ_INSERT_TAIL(lhead, &process_func, next); 
p_send_msg = NULL;
}
else
{
mqtt_log("MQTT client does not init ok");
}
countdown_ms(&period_timer, PERIOD_TIME);   
} 
}
pthread_exit(NULL);   
}
void* mqtt_client_thread(void* arg){
OSStatus err = kUnknownErr;
int rc = -1;
fd_set readfds;
struct timeval t = { 0, MQTT_YIELD_TMIE * 1000};
ssl_opts ssl_settings;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
memset(&c, 0, sizeof(c));
memset(&n, 0, sizeof(n));
MQTT_start:
// create network connection
ssl_settings.ssl_enable = false;
mqtt_log("enter into mqtt client thread.");
while (1){
rc = NewNetwork(&n, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings);
if (rc == MQTT_SUCCESS) break;
mqtt_log("ERROR: MQTT network connection err=%d, reconnect after 3s...", rc);
sleep(3);
}
mqtt_log("MQTT network connection success!");
// 2.init mqtt client
rc = MQTTClientInit(&c, &n, MQTT_CMD_TIMEOUT);
require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err.");
mqtt_log("MQTT client init success!");
// 3.create mqtt client connection
connectData.willFlag = 0;
connectData.MQTTVersion = 4; // 3: 3.1, 4: v3.1.1
connectData.clientID.cstring = MQTT_CLIENT_ID;
connectData.username.cstring = MQTT_CLIENT_USERNAME;
connectData.password.cstring = MQTT_CLIENT_PASSWORD;
connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE;
connectData.cleansession = 1;
rc = MQTTConnect(&c, &connectData);
require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client connect err.");
mqtt_log("MQTT client connect success!");
// 4.mqtt client subscribe
rc = MQTTSubscribe(&c, MQTT_CLIENT_SUB_TOPIC, QOS0, messageArrived);
require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client subscribe err.");
mqtt_log("MQTT client subscribe success! recv_topic=[%s].", MQTT_CLIENT_SUB_TOPIC);
// 5. client loop for recv msg && keepalive
while (1){
no_mqtt_msg_exchange = true;
FD_ZERO(&readfds);
FD_SET(c.ipstack->my_socket, &readfds);
select(c.ipstack->my_socket + 1, &readfds, NULL, NULL, &t);
// recv msg from server
if (FD_ISSET( c.ipstack->my_socket, &readfds)){
rc = MQTTYield(&c, (int)MQTT_YIELD_TMIE);
require_noerr(rc, MQTT_reconnect);
no_mqtt_msg_exchange = false;
}
//if no msg exchange, we need to check ping msg to keep alive
if (no_mqtt_msg_exchange){
rc = keepalive(&c);
require_noerr_string(rc, MQTT_reconnect, "ERROR: keep alive err");
}
}  
MQTT_reconnect:
mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, err = %d", rc, err);
mqtt_client_release(&c, &n);
sleep(5);
goto MQTT_start;
exit:
mqtt_log("EXIT: MQTT client exit with err = %d.", err);
mqtt_client_release(&c, &n);
pthread_exit(NULL);
}
static OSStatus mqtt_msg_publish(Client *c, const char* topic, char qos, char retained,
const unsigned char* msg,
uint32_t msg_len){
OSStatus err = kUnknownErr;
int ret = 0;
MQTTMessage publishData = MQTTMessage_publishData_initializer;
require(topic && msg_len && msg, exit);
//upload data qos0
publishData.qos = qos;
publishData.retained = retained;
publishData.payload = (void*)msg;
publishData.payloadlen = msg_len;
ret = MQTTPublish(c, topic, &publishData);
if (MQTT_SUCCESS == ret){
err = kNoErr;
}else{
err = kUnknownErr;
}
exit:
return err;
}
void user_send_cb(void* data){
OSStatus err = kNoErr;
s_MQTT_Data_Packet_Info* p_send_msg = (s_MQTT_Data_Packet_Info*)data;
require_noerr_string((p_send_msg == NULL), exit, "没有内存可用");
err = mqtt_msg_publish(&c, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained,
p_send_msg->data,
p_send_msg->datalen);
require_noerr_string(err, exit, "publish失败");
mqtt_log("MQTT publish data success! send_topic=[%s], msg=[%ld][%s].\r\n", p_send_msg->topic, p_send_msg->datalen, p_send_msg->data);
no_mqtt_msg_exchange = false; // 在当前情况下,多发一次或少发一次,无关紧要,无需用互斥锁
exit:
if (p_send_msg != NULL){
free(p_send_msg);  
p_send_msg = NULL;
}
}
int main(void){
// void *rval;
OSStatus err = kNoErr;
lhead = (struct head*)malloc(sizeof(struct head));
STAILQ_INIT(lhead);
pthread_t mqtt_client_handle, work_thread_Handle/*, timer_thread_Handle*/;
// 默认堆栈大小为8M, 嵌入式里太大,重新设置
pthread_attr_t attr;
err = pthread_attr_init(&attr);
require_noerr_string(err, exit, "ERROR: Unable to init thread attr.");
err = pthread_attr_setstacksize(&attr, 16384);// 堆栈大小不能小于16384Byte 
require_noerr_string(err, exit, "ERROR: Unable to set thread size.");
err = pthread_create(&mqtt_client_handle, &attr, mqtt_client_thread, NULL);
require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread.");
err = pthread_create(&work_thread_Handle, &attr, work_thread, NULL);
require_noerr_string(err, exit, "ERROR: Unable to start the work thread.");
// err = pthread_create(&timer_thread_Handle, NULL, timer_thread, NULL);
// require_noerr_string(err, exit, "ERROR: Unable to start the timer thread.");
// loop = uv_default_loop();
// uv_timer_t period_timer;
// uv_timer_init(loop, &period_timer);
// uv_timer_start(&period_timer, user_send_handler, 0, 2000);
// err = uv_run(loop, UV_RUN_DEFAULT); 
// require_noerr_string(err, exit, "ERROR: Unable to run uv loop.");
// struct timerval interval;
// struct itimerval timer;
pthread_join(work_thread_Handle, NULL);
pthread_join(mqtt_client_handle, NULL);   重点,当主线程没有其他可执行的循环时,一定要加此句
pthread_attr_destroy(&attr);
exit :
if (err != kNoErr){
app_log("ERROR: app thread exit err: %d", err);   
}
free(lhead);
lhead  = NULL;
return err;
}

Jetbrains全家桶1年46,售后保障稳定

整个工程源码:

链接: https://pan.baidu.com/s/10w8a9X_7prtYyHsmMUj7Sw   

提取码: 48aa 

参考资料:

linux c MQTT客户端实现

https://www.jianshu.com/p/d309de966379

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/206802.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • wireshark找不到接口win10_安装打印机找不到usb接口

    wireshark找不到接口win10_安装打印机找不到usb接口Win10下使用WireShark出现没有找到接口问题,无法抓取数据包解决:安装Win10Pcap。到http://www.win10pcap.org/download/下载该软件安装完成后,重启WireShark

    2022年10月24日
  • 测试用例设计的八大要素「建议收藏」

    测试用例设计的八大要素「建议收藏」1、测试用例的八大要素用例编号和其他编号一样,测试用例编号是用来唯一识别测试用例的编号,要求具有易识别和易维护性,用户可以很容易根据用例编号获取到相应用例的目的和作用,在系统测试用例中,编号的一般格式为A-B-C-D这几部分的作用分别如下:A:产品或项目类型,如CMS(内容管理系统)、CRM(客户关系管理系统)B:一般用来说明用例的属性,如ST(系统测试)、IT(集成测试)、UT(单元测试)C:测试需求的表示,说明该用例针对的需求点,可包括测试项和测试子项等,如文档管理、客户投诉信息管理等。

  • 【软件工具】服务器硬件资源监控

    【软件工具】服务器硬件资源监控服务器资源使用情况及硬件监控,是服务器管理员或运维人员必备的技能和工作内容。对于服务器硬件的时时监控,除了需要掌握定的方法外,还常会用到些相应的相关软件程序。当然,运维同仁般都具备定的编程能力,根据服务器情况,编写个便捷、好用和适合自己的服务器硬件监控软件也是很有必要的。     服务器硬件监控常用方法及相关软件:  raid卡监控:raid卡常有rai

  • Spring boot 使用Jasypt加密用户名密码

    Spring boot 使用Jasypt加密用户名密码

  • Java虚拟机:Java中堆和栈的详细区别

    Java虚拟机:Java中堆和栈的详细区别

  • oracle触发器报错语法,Oracle 触发器

    oracle触发器报错语法,Oracle 触发器Oracle触发器是使用者对Oracle数据库的对象做特定的操作时,触发的一段PL/SQL程序代码,叫做触发器。触发的事件包括对表的DML操作,用户的DDL操作以及数据库事件等。一、触发器的作用Oracle触发器可以根据不同的数据库事件进行特定的调用触发器程序块,因此,它可以帮助开发者完成一些PL/SQL存储过程完成不了的问题,比如操作日志的记录、防止一些无效的操作、校验数据的正确性、限制一些对数…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号