大家好,又见面了,我是你们的朋友全栈君。
近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。
环境:.NET Framework 4.6.1 MQTTnet 2.8.2.0 遵循MQTT 3.1.0协议规范 源码 >>> GitHub
注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置
// Broker设置
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;
// 客户端,ClientId不能变
_clientOption.CleanSession = false;
以下是两个封装的类:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
namespace MQTT.Util
{
public class MqttServer
{
private IMqttServer _mqttServer = null;
private string _ip = string.Empty;
public string IP { get { return this._ip; } }
private string _port = string.Empty;
public string Port { get { return this._port; } }
public Action<MqttConnectionValidatorContext> ConnectionValidator = null;
public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
public EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic = null;
public EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic = null;
public EventHandler Started = null;
public EventHandler Stopped = null;
public MqttServer(string ip, string port)
{
this._ip = ip;
this._port = port;
}
public async void Start()
{
// Backlog:表示服务器可以接受的并发请求的最大值
var optionBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(1000).WithDefaultEndpointPort(Convert.ToInt32(this._port));
optionBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(this._ip));
var options = optionBuilder.Build() as MqttServerOptions;
options.ConnectionValidator += this.ConnectionValidator;
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;
this._mqttServer = new MqttFactory().CreateMqttServer();
this._mqttServer.ClientConnected += this.ClientConnected;
this._mqttServer.ClientDisconnected += this.ClientDisconnected;
this._mqttServer.ApplicationMessageReceived += this.ApplicationMessageReceived;
this._mqttServer.ClientSubscribedTopic += this.ClientSubscribedTopic;
this._mqttServer.ClientUnsubscribedTopic += this.ClientUnsubscribedTopic;
this._mqttServer.Started += this.Started;
this._mqttServer.Stopped += this.Stopped;
await _mqttServer.StartAsync(options);
}
public void Stop()
{
if (this._mqttServer == null)
return;
foreach (var clientSessionStatuse in this._mqttServer.GetClientSessionsStatusAsync().Result)
{
clientSessionStatuse.DisconnectAsync();
}
this._mqttServer.StopAsync();
this._mqttServer = null;
}
}
}
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace MQTT.Util
{
public class MqttClient
{
private IMqttClient _mqttClient = null;
private MqttClientOptions _clientOption = null;
private string _serverIP = string.Empty;
public string ServerIP { get { return this._serverIP; } }
private string _serverPort = string.Empty;
public string ServerPort { get { return this._serverPort; } }
public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
public MqttClient(string clientId = "")
{
if (clientId == string.Empty)
clientId = Guid.NewGuid().ToString();
this._clientOption = new MqttClientOptions() { ClientId = clientId };
this._clientOption.CleanSession = false;
this._clientOption.KeepAlivePeriod = TimeSpan.FromSeconds(90);
this._clientOption.CommunicationTimeout = TimeSpan.FromSeconds(10);
// 遗嘱信息
// this._clientOption.WillMessage =
this._clientOption.ProtocolVersion = MQTTnet.Serializer.MqttProtocolVersion.V310;
this._mqttClient = new MqttFactory().CreateMqttClient();
}
public void Init(string ip, string port, string userName, string pwd)
{
this._serverIP = ip;
this._serverPort = port;
_mqttClient.ApplicationMessageReceived += this.ApplicationMessageReceived;
_mqttClient.Connected += this.ClientConnected;
_mqttClient.Disconnected += this.ClientDisconnected;
this._clientOption.ChannelOptions = new MqttClientTcpOptions()
{
Server = this._serverIP,
Port = Convert.ToInt32(this._serverPort)
};
this._clientOption.Credentials = new MqttClientCredentials()
{
Username = userName,
Password = pwd
};
}
/**
host: 服务器地址
port: 服务器端口
tls: 是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
auth: 是否使用登录验证
user: 用户名
pass: 密码
willTopic: 订阅主题
willMsg: 自定义的离线消息
willQos: 接收离线消息的级别
clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
public async void Connect()
{
if (this._mqttClient == null)
return;
try
{
if (this._mqttClient.IsConnected)
{
return;
}
await _mqttClient.ConnectAsync(this._clientOption);
}
catch (Exception)
{
throw;
}
}
public async void Disconnect()
{
if (this._mqttClient == null)
return;
if (this._mqttClient.IsConnected)
await _mqttClient.DisconnectAsync();
}
public void Stop()
{
if (this._mqttClient == null)
return;
this._mqttClient.Dispose();
this._mqttClient = null;
}
public void SubscribeAsync(string topic, string qos)
{
Task.Factory.StartNew(async () =>
{
await _mqttClient.SubscribeAsync(
new List<TopicFilter>
{
new TopicFilter(
topic,
(MqttQualityOfServiceLevel)Enum.Parse(typeof (MqttQualityOfServiceLevel), qos))
});
});
}
public void PublishAsync(string topic, string qos, string payload)
{
if (this._mqttClient == null)
return;
Task.Factory.StartNew(async () =>
{
var msg = new MqttApplicationMessage()
{
Topic = topic,
Payload = Encoding.UTF8.GetBytes(payload),
QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), qos),
Retain = true // 是否保留信息
};
await _mqttClient.PublishAsync(msg);
});
}
}
}
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152846.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...