linux mqtt客户端

linux mqtt客户端实现功能:(1)定时30s发送心跳包;(2)接收mqtt数据包,解析函数是user_recv_handle_cb;(3)定时PERIOD_TIME发布自身订阅的主题信息,即循环PERIOD_TIME发啥收啥。说明:(1)主要根据庆科的MiCO_A_v3.2.0/demos/net/mqtt_client的stm32freeRTOS移植到li…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

实现功能:

(1)定时30s发送心跳包;

(2)接收 mqtt 数据包,解析函数是 user_recv_handle_cb;

(3)定时  PERIOD_TIME 发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥。

说明:

(1)主要根据  庆科的MiCO_A_v3.2.0/demos/net/mqtt_client 的 stm32  freeRTOS  移植到 linux 平台。

(2)实现方式:select、queue 、pthread。

核心源码:

/*************************************** 描述***********************
作者: lee
日期: 2019/7/2
文件名:mqtt_client.c
功能描述:
    1.定时30s发送心跳包
    2.接收 mqtt 数据包,解析函数是user_recv_handle_cb
    3.定时  PERIOD_TIME   发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥

**********************************************************************/
#include "./libraries/protocols/mqtt/MQTTClient.h"
#include "/usr/local/include/uv.h"
#include "pthread.h"
#include "sys/select.h"
#include "sys/queue.h"

/*********************************
 *              Macros
 ***********************************************/
#define app_log(M, ...) custom_log("APP", M, ##__VA_ARGS__)
#define mqtt_log(M, ...) custom_log("MQTT", M, ##__VA_ARGS__)

#define MQTT_CMD_TIMEOUT 5000 // 5s

#define MAX_MQTT_TOPIC_SIZE  (256)
#define MAX_MQTT_DATA_SIZE   (1024)

#define MQTT_SERVER "127.0.0.1"
//#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_SERVER_PORT 1883

#define PERIOD_TIME   2000  // 2s

/***********************************************
 *              Constants
 ***********************************************/
#define MQTT_CLIENT_ID  "MiCO_MQTT_Client"
#define MQTT_CLIENT_USERNAME NULL
#define MQTT_CLIENT_PASSWORD NULL
#define MQTT_CLIENT_KEEPALIVE 30
#define MQTT_CLIENT_SUB_TOPIC "mico/test/send" // loop msg
#define MQTT_CLIENT_PUB_TOPIC "mico/test/send"
#define MQTT_YIELD_TMIE 5000 // 5s
#define MQTT_CLIENT_PUB_MSG "mico_mqtt_client_test_data_1234567890"

/***********************************************
 *              Structures
 ***********************************************/
typedef struct {   
    char topic[MAX_MQTT_TOPIC_SIZE];
    char qos;
    char retained;

    uint8_t data[MAX_MQTT_DATA_SIZE];
    uint32_t datalen;
} s_MQTT_Data_Packet_Info;

struct node{
    STAILQ_ENTRY(node) next;
    void (*fp) (void*);  
    void *data;
};

/***********************************************
 *              Function Declarations
 ***********************************************/
void user_send_cb(void* data);



/***********************************************
 *              Variables Definitions
 ***********************************************/
 uv_req_t mqtt_client_recv_handle, mqtt_client_send_handle;

volatile static bool no_mqtt_msg_exchange = true;

Client c; // mqtt client object
Network n; // socket network for mqtt client

STAILQ_HEAD(head, node);
struct head *lhead = 0;

void user_recv_handle_cb(void* data){
    s_MQTT_Data_Packet_Info *p_recv_msg = (s_MQTT_Data_Packet_Info *)data;

    if (p_recv_msg)
    {
        实际工程中替换
        app_log("\t\t\t\t\tuser get data success! from_topic=[%s], msg=[%ld][%s].\r\n", p_recv_msg->topic, p_recv_msg->datalen, p_recv_msg->data);  

        free(p_recv_msg);
        p_recv_msg = NULL;
    }          
}

