大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注公众号“全栈程序员社区”获取激活教程,之前旧版本的教程可能已经失效。最新 Idea2022.1 教程亲测有效,一键激活。
Jetbrains全家桶1年46,售后保障稳定
实现功能:
(1)每隔 30s 定时发送一次心跳包;
(2)接收 MQTT 数据包,由 user_recv_handle_cb 函数负责解析;
(3)每隔 PERIOD_TIME 向自身订阅的主题发布一条消息,即每个周期内发什么就收到什么(回环测试)。
说明:
(1)主要是把庆科 MiCO_A_v3.2.0/demos/net/mqtt_client(STM32 + FreeRTOS)示例移植到 Linux 平台。
(2)实现方式:select、queue 、pthread。
核心源码:
/*************************************** 描述***********************
作者: lee
日期: 2019/7/2
文件名:mqtt_client.c
功能描述:
1.定时30s发送心跳包
2.接收 mqtt 数据包,解析函数是user_recv_handle_cb
3.定时 PERIOD_TIME 发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥
**********************************************************************/
#include "./libraries/protocols/mqtt/MQTTClient.h"
#include "/usr/local/include/uv.h"
#include "pthread.h"
#include "sys/select.h"
#include "sys/queue.h"
/*********************************
* Macros
***********************************************/
// Logging helpers: both forward to custom_log with a fixed module tag.
#define app_log(M, ...) custom_log("APP", M, ##__VA_ARGS__)
#define mqtt_log(M, ...) custom_log("MQTT", M, ##__VA_ARGS__)
// Command timeout handed to MQTTClientInit.
#define MQTT_CMD_TIMEOUT 5000 // 5s
// Capacities of the topic and payload buffers in s_MQTT_Data_Packet_Info.
#define MAX_MQTT_TOPIC_SIZE (256)
#define MAX_MQTT_DATA_SIZE (1024)
// Broker address and port passed to NewNetwork.
#define MQTT_SERVER "127.0.0.1"
//#define MQTT_SERVER "test.mosquitto.org"
#define MQTT_SERVER_PORT 1883
// Interval given to countdown_ms between two publishes in work_thread.
#define PERIOD_TIME 2000 // 2s
/***********************************************
* Constants
***********************************************/
// Connection parameters copied into MQTTPacket_connectData before MQTTConnect.
#define MQTT_CLIENT_ID "MiCO_MQTT_Client"
#define MQTT_CLIENT_USERNAME NULL
#define MQTT_CLIENT_PASSWORD NULL
#define MQTT_CLIENT_KEEPALIVE 30
// Subscribe and publish topics are identical, so every published message
// is delivered back to this client (loopback test).
#define MQTT_CLIENT_SUB_TOPIC "mico/test/send" // loop msg
#define MQTT_CLIENT_PUB_TOPIC "mico/test/send"
// Used both as the select() timeout and as the MQTTYield timeout.
// NOTE(review): the name is misspelled ("TMIE"); it is kept because other
// code in this file refers to it by this spelling.
#define MQTT_YIELD_TMIE 5000 // 5s
// Payload published every PERIOD_TIME by work_thread.
#define MQTT_CLIENT_PUB_MSG "mico_mqtt_client_test_data_1234567890"
/***********************************************
* Structures
***********************************************/
// One MQTT message (received or about to be published) as handed to the
// work-queue callbacks. Instances are heap-allocated with calloc by the
// producer and freed by the callback that consumes them.
typedef struct {
char topic[MAX_MQTT_TOPIC_SIZE]; // topic name; printed with %s by the callbacks
char qos;                        // QoS level copied from / into the MQTTMessage
char retained;                   // retained flag copied from / into the MQTTMessage
uint8_t data[MAX_MQTT_DATA_SIZE]; // payload bytes; also printed with %s by the callbacks
uint32_t datalen;                // number of payload bytes stored in data
} s_MQTT_Data_Packet_Info;
// One job in the singly-linked tail queue (sys/queue.h STAILQ) drained by
// work_thread: a callback plus the argument it is invoked with.
struct node{
STAILQ_ENTRY(node) next; // link to the following job in the queue
void (*fp) (void*);      // callback run by work_thread
void *data;              // argument passed to fp
};
/***********************************************
* Function Declarations
***********************************************/
// Forward declaration: user_send_cb is defined later in this file, but
// work_thread stores it as a job callback before that definition.
void user_send_cb(void* data);
/***********************************************
* Variables Definitions
***********************************************/
// libuv request handles; in this file they are referenced only by
// commented-out code.
uv_req_t mqtt_client_recv_handle, mqtt_client_send_handle;
// Set to true at the top of every receive-loop iteration and cleared when a
// message is received or published; while it is still true the receive loop
// calls keepalive().
// NOTE(review): written from two threads with only `volatile` — that is not a
// synchronisation primitive; confirm the race is acceptable.
volatile static bool no_mqtt_msg_exchange = true;
Client c; // mqtt client object
Network n; // socket network for mqtt client
// Head type and pointer of the shared job queue; main allocates and
// initialises it before the threads are started.
STAILQ_HEAD(head, node);
struct head *lhead = 0;
void user_recv_handle_cb(void* data){
s_MQTT_Data_Packet_Info *p_recv_msg = (s_MQTT_Data_Packet_Info *)data;
if (p_recv_msg)
{
实际工程中替换
app_log("\t\t\t\t\tuser get data success! from_topic=[%s], msg=[%ld][%s].\r\n", p_recv_msg->topic, p_recv_msg->datalen, p_recv_msg->data);
free(p_recv_msg);
p_recv_msg = NULL;
}
}
// call back, msg received from mqtt server
static void messageArrived(MessageData* md){
s_MQTT_Data_Packet_Info* p_recv_msg = NULL;
MQTTMessage* pMessage = md->message;
p_recv_msg = (s_MQTT_Data_Packet_Info *)calloc(1, sizeof(s_MQTT_Data_Packet_Info));
if (p_recv_msg == NULL)
{
mqtt_log("malloc内存分配不足");
return;
}
p_recv_msg->datalen = pMessage->payloadlen;
p_recv_msg->qos = pMessage->qos;
p_recv_msg->retained = pMessage->retained;
strncpy(p_recv_msg->topic, md->topicName->lenstring.data, md->topicName->lenstring.len);
memcpy(p_recv_msg->data, pMessage->payload, p_recv_msg->datalen + 1); // lee: !!!!!!!!!!!!!!!!!!!! p_recv_msg->datalen + 1 不加1,会出现段错误
// mqtt_client_recv_handle.data = &p_recv_msg;
// uv_queue_work(loop, &mqtt_client_recv_handle, user_recv_handle_cb, NULL); // lee !!!!!!!!!!!!!!!!!!!! 发现 libuv中的工作队列不能和select混用
struct node process_func;
process_func.data = p_recv_msg;
process_func.fp = user_recv_handle_cb;
STAILQ_INSERT_TAIL(lhead, &process_func, next);
p_recv_msg = NULL;
}
static OSStatus mqtt_client_release(Client *c, Network *n){
OSStatus err = kNoErr;
if (c->isconnected) MQTTDisconnect(c);
n->disconnect(n);// close connection
if (MQTT_SUCCESS != MQTTClientDeinit(c)){
app_log("MQTTClientDeinit failed!");
err = kDeletedErr;
}
return err;
}
void* work_thread(void* arg){
s_MQTT_Data_Packet_Info* p_send_msg = NULL;
Timer period_timer;
InitTimer(&period_timer);
countdown_ms(&period_timer, PERIOD_TIME);
while (1)
{
if (!STAILQ_EMPTY(lhead)){
struct node* pfirst_node = STAILQ_FIRST(lhead);
pfirst_node->fp(pfirst_node->data);
STAILQ_REMOVE_HEAD(lhead, next);
}
while (expired(&period_timer))
{
if (c.isconnected){
p_send_msg = (s_MQTT_Data_Packet_Info*) calloc(1, sizeof(s_MQTT_Data_Packet_Info));
if (p_send_msg == NULL) {
mqtt_log("没有内存可用");
continue;
}
p_send_msg->qos = 0;
p_send_msg->retained = 0;
p_send_msg->datalen = strlen(MQTT_CLIENT_PUB_MSG);
memcpy(p_send_msg->data, MQTT_CLIENT_PUB_MSG, p_send_msg->datalen);
strncpy(p_send_msg->topic, MQTT_CLIENT_PUB_TOPIC, MAX_MQTT_TOPIC_SIZE);
struct node process_func;
process_func.data = p_send_msg;
process_func.fp = user_send_cb;
STAILQ_INSERT_TAIL(lhead, &process_func, next);
p_send_msg = NULL;
}
else
{
mqtt_log("MQTT client does not init ok");
}
countdown_ms(&period_timer, PERIOD_TIME);
}
}
pthread_exit(NULL);
}
void* mqtt_client_thread(void* arg){
OSStatus err = kUnknownErr;
int rc = -1;
fd_set readfds;
struct timeval t = { 0, MQTT_YIELD_TMIE * 1000};
ssl_opts ssl_settings;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
memset(&c, 0, sizeof(c));
memset(&n, 0, sizeof(n));
MQTT_start:
// create network connection
ssl_settings.ssl_enable = false;
mqtt_log("enter into mqtt client thread.");
while (1){
rc = NewNetwork(&n, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings);
if (rc == MQTT_SUCCESS) break;
mqtt_log("ERROR: MQTT network connection err=%d, reconnect after 3s...", rc);
sleep(3);
}
mqtt_log("MQTT network connection success!");
// 2.init mqtt client
rc = MQTTClientInit(&c, &n, MQTT_CMD_TIMEOUT);
require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err.");
mqtt_log("MQTT client init success!");
// 3.create mqtt client connection
connectData.willFlag = 0;
connectData.MQTTVersion = 4; // 3: 3.1, 4: v3.1.1
connectData.clientID.cstring = MQTT_CLIENT_ID;
connectData.username.cstring = MQTT_CLIENT_USERNAME;
connectData.password.cstring = MQTT_CLIENT_PASSWORD;
connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE;
connectData.cleansession = 1;
rc = MQTTConnect(&c, &connectData);
require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client connect err.");
mqtt_log("MQTT client connect success!");
// 4.mqtt client subscribe
rc = MQTTSubscribe(&c, MQTT_CLIENT_SUB_TOPIC, QOS0, messageArrived);
require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client subscribe err.");
mqtt_log("MQTT client subscribe success! recv_topic=[%s].", MQTT_CLIENT_SUB_TOPIC);
// 5. client loop for recv msg && keepalive
while (1){
no_mqtt_msg_exchange = true;
FD_ZERO(&readfds);
FD_SET(c.ipstack->my_socket, &readfds);
select(c.ipstack->my_socket + 1, &readfds, NULL, NULL, &t);
// recv msg from server
if (FD_ISSET( c.ipstack->my_socket, &readfds)){
rc = MQTTYield(&c, (int)MQTT_YIELD_TMIE);
require_noerr(rc, MQTT_reconnect);
no_mqtt_msg_exchange = false;
}
//if no msg exchange, we need to check ping msg to keep alive
if (no_mqtt_msg_exchange){
rc = keepalive(&c);
require_noerr_string(rc, MQTT_reconnect, "ERROR: keep alive err");
}
}
MQTT_reconnect:
mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, err = %d", rc, err);
mqtt_client_release(&c, &n);
sleep(5);
goto MQTT_start;
exit:
mqtt_log("EXIT: MQTT client exit with err = %d.", err);
mqtt_client_release(&c, &n);
pthread_exit(NULL);
}
/**
 * Publish one message through the given MQTT client.
 *
 * @param c         MQTT client used for the publish.
 * @param topic     Destination topic; must be non-NULL.
 * @param qos       QoS level stored in the outgoing message.
 * @param retained  Retained flag stored in the outgoing message.
 * @param msg       Payload bytes; must be non-NULL.
 * @param msg_len   Payload length in bytes; must be non-zero.
 * @return kNoErr when MQTTPublish reports MQTT_SUCCESS, kUnknownErr when it
 *         fails or when an argument check does not pass.
 */
