基于MQTTnet 3.0.12实现MQTT服务器和客户端「建议收藏」

基于MQTTnet 3.0.12实现MQTT服务器和客户端「建议收藏」基于MQTTnet3.0.12实现MQTT服务器和客户端概述-可看可不看的废话MQTT是啥MQTTnet正文MQTTServer/Broker实现MQTTServer/Broker创建MQTTServer/Broker发布消息MQTTClient实现MQTTClient创建MQTTClient发布消息MQTTClien订阅消息MQTTClien取消订阅消息概述-可看可不看的废话公司基于制造业客户的数据采集需求,在布局自己的数据中台。现在物联网的概念很火,辣条从业7年来一直号称是“自动化

大家好,又见面了,我是你们的朋友全栈君。

基于MQTTnet 3.0.12实现MQTT服务器和客户端)

概述-可看可不看的废话

现在物联网的概念很火,辣条从业7年来一直号称是“自动化工程师”,但其实到目前为止,所处的行业还是比较局限在自动化产线改造。就工业智能互联来说,水平方向上如何把流水线式的孤岛式机台联动起来,我们比较有经验,但是垂直方向上,如何做数采,或者说如何高效灵活的做数采,需要补课的东西还有很多。MQTT是IBM很早以前就提出来的协议,但很可惜一直没有接触过,新公司的项目上引用了MQTTnet的开源库,但是这个GitHub作者跳版跳得太狠了点,随着新版本的更新,改了很多旧版的东西,导致百度排在前几的文章都没法直接借鉴了,无奈之下,疯狂百度+阅读Git上源码,总算搞出来一个小的Demo,特此记录如下。

MQTT是啥

MQTT 是物联网 (IoT) 的 OASIS 标准消息传递协议。它设计为极其轻量级的发布/订阅消息传输,非常适合连接具有小代码占用空间和最小网络带宽的远程设备。如今,MQTT 广泛应用于汽车、制造、电信、石油和天然气等行业。

好了,这段话是我抄的MQTT官网的,英语原文,浏览器直译,傲娇。
官网指路:https://mqtt.org/

MQTTnet

MQTTnet是基于MQTT通信的高性能.NET库,它提供了一个MQTT客户端和一个MQTT服务器(代理)。截止目前,最新版本为3.0.12.0,支持.net core,支持MQTT 3.X和5.0版本。

https://github.com/chkr1011/MQTTnet
MQTTnet的Git路径。

正文

本Demo设计为一个Winform窗体程序。基于MQTTnet,实现了一个MQTT Server或者说Broker的创建,同时在窗体上提供了MQTT Client的创建功能,MQTT Client跟Server连接之后,通过点击按钮,实现主题订阅、发布的基础功能。

UI界面

MQTT Server/Broker实现

创建MQTT Server的思路还是蛮清晰的,主要是MqttServer各个事件的实现,一开始很懵逼,因为3.0.12的写法变化跟2.X完全不一样。
这里贴一段MQTTnet的代码,方便理解类的实现——
在这里插入图片描述
在这里插入图片描述

MQTT Server/Broker创建

  • 先定义好MqttServerOptions,这是启动Mqtt服务时候的传参,定义服务启动时的各种参数:IP、Port口,账密等等
var optionsBuilder = new MqttServerOptionsBuilder();
  • 实例化MqttServer以及委托实现MqttServer的各个事件
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);

下面是完成的实现代码:

public MqttServer mqttServer = null;
public async void StartMqttServer()
{ 

try
{ 

if (mqttServer == null)
{ 

var config = ReadConfiguration();
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint().WithDefaultEndpointPort(int.Parse(config["Port"].ToString())).WithConnectionValidator(
c =>
{ 

var currentUser = config["Users"][0]["UserName"].ToString();
var currentPWD = config["Users"][0]["Password"].ToString();
if (currentUser == null || currentPWD == null)
{ 

c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Username != currentUser)
{ 

c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Password != currentPWD)
{ 

c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
c.ReasonCode = MqttConnectReasonCode.Success;
}).WithSubscriptionInterceptor(
c =>
{ 

c.AcceptSubscription = true;
}).WithApplicationMessageInterceptor(
c =>
{ 

c.AcceptPublish = true;
});
mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
await mqttServer.StartAsync(optionsBuilder.Build());
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, "MQTT Server is started."));
}
}
catch (Exception ex)
{ 

lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"MQTT Server start fail.>{ex.Message}"));
}
}
public async void StopMqttServer()
{ 

if (mqttServer == null) return;
try
{ 

await mqttServer?.StopAsync();
mqttServer = null;
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, "MQTT Server is stopped."));
}
catch (Exception ex)
{ 

lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"MQTT Server stop fail.>{ex.Message}"));
}
}