// call back, msg received from mqtt server
static void messageArrived(MessageData* md){
    s_MQTT_Data_Packet_Info* p_recv_msg = NULL; 

    MQTTMessage* pMessage = md->message;

    p_recv_msg = (s_MQTT_Data_Packet_Info *)calloc(1, sizeof(s_MQTT_Data_Packet_Info));
    if (p_recv_msg == NULL)  
    {
        mqtt_log("malloc内存分配不足");
        return;
    }
    p_recv_msg->datalen = pMessage->payloadlen;
    p_recv_msg->qos = pMessage->qos;
    p_recv_msg->retained = pMessage->retained;
    strncpy(p_recv_msg->topic, md->topicName->lenstring.data, md->topicName->lenstring.len);
    memcpy(p_recv_msg->data, pMessage->payload, p_recv_msg->datalen + 1);  // lee:  !!!!!!!!!!!!!!!!!!!!   p_recv_msg->datalen + 1  不加1,会出现段错误 

    // mqtt_client_recv_handle.data = &p_recv_msg;
    // uv_queue_work(loop, &mqtt_client_recv_handle, user_recv_handle_cb, NULL);    //  lee !!!!!!!!!!!!!!!!!!!! 发现  libuv中的工作队列不能和select混用
    
    struct node process_func;
    process_func.data = p_recv_msg;
    process_func.fp = user_recv_handle_cb;
    STAILQ_INSERT_TAIL(lhead, &process_func, next); 

    p_recv_msg = NULL; 
}

static OSStatus mqtt_client_release(Client *c, Network *n){
    OSStatus err = kNoErr;
    
    if (c->isconnected) MQTTDisconnect(c);

    n->disconnect(n);// close connection

    if (MQTT_SUCCESS != MQTTClientDeinit(c)){
        app_log("MQTTClientDeinit failed!");
        err = kDeletedErr;
    }

    return err;
}

void* work_thread(void* arg){
    s_MQTT_Data_Packet_Info* p_send_msg = NULL;

    Timer period_timer;
    InitTimer(&period_timer);
    countdown_ms(&period_timer, PERIOD_TIME);

    while (1)
    {
        if (!STAILQ_EMPTY(lhead)){
            struct node* pfirst_node = STAILQ_FIRST(lhead);
            pfirst_node->fp(pfirst_node->data);
            STAILQ_REMOVE_HEAD(lhead, next);
        } 
        
        while (expired(&period_timer))
        {
            if (c.isconnected){
                
                p_send_msg = (s_MQTT_Data_Packet_Info*) calloc(1, sizeof(s_MQTT_Data_Packet_Info));                
                if (p_send_msg == NULL) {
                    mqtt_log("没有内存可用");
                    continue;
                }
                
                p_send_msg->qos = 0;
                p_send_msg->retained = 0;
                p_send_msg->datalen = strlen(MQTT_CLIENT_PUB_MSG);
                memcpy(p_send_msg->data, MQTT_CLIENT_PUB_MSG, p_send_msg->datalen);
                strncpy(p_send_msg->topic, MQTT_CLIENT_PUB_TOPIC, MAX_MQTT_TOPIC_SIZE);

                struct node process_func;
                process_func.data = p_send_msg;
                process_func.fp = user_send_cb;
                STAILQ_INSERT_TAIL(lhead, &process_func, next); 
            
                p_send_msg = NULL;
            }
            else
            {
                mqtt_log("MQTT client does not init ok");
            }
            
            countdown_ms(&period_timer, PERIOD_TIME);   
        } 
    }
    pthread_exit(NULL);   
}