static OSStatus mqtt_msg_publish(Client *c, const char* topic, char qos, char retained,
                                 const unsigned char* msg,
                                 uint32_t msg_len){
    OSStatus status = kUnknownErr;
    MQTTMessage outgoing = MQTTMessage_publishData_initializer;

    // Reject missing topic/payload before touching the client.
    require(topic && msg_len && msg, exit);

    outgoing.payloadlen = msg_len;
    outgoing.payload = (void*)msg;
    outgoing.retained = retained;
    outgoing.qos = qos;

    status = (MQTTPublish(c, topic, &outgoing) == MQTT_SUCCESS) ? kNoErr : kUnknownErr;

exit:
    return status;
}
/**
 * Work-queue callback that publishes one queued packet to the broker.
 *
 * @param data  Heap-allocated s_MQTT_Data_Packet_Info, or NULL. Ownership
 *              passes to this function, which frees the packet on every path.
 */
void user_send_cb(void* data){
    OSStatus err = kNoErr;
    s_MQTT_Data_Packet_Info* p_send_msg = (s_MQTT_Data_Packet_Info*)data;

    // (p_send_msg == NULL) is 1 for a missing packet; presumably the macro
    // treats any non-zero value as an error, logs the text and jumps to exit.
    require_noerr_string((p_send_msg == NULL), exit, "没有内存可用");

    err = mqtt_msg_publish(&c, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained,
                           p_send_msg->data,
                           p_send_msg->datalen);
    require_noerr_string(err, exit, "publish失败");

    // datalen is a uint32_t; cast it so the argument matches the %ld conversion.
    mqtt_log("MQTT publish data success! send_topic=[%s], msg=[%ld][%s].\r\n", p_send_msg->topic, (long)p_send_msg->datalen, p_send_msg->data);
    // One keepalive more or less is harmless here, so the flag is written
    // without a mutex.
    no_mqtt_msg_exchange = false;

exit:
    free(p_send_msg); // free(NULL) is a no-op, so no guard is needed
}
int main(void){
// void *rval;
OSStatus err = kNoErr;
lhead = (struct head*)malloc(sizeof(struct head));
STAILQ_INIT(lhead);
pthread_t mqtt_client_handle, work_thread_Handle/*, timer_thread_Handle*/;
// 默认堆栈大小为8M, 嵌入式里太大,重新设置
pthread_attr_t attr;
err = pthread_attr_init(&attr);
require_noerr_string(err, exit, "ERROR: Unable to init thread attr.");
err = pthread_attr_setstacksize(&attr, 16384);// 堆栈大小不能小于16384Byte
require_noerr_string(err, exit, "ERROR: Unable to set thread size.");
err = pthread_create(&mqtt_client_handle, &attr, mqtt_client_thread, NULL);
require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread.");
err = pthread_create(&work_thread_Handle, &attr, work_thread, NULL);
require_noerr_string(err, exit, "ERROR: Unable to start the work thread.");
// err = pthread_create(&timer_thread_Handle, NULL, timer_thread, NULL);
// require_noerr_string(err, exit, "ERROR: Unable to start the timer thread.");
// loop = uv_default_loop();
// uv_timer_t period_timer;
// uv_timer_init(loop, &period_timer);
// uv_timer_start(&period_timer, user_send_handler, 0, 2000);
// err = uv_run(loop, UV_RUN_DEFAULT);
// require_noerr_string(err, exit, "ERROR: Unable to run uv loop.");
// struct timerval interval;
// struct itimerval timer;
pthread_join(work_thread_Handle, NULL);
pthread_join(mqtt_client_handle, NULL); 重点,当主线程没有其他可执行的循环时,一定要加此句
pthread_attr_destroy(&attr);
exit :
if (err != kNoErr){
app_log("ERROR: app thread exit err: %d", err);
}
free(lhead);
lhead = NULL;
return err;
}
整个工程源码:
链接: https://pan.baidu.com/s/10w8a9X_7prtYyHsmMUj7Sw
提取码: 48aa
参考资料:
linux c MQTT客户端实现
https://www.jianshu.com/p/d309de966379
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/206802.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...