MQTT Server/Broker发布消息

-从MQTT的设计来看,服务端是代理的角色,订阅者和发布者是客户端,所以通常来说,消息的订阅与发布应当都是客户端干的事。但是,服务端自然也是可以参与一下发布者的角色的。我还没想到这样用的实际场景,先功能实现一下——

  • 很粗暴,先实例化一个MqttApplicationMessage对象,然后作为传参调用MqttServer.PublishAsync进行消息发布。
public async void ServerPublishMqttTopic(string topic, string payload)
{ 

var message = new MqttApplicationMessage()
{ 

Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload)
};
await mqttServer.PublishAsync(message);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("MQTT Broker发布主题[{0}]成功!", topic)));
}

当然啦,MqttApplicationMessage还有很多属性值可以传,QoS(MqttQualityOfServiceLevel)、Retain等等,这里只是为了先实现功能,就只传了最简单的Topic和Payload。

MQTT Client实现

MQTT Client创建

整体的实现思路跟Server端如出一辙,声明一个MqttClientOptions,赋值各种连接Server端需要的参数,最后作为MqttClient.ConnectAsync的传参,连接Server。
这里我在MqttClientOptions里面尝试了一下WillMessage,其实就是一个MqttApplicationMessage对象,WillMessage作为遗言机制,用于Client跟Server端挂点时的广播通知。
再之后就是实现MqttClient的各个事件,用来客制Connected,Disconnected,MessageReceived的各种逻辑,跟Server端实现没有什么区别,不再赘述。
上代码——

private MqttClient mqttClient = null;
private async Task ClientStart()
{ 

try
{ 

var tcpServer = txtIPAddr.Text;
var tcpPort = int.Parse(txtPort.Text.Trim());
var mqttUser = txtUserName.Text.Trim();
var mqttPassword = txtPWD.Text.Trim();
var mqttFactory = new MqttFactory();
var options = new MqttClientOptions
{ 

ClientId = txtClientID.Text.Trim(),
ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{ 

Server = tcpServer,
Port = tcpPort
},
WillDelayInterval = 10,
WillMessage = new MqttApplicationMessage()
{ 

Topic = $"LastWill/{txtClientID.Text.Trim()}",
Payload= Encoding.UTF8.GetBytes("I Lost the connection!"),
QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
}
};
if (options.ChannelOptions == null)
{ 

throw new InvalidOperationException();
}
if (!string.IsNullOrEmpty(mqttUser))
{ 

options.Credentials = new MqttClientCredentials
{ 

Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
await mqttClient.ConnectAsync(options);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, $"客户端[{options.ClientId}]尝试连接..."));
}
catch (Exception ex)
{ 

lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"客户端尝试连接出错.>{ex.Message}"));
}
}
private async Task ClientStop()
{ 

try
{ 

if (mqttClient == null) return;
await mqttClient.DisconnectAsync();
mqttClient = null;
}
catch (Exception ex)
{ 

lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, $"客户端尝试断开Server出错.>{ex.Message}"));
}
}

MQTT Client发布消息

这里的实现逻辑跟写法和Server端的发布别无二致,我在这里的MqttApplicationMessage补上了QoS和Retain的设置,由Form页面的控件传参。
这里补一句关于Retain的用法:Retain意为保留,设为True表示这条消息发布的时候如果没有订阅者,则该消息保留在Server端,直到被人订阅时立刻发布出去并删除,设为False时则没有这样的效果。

public async void ClientPublishMqttTopic(string topic, string payload)
{ 

try
{ 

var message = new MqttApplicationMessage()
{ 

Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)cmbQos.SelectedIndex,
Retain = bool.Parse(cmbRetain.SelectedItem.ToString())
};
await mqttClient.PublishAsync(message);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]发布主题[{1}]成功!", mqttClient.Options.ClientId, topic)));
}
catch (Exception ex)
{ 

lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Fatal, string.Format("客户端[{0}]发布主题[{1}]异常!>{2}", mqttClient.Options.ClientId, topic,ex.Message)));
}
}