void* mqtt_client_thread(void* arg){
    OSStatus err = kUnknownErr;

    int rc = -1;
    fd_set readfds;
    struct timeval t = { 0, MQTT_YIELD_TMIE * 1000};

    ssl_opts ssl_settings;
    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;

    memset(&c, 0, sizeof(c));
    memset(&n, 0, sizeof(n));

MQTT_start:
    // create network connection
    ssl_settings.ssl_enable = false;

    mqtt_log("enter into mqtt client thread.");
    while (1){
        rc = NewNetwork(&n, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings);
        if (rc == MQTT_SUCCESS) break;
        mqtt_log("ERROR: MQTT network connection err=%d, reconnect after 3s...", rc);
        sleep(3);
    }

    mqtt_log("MQTT network connection success!");

    // 2.init mqtt client
    rc = MQTTClientInit(&c, &n, MQTT_CMD_TIMEOUT);
    require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err.");

    mqtt_log("MQTT client init success!");

    // 3.create mqtt client connection
    connectData.willFlag = 0;
    connectData.MQTTVersion = 4; // 3: 3.1, 4: v3.1.1
    connectData.clientID.cstring = MQTT_CLIENT_ID;
    connectData.username.cstring = MQTT_CLIENT_USERNAME;
    connectData.password.cstring = MQTT_CLIENT_PASSWORD;
    connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE;
    connectData.cleansession = 1;

    rc = MQTTConnect(&c, &connectData);
    require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client connect err.");

    mqtt_log("MQTT client connect success!");

    // 4.mqtt client subscribe
    rc = MQTTSubscribe(&c, MQTT_CLIENT_SUB_TOPIC, QOS0, messageArrived);
    require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client subscribe err.");

    mqtt_log("MQTT client subscribe success! recv_topic=[%s].", MQTT_CLIENT_SUB_TOPIC);

    // 5. client loop for recv msg && keepalive
    while (1){
        no_mqtt_msg_exchange = true;
        FD_ZERO(&readfds);
        FD_SET(c.ipstack->my_socket, &readfds);
        select(c.ipstack->my_socket + 1, &readfds, NULL, NULL, &t);
        
        // recv msg from server
        if (FD_ISSET( c.ipstack->my_socket, &readfds)){
            rc = MQTTYield(&c, (int)MQTT_YIELD_TMIE);
            require_noerr(rc, MQTT_reconnect);
            no_mqtt_msg_exchange = false;
        }
    
        //if no msg exchange, we need to check ping msg to keep alive
        if (no_mqtt_msg_exchange){
            rc = keepalive(&c);
                require_noerr_string(rc, MQTT_reconnect, "ERROR: keep alive err");
        }
            
    }  

MQTT_reconnect:
    mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, err = %d", rc, err);
    mqtt_client_release(&c, &n);
    sleep(5);
    goto MQTT_start;

exit:
    mqtt_log("EXIT: MQTT client exit with err = %d.", err);
    mqtt_client_release(&c, &n);
    pthread_exit(NULL);
}

static OSStatus mqtt_msg_publish(Client *c, const char* topic, char qos, char retained,
                                const unsigned char* msg,
                                uint32_t msg_len){
    OSStatus err = kUnknownErr;
    int ret = 0;
    MQTTMessage publishData = MQTTMessage_publishData_initializer;

    require(topic && msg_len && msg, exit);

    //upload data qos0
    publishData.qos = qos;
    publishData.retained = retained;
    publishData.payload = (void*)msg;
    publishData.payloadlen = msg_len;

    ret = MQTTPublish(c, topic, &publishData);

    if (MQTT_SUCCESS == ret){
        err = kNoErr;
    }else{
        err = kUnknownErr;
    }

exit:
    return err;
}

void user_send_cb(void* data){
    OSStatus err = kNoErr;

    s_MQTT_Data_Packet_Info* p_send_msg = (s_MQTT_Data_Packet_Info*)data;
    require_noerr_string((p_send_msg == NULL), exit, "没有内存可用");
   
    err = mqtt_msg_publish(&c, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained,
                        p_send_msg->data,
                        p_send_msg->datalen);
    require_noerr_string(err, exit, "publish失败");

    mqtt_log("MQTT publish data success! send_topic=[%s], msg=[%ld][%s].\r\n", p_send_msg->topic, p_send_msg->datalen, p_send_msg->data);

    no_mqtt_msg_exchange = false; // 在当前情况下,多发一次或少发一次,无关紧要,无需用互斥锁
              
exit:
    if (p_send_msg != NULL){
        free(p_send_msg);  
        p_send_msg = NULL;
    }

}


