mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端服务端:usingMQTTnet;usingMQTTnet.Server;usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Diagnostics;usingSystem.IO;usingSystem.Linq;usingSyst…

大家好,又见面了,我是你们的朋友全栈君。

服务端:

using MQTTnet;

using MQTTnet.Server;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Net;

using System.ServiceProcess;

using System.Text;

using System.Threading.Tasks;

using System.Timers;

namespace MQTTNetService

{

public partial class MQTTNetService : ServiceBase

{

private MqttServer mqttServer = null;

private System.Timers.Timer timer = null;

//此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多1000个清零;

private List subClientIDs = new List();

public MQTTNetService()

{

InitializeComponent();

//创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中

timer = new System.Timers.Timer();

timer.AutoReset = true;

timer.Enabled = true;

timer.Interval = 5000;

timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);

}

protected override void OnStart(string[] args)

{

//开启服务

CreateMQTTServer();

if (timer.Enabled == false)

{

timer.Enabled = true;

timer.Start();

}

}

protected override void OnStop()

{

if (timer.Enabled == true)

{

timer.Enabled = false;

timer.Stop();

}

}

///

/// 开启服务

///

private async void CreateMQTTServer()

{

if (mqttServer == null)

{

var options = new MqttServerOptions();

var optionsBuilder = new MqttServerOptionsBuilder();

//指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。

//options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(“”))

//指定端口

optionsBuilder.WithDefaultEndpointPort(1883);

//连接记录数,默认 一般为2000

//optionsBuilder.WithConnectionBacklog(2000);

mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;

//将发送的消息加到日志

mqttServer.ApplicationMessageReceived += (s, e) =>

{

string msg = @”发送消息的客户端id:” + e.ClientId + “\n”

+ “发送时间:” + DateTime.Now + “\n”

+ “发送消息的主题:” + e.ApplicationMessage.Topic + “\n”

+ “发送的消息内容:” + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + “\n”

+ “————————————————–\n”

;

WriteMsgLog(msg);

};

await mqttServer.StartAsync(options);

}

}

#region 记录日志

///

/// 消息记录日志

///

///

private void WriteMsgLog(string msg)

{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下

string path = AppDomain.CurrentDomain.BaseDirectory + “\\Msglog.txt”;

FileInfo file = new FileInfo(path);

if (!file.Exists)

{

FileStream fs;

fs = File.Create(path);

fs.Close();

}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))

{

using (StreamWriter sw = new StreamWriter(fs))

{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);

}

}

}

///

///客户端链接日志

///

///

private void WriteClientLinkLog(string msg)

{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下

string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientLinklog.txt”;

FileInfo file = new FileInfo(path);

if (!file.Exists)

{

FileStream fs;

fs = File.Create(path);

fs.Close();

}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))

{

using (StreamWriter sw = new StreamWriter(fs))

{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);

}

}

}

///

/// 通过定时器将客户端链接信息写入日志

///

///

///

private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)

{

if (mqttServer != null)

{

List subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();

if (subclients.Count > 0)

{

foreach (var item in subclients)

{

if (!subClientIDs.Contains(item.ClientId))

{

subClientIDs.Add(item.ClientId);

string msg = @”接收客户端ID:” + item.ClientId + “\n”

+ “接收时间:” + DateTime.Now + “\n”

+ “协议版本:” + item.ProtocolVersion + “\n”

+ “最后收到的非保持活包” + item.LastNonKeepAlivePacketReceived + “\n”

+ “最后收到的包” + item.LastPacketReceived + “\n”

+ “挂起的应用程序消息” + item.PendingApplicationMessages + “\n”

+ “————————————————” + “\n”;

WriteClientLinkLog(msg);

}

}

if (subClientIDs.Count >= 1000)

{

subClientIDs.Clear();

}

}

}

}

#endregion

}

}

以上服务端不能判断特定标识的客户端接入,也就是只要有客户端连接就会接入,不够完善

客户端:简单用于测试 接收net core

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Text;

using System.Threading.Tasks;

namespace mqttclienttest0101

