MQTTnet[通俗易懂]

MQTTnet[通俗易懂]近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置源码>>>GitHub//Broker设置options.MaxPendingMessagesPerClient=1000;options.EnablePersistentSessions…

大家好,又见面了,我是你们的朋友全栈君。

近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。
环境:.NET Framework 4.6.1 MQTTnet 2.8.2.0 遵循MQTT 3.1.0协议规范 源码 >>> GitHub
注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置

// Broker设置
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;

// 客户端,ClientId不能变
_clientOption.CleanSession = false;

以下是两个封装的类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTT.Util
{
    public class MqttServer
    {
        private IMqttServer _mqttServer = null;

        private string _ip = string.Empty;
        public string IP { get { return this._ip; } }

        private string _port = string.Empty;
        public string Port { get { return this._port; } }

        public Action<MqttConnectionValidatorContext> ConnectionValidator = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic = null;
        public EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic = null;
        public EventHandler Started = null;
        public EventHandler Stopped = null;

        public MqttServer(string ip, string port)
        {
            this._ip = ip;
            this._port = port;
        }

        public async void Start()
        {
            // Backlog:表示服务器可以接受的并发请求的最大值
            var optionBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(1000).WithDefaultEndpointPort(Convert.ToInt32(this._port));
            optionBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(this._ip));

            var options = optionBuilder.Build() as MqttServerOptions;
            options.ConnectionValidator += this.ConnectionValidator;
            options.MaxPendingMessagesPerClient = 1000;
            options.EnablePersistentSessions = true;

            this._mqttServer = new MqttFactory().CreateMqttServer();
            this._mqttServer.ClientConnected += this.ClientConnected;
            this._mqttServer.ClientDisconnected += this.ClientDisconnected;
            this._mqttServer.ApplicationMessageReceived += this.ApplicationMessageReceived;
            this._mqttServer.ClientSubscribedTopic += this.ClientSubscribedTopic;
            this._mqttServer.ClientUnsubscribedTopic += this.ClientUnsubscribedTopic;
            this._mqttServer.Started += this.Started;
            this._mqttServer.Stopped += this.Stopped;

            await _mqttServer.StartAsync(options);
        }

        public void Stop()
        {
            if (this._mqttServer == null)
                return;
            foreach (var clientSessionStatuse in this._mqttServer.GetClientSessionsStatusAsync().Result)
            {
                clientSessionStatuse.DisconnectAsync();
            }
            this._mqttServer.StopAsync();
            this._mqttServer = null;
        }
    }
}

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace MQTT.Util
{
    public class MqttClient
    {
        private IMqttClient _mqttClient = null;
        private MqttClientOptions _clientOption = null;

        private string _serverIP = string.Empty;
        public string ServerIP { get { return this._serverIP; } }

        private string _serverPort = string.Empty;
        public string ServerPort { get { return this._serverPort; } }

        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;

        public MqttClient(string clientId = "")
        {
            if (clientId == string.Empty)
                clientId = Guid.NewGuid().ToString();

            this._clientOption = new MqttClientOptions() { ClientId = clientId };
            this._clientOption.CleanSession = false;
            this._clientOption.KeepAlivePeriod = TimeSpan.FromSeconds(90);
            this._clientOption.CommunicationTimeout = TimeSpan.FromSeconds(10);
            // 遗嘱信息
            // this._clientOption.WillMessage = 
            this._clientOption.ProtocolVersion = MQTTnet.Serializer.MqttProtocolVersion.V310;

            this._mqttClient = new MqttFactory().CreateMqttClient();
        }

        public void Init(string ip, string port, string userName, string pwd)
        {
            this._serverIP = ip;
            this._serverPort = port;

            _mqttClient.ApplicationMessageReceived += this.ApplicationMessageReceived;
            _mqttClient.Connected += this.ClientConnected;
            _mqttClient.Disconnected += this.ClientDisconnected;
            this._clientOption.ChannelOptions = new MqttClientTcpOptions()
            {
                Server = this._serverIP,
                Port = Convert.ToInt32(this._serverPort)
            };
            this._clientOption.Credentials = new MqttClientCredentials()
            {
                Username = userName,
                Password = pwd
            };
        }

        /**
      host: 服务器地址
      port: 服务器端口
      tls:  是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
      keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
      clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
      auth: 是否使用登录验证
      user: 用户名
      pass: 密码
      willTopic: 订阅主题
      willMsg: 自定义的离线消息
      willQos: 接收离线消息的级别
      clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
        public async void Connect()
        {
            if (this._mqttClient == null)
                return;

            try
            {
                if (this._mqttClient.IsConnected)
                {
                    return;
                }

                await _mqttClient.ConnectAsync(this._clientOption);
            }
            catch (Exception)
            {
                throw;
            }
        }

        public async void Disconnect()
        {
            if (this._mqttClient == null)
                return;
            if (this._mqttClient.IsConnected)
                await _mqttClient.DisconnectAsync();
        }

        public void Stop()
        {
            if (this._mqttClient == null)
                return;
            this._mqttClient.Dispose();
            this._mqttClient = null;
        }

        public void SubscribeAsync(string topic, string qos)
        {
            Task.Factory.StartNew(async () =>
            {
                await _mqttClient.SubscribeAsync(
                    new List<TopicFilter>
                    {
                            new TopicFilter(
                                topic,
                                (MqttQualityOfServiceLevel)Enum.Parse(typeof (MqttQualityOfServiceLevel), qos))
                    });
            });
        }

        public void PublishAsync(string topic, string qos, string payload)
        {
            if (this._mqttClient == null)
                return;
            Task.Factory.StartNew(async () =>
            {
                var msg = new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(payload),
                    QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), qos),
                    Retain = true // 是否保留信息
                };
                await _mqttClient.PublishAsync(msg);
            });
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152846.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号