int main(void){
    // void *rval;
    
    OSStatus err = kNoErr;

    lhead = (struct head*)malloc(sizeof(struct head));
    STAILQ_INIT(lhead);
    
    pthread_t mqtt_client_handle, work_thread_Handle/*, timer_thread_Handle*/;
    
    // 默认堆栈大小为8M, 嵌入式里太大,重新设置
    pthread_attr_t attr;
    err = pthread_attr_init(&attr);
    require_noerr_string(err, exit, "ERROR: Unable to init thread attr.");

    err = pthread_attr_setstacksize(&attr, 16384);// 堆栈大小不能小于16384Byte 
    require_noerr_string(err, exit, "ERROR: Unable to set thread size.");

    err = pthread_create(&mqtt_client_handle, &attr, mqtt_client_thread, NULL);
    require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread.");

    err = pthread_create(&work_thread_Handle, &attr, work_thread, NULL);
    require_noerr_string(err, exit, "ERROR: Unable to start the work thread.");

    // err = pthread_create(&timer_thread_Handle, NULL, timer_thread, NULL);
    // require_noerr_string(err, exit, "ERROR: Unable to start the timer thread.");
    // loop = uv_default_loop();
    
    // uv_timer_t period_timer;
    // uv_timer_init(loop, &period_timer);
    // uv_timer_start(&period_timer, user_send_handler, 0, 2000);

    // err = uv_run(loop, UV_RUN_DEFAULT); 
    // require_noerr_string(err, exit, "ERROR: Unable to run uv loop.");
    // struct timerval interval;
    // struct itimerval timer;

    pthread_join(work_thread_Handle, NULL);
    pthread_join(mqtt_client_handle, NULL);   重点,当主线程没有其他可执行的循环时,一定要加此句

    pthread_attr_destroy(&attr);

exit :
    if (err != kNoErr){
        app_log("ERROR: app thread exit err: %d", err);   
    }

    free(lhead);
    lhead  = NULL;

    return err;
}

Jetbrains全家桶1年46,售后保障稳定

整个工程源码:

链接: https://pan.baidu.com/s/10w8a9X_7prtYyHsmMUj7Sw   

提取码: 48aa 

参考资料:

linux c MQTT客户端实现

https://www.jianshu.com/p/d309de966379

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/206802.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • winrar3.7-winrar4.0的注冊码[通俗易懂]

    winrar3.7-winrar4.0的注冊码[通俗易懂]首先新建记事本文件(txt文件),把下面红色代码复制进去,然后将文件另存为以rarreg.key为文件名称的文件(当然因为设置的不同,可能出现你保存后的文件为rarreg.key.txt没关系

  • 浅析MySQL中concat及group_concat的使用

    浅析MySQL中concat及group_concat的使用

  • 数据类型转换的类是Convert_c++类型转换

    数据类型转换的类是Convert_c++类型转换C#数据类型和类型转换Convert.ToInt16与Convert.ToInt32区别版权声明:本文由 群燕小站 原创,转载请注明【转自:群燕小站(http://www.zqunyan.com);原文链接: http://www.zqunyan.com/79.html】取值的范围不同:int16:-32768到32767 int32:-2,147,483,648到2…

  • mybatis的逆向工程_mybatis逆向工程多表查询

    mybatis的逆向工程_mybatis逆向工程多表查询逆向工程字面意思就是反向生成工程,和hibernate一样mybatis也有自己的逆向工程工具,hibernate的逆向生成我没有做过,不过我猜大概都已样,再说,hibernate的现在使用很少了,到了使用的时候再去用吧,使用逆向工程时,需要注意的是表之间的关系无法映射出来!也就是说mybatis的逆向工程生成的都是单表操作,1:mybatis逆向工程开发文档:http://www.mybati

  • java 上传文件接口_Java接口实现文件上传

    java 上传文件接口_Java接口实现文件上传因工作需要,在后台管理页面加入一个上传文件的模块,虽然接口的代码很简单,但实现期间遇到了一些比较有趣的坑,特记录下来。需求实现文件上传,并提供一个可供下载的路径。想法文件上传代码暂且不谈,先说说文件放在服务器什么位置比较合适。我首先想到的是两个地方:tomcat的webapps/ROOT目录下,如果放在这个目录下,数量少了还好,一旦数量多了,必定会影响tomcat本身的运行速度。这个虽然可用但不可…

  • 美国网件对KRACK WPA2安全漏洞做出回应:表示正积极跟进修复[通俗易懂]

    美国网件对KRACK WPA2安全漏洞做出回应:表示正积极跟进修复[通俗易懂]NETGEAR美国网件意识到最近公布的安全漏洞KRACK,它利用了WPA2(WiFi保护访问II)中的安全漏洞。美国网件现正在积极跟进修复,已更新了多个产品的修复程序,请浏览美国网件官方网站(https://kb.netgear.com/000049498/Security-Advisory-for-WPA-2-Vulnerabilities-PSV-2017-2826-PSV-2017-2836…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号