{

class Program

{

static MqttClient mqttClient = null;

static void Main(string[] args)

{

//var factory = new MqttFactory();

//var mqttClient = factory.CreateMqttClient();

//var options = new MqttClientOptionsBuilder();

//options.WithClientId(new Guid().ToString());

//options.WithTcpServer(“192.168.3.193”);

//options.WithTls();

//options.WithCleanSession();

//var task = mqttClient.ConnectAsync(options.Build());

RunAsync();

Console.ReadLine();

if (mqttClient != null)

{

var message = new MqttApplicationMessageBuilder();

message.WithTopic(“TopicTest”);

message.WithPayload(“topictest001”);

message.WithExactlyOnceQoS();

message.WithRetainFlag();

mqttClient.PublishAsync(message.Build());

}

Console.WriteLine(“Hello World!”);

}

public static async Task RunAsync()

{

try

{

var options = new MqttClientOptions

{

ClientId = new Guid().ToString(),

CleanSession = true,

ChannelOptions = new MqttClientTcpOptions

{

//Server = “localhost”,

Server = “127.0.1.1”

},

//ChannelOptions = new MqttClientWebSocketOptions

//{

// Uri = “ws://localhost:59690/mqtt”

//}

};

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient()as MqttClient;

mqttClient.ApplicationMessageReceived += (s, e) =>

{

Console.WriteLine(“### RECEIVED APPLICATION MESSAGE ###”);

Console.WriteLine($”+ Topic = {e.ApplicationMessage.Topic}”);

Console.WriteLine($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

Console.WriteLine($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}”);

Console.WriteLine($”+ Retain = {e.ApplicationMessage.Retain}”);

Console.WriteLine();

};

mqttClient.Connected += async (s, e) =>

{

Console.WriteLine(“### CONNECTED WITH SERVER ###”);

await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(“#”).Build());

Console.WriteLine(“### SUBSCRIBED ###”);

};

mqttClient.Disconnected += async (s, e) =>

{

Console.WriteLine(“### DISCONNECTED FROM SERVER ###”);

await Task.Delay(TimeSpan.FromSeconds(5));

try

{

await mqttClient.ConnectAsync(options);

}

catch

{

Console.WriteLine(“### RECONNECTING FAILED ###”);

}

};

try

{

await mqttClient.ConnectAsync(options);

}

catch (Exception exception)

{

Console.WriteLine(“### CONNECTING FAILED ###” + Environment.NewLine + exception);

}

Console.WriteLine(“### WAITING FOR APPLICATION MESSAGES ###”);

//while (true)

//{

// Console.ReadLine();

// await mqttClient.SubscribeAsync(new TopicFilter(“test”, MqttQualityOfServiceLevel.AtMostOnce));

// var applicationMessage = new MqttApplicationMessageBuilder()

// .WithTopic(“A/B/C”)

// .WithPayload(“Hello World”)

// .WithAtLeastOnceQoS()

// .Build();

// await mqttClient.PublishAsync(applicationMessage);

//}

}

catch (Exception exception)

{

Console.WriteLine(exception);

}

}

}

}

客户端2 发送

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Drawing;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Windows.Forms;

namespace MqttClientTest01

{

public partial class Form1 : Form

{

private MqttClient mqttClient = null;

public Form1()

{

InitializeComponent();

}

private void Form1_Load(object sender, EventArgs e)

{

}

private void but_linkserver_Click(object sender, EventArgs e)

{

RunAsync();

创建一个新的MQTT客户端。

//var factory = new MqttFactory();

//mqttClient = factory.CreateMqttClient() as MqttClient;

使用构建器创建基于TCP的选项。

//var options = new MqttClientOptionsBuilder();

//options.WithClientId(new Guid().ToString());

//options.WithTcpServer(txtb_serverip.Text.Trim(), Convert.ToInt32(txtb_serverport.Text.Trim()));

//options.WithTls();

//options.WithCleanSession();

//but_submsg_Click(sender, e);

//var task = mqttClient.ConnectAsync(options.Build());

//if (task != null)

//{

//}

}

public void RunAsync()

{

try

{

var options = new MqttClientOptions

{

ClientId = Guid.NewGuid().ToString(),

CleanSession = true,

ChannelOptions = new MqttClientTcpOptions

{

//Server = “localhost”,

Server = txtb_serverip.Text.Trim(),

Port = Convert.ToInt32(txtb_serverport.Text.Trim()) ,

BufferSize=20*400,

},

//ChannelOptions = new MqttClientWebSocketOptions

//{

// Uri = “ws://localhost:59690/mqtt”

//}

};

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient() as MqttClient;

try

{

var task= mqttClient.ConnectAsync(options);

if (task!=null)

{

lab_linkstatus.Text = “连接成功!”;

lab_linktimer.Text = DateTime.Now.ToString();

}

}

catch (Exception exception)

{

// Console.WriteLine(“### CONNECTING FAILED ###” + Environment.NewLine + exception);

}

// Console.WriteLine(“### WAITING FOR APPLICATION MESSAGES ###”);

//while (true)

//{

// Console.ReadLine();

// await mqttClient.SubscribeAsync(new TopicFilter(“test”, MqttQualityOfServiceLevel.AtMostOnce));

// var applicationMessage = new MqttApplicationMessageBuilder()

// .WithTopic(“A/B/C”)

// .WithPayload(“Hello World”)

// .WithAtLeastOnceQoS()

// .Build();

// await mqttClient.PublishAsync(applicationMessage);

//}

}

catch (Exception exception)

{

Console.WriteLine(exception);

}

}

private void tb_username_TextChanged(object sender, EventArgs e)

{

}

private void but_clientsend_Click(object sender, EventArgs e)

{

if (mqttClient != null)

{

var message = new MqttApplicationMessageBuilder();

message.WithTopic(txtb_msgtopic.Text.Trim());

message.WithPayload(rtb_pubmsg.Text.Trim());

message.WithExactlyOnceQoS();

message.WithRetainFlag();

mqttClient.PublishAsync(message.Build());

}

}

private async void but_submsg_Click(object sender, EventArgs k)

{

if (mqttClient != null)

{

//mqttClient.ApplicationMessageReceived += (s, e) =>

//{

// rtb_submsgclient.AppendText(“### RECEIVED APPLICATION MESSAGE ###”);

// rtb_submsgclient.AppendText($”+ Topic = {e.ApplicationMessage.Topic}”);

// rtb_submsgclient.AppendText($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

// rtb_submsgclient.AppendText($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}”);

// rtb_submsgclient.AppendText($”+ Retain = {e.ApplicationMessage.Retain}”);

//};

var meww= await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));

订阅主题

//IList subresult = await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(txtb_subtopic.Text.Trim()).Build());

// if (subresult != null)

// {

// List result = subresult.ToList();

// if (result.Count > 0)

// {

// this.Invoke(new Action(() =>

// {

// foreach (var item in result)

// {

// rtb_submsgclient.AppendText(item.TopicFilter.QualityOfServiceLevel.ToString() + “\n”);

// rtb_submsgclient.AppendText(item.TopicFilter.Topic.ToString() + “\n”);

// rtb_submsgclient.AppendText(item.ReturnCode.ToString() + “\n”);

// }

// }));

// }

// }

}

}

}

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152866.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号