MQTT Clien订阅消息

呼叫MqttClient.SubscribeAsync,传入消息主题即可。

public async void ClientSubscribeTopic(string topic)
{ 

await mqttClient.SubscribeAsync(topic);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]订阅主题[{1}]成功!", mqttClient.Options.ClientId, topic)));
}

MQTT Clien取消订阅消息

呼叫MqttClient.UnsubscribeAsync,取消消息订阅。

public async void ClientUnSubscribeTopic(string topic)
{ 

await mqttClient.UnsubscribeAsync(topic);
lbxMonitor.BeginInvoke(_updateMonitorAction,
Logger.TraceLog(Logger.Level.Info, string.Format("客户端[{0}]取消主题[{1}]成功!", mqttClient.Options.ClientId, topic)));
}

至此,简单的基于MQTTnet的Server端以及Client端的功能就实现咯,完整代码已传至GitHub,欢迎大家指点讨论,共同进步,一起成长!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152881.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • pycharm软件界面设置与配置[通俗易懂]

    pycharm软件界面设置与配置[通俗易懂]pycharm软件界面设置与配置pycharm软件介绍:基于eclipse开发的开源软件,适用于整体开发较大项目。负责繁琐的工作细节,节省宝贵的时间,善用以键盘操作为主的编程方法,pycharm完全理解代码的每个面向,依靠它的智能化代码补全,实时检查和快速修复等功能,轻松进行项目导航。其有以下优点:集成python需要的模块,方便开发;语法高亮,快速识别代码,方便开发;代码提示。搭建pycharm软件的开发环境:首先安装JDK(JDK是整个java开发的核心,它包含了JAVA的运行环

  • 开发环境k8s使用local docker registry

    开发环境k8s使用local docker registry

  • awk从放弃到入门(1):awk基础 (通俗易懂,快进来看)「建议收藏」

    awk从放弃到入门(1):awk基础 (通俗易懂,快进来看)「建议收藏」我们先来用专业的术语描述一下awk是什么,如果你看不懂,没关系,我们会再用"大白话"解释一遍。 awk是一个报告生成器,它拥有强大的文本格式化的能力,这就是专业的说法。你可能不理解所谓的报告生成器中的"报告"是什么,你可以把"报告"理解为"报表"或者"表格",也就是说,我们可以利用awk命令,将一些文本整理成我们想要的样子,比如把一些文本整理成"表"的样子,然后再展示出来,刚才概念中提到的

  • 关于电角度的理解[通俗易懂]

    关于电角度的理解[通俗易懂]从电磁分布的角度来看,永磁体(或励磁)产生的磁场空间分布呈现周期性变化,一个周期为电角度的360度。显然从任意N极出发沿着某圆周方向经过S极再到下一个N极为一个周期的电角度。此过程中永磁体经过了级对数p个电极,即电周期进行了p个,那么p极对数转一圈的电角度则为p*360度…

  • ETH显卡矿机_eth矿机组装

    ETH显卡矿机_eth矿机组装显卡矿机搭建选择合适显卡选择硬件选择挖矿软件挖矿系统mineros挖矿软件注意:每个币种的软件都不一样挖矿系统和软件也有多种具体对应的官网都会有教程选择合适显卡主流显卡算力对比选择硬件选择挖矿软件前提准备自己的钱包地址选择矿池地址挖矿系统mineros步骤:注册账号刻盘启动挖矿和监控矿机状态挖矿软件NBMiner…

  • 各位学弟学妹,别再看教材了,时间复杂度看这篇就好了[通俗易懂]

    各位学弟学妹,别再看教材了,时间复杂度看这篇就好了[通俗易懂]时间复杂度是学习算法的基石,今天我们来聊聊为什么要引入时间复杂度,什么是时间复杂度以及如何去算一个算法的时间复杂度一、刻画算法的运行时间某日,慧能叫来了一尘打算给他补习补习一下基础知识,只见克写了一段非常简单的代码一尘看老师有点生气,开始虚心请教了为了方便讨论,这里我们把每一条语句的执行时间都看做是一样的,记为一个时间单元①蓝色框的两条语句,花费两个时间单元②黑色框的一条语句,花费n+1个时间单元③红色框的两条语句,花费2*n个时间单元这不是.

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号