大家好,又见面了,我是你们的朋友全栈君。
创建 Windows 服务的教程网上已有很多,这里不再赘述。
服务端程序做好后,需要编写用于安装和卸载服务的 bat 批处理文件:
install.bat
@echo off
rem Registers MqttNetServiceAddUserAndPassword as an auto-start Windows service and starts it.
rem NOTE(review): sc.exe normally needs an elevated (administrator) prompt - confirm on target.
title 安装windows服务:MqttNetServiceAddUserAndPassword
echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中…………
rem %~dp0 expands to this script's directory and already ends with a backslash.
rem sc.exe expects a space between "binPath=" and its value, and plain ASCII double quotes.
sc create MqttNetServiceAddUserAndPassword binPath= "%~dp0MqttNetServiceAddUserAndPassword.exe"
sc config MqttNetServiceAddUserAndPassword start= auto
sc start MqttNetServiceAddUserAndPassword
echo.MqttNetServiceAddUserAndPassword启动完毕
pause
说明:binPath= "%~dp0MqttNetServiceAddUserAndPassword.exe" 中的 %~dp0 表示批处理文件所在的目录(即当前路径);也可以改为服务程序的绝对路径。
delete.bat
@echo off
rem Stops the MqttNetServiceAddUserAndPassword service and removes its registration.
echo.服务MqttNetServiceAddUserAndPassword卸载中……….
sc stop MqttNetServiceAddUserAndPassword
sc delete MqttNetServiceAddUserAndPassword
echo.MqttNetServiceAddUserAndPassword卸载完毕
pause
服务端:
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
namespace MqttNetServiceAddUserAndPassword
{
public partial class Service1 : ServiceBase
{
// Lock object taken by PubMessage while it builds and publishes a message.
private static readonly object locker = new object();
// The MQTT broker instance; stays null until CreateMQTTServer has run.
private MqttServer mqttServer = null;
// Fires GetSubClientSAndSetShow every 5 seconds once started.
private System.Timers.Timer timer = null;
// Only used by CreateServerSocket.
private GodSharp.Sockets.SocketServer socketService = null;
// Client ids that have already been written to the connection log, so the same
// client is not logged again on every tick; emptied once it reaches 2000 entries.
private List<string> subClientIDs = new List<string>();

/// <summary>
/// Builds the service and prepares (but does not start) the 5-second timer that
/// records connected clients.
/// </summary>
public Service1()
{
    InitializeComponent();
    timer = new System.Timers.Timer(5000) { AutoReset = true };
    timer.Elapsed += GetSubClientSAndSetShow;
    // The timer is deliberately left disabled here: OnStart starts it and OnStop
    // stops it, so it only ticks while the service is running.
}
/// <summary>
/// Service start callback: launches the MQTT broker in the background and starts
/// the client-monitoring timer.
/// </summary>
/// <param name="args">Start arguments passed by the service control manager (unused).</param>
protected override void OnStart(string[] args)
{
    // Start the broker off the calling thread so OnStart returns promptly. The
    // continuation records a start-up failure in the event log instead of letting
    // the faulted task go unobserved.
    Task.Run(CreateMQTTServer).ContinueWith(
        t => EventLog.WriteEntry(t.Exception.ToString(), EventLogEntryType.Error),
        TaskContinuationOptions.OnlyOnFaulted);

    if (!timer.Enabled)
    {
        // Start() already sets Enabled to true.
        timer.Start();
    }

    // The socket server variant (CreateServerSocket / SocketServer.StartSocketService)
    // is intentionally not started.
}
/// <summary>
/// Service stop callback: stops the monitoring timer and shuts the MQTT broker down.
/// </summary>
protected override void OnStop()
{
    if (timer.Enabled)
    {
        // Stop() already sets Enabled to false.
        timer.Stop();
    }

    if (mqttServer != null)
    {
        try
        {
            // OnStop is synchronous, so wait for the broker to finish stopping.
            mqttServer.StopAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // A failed shutdown must not prevent the service from stopping.
            EventLog.WriteEntry(ex.ToString(), EventLogEntryType.Error);
        }
    }
}
/// <summary>
/// Creates the MQTT broker on first use and starts it on port 1884. A client is
/// accepted only when its id starts with "Eohi_" and it supplies the expected user
/// name and password. Every received application message is appended to the
/// message log through <see cref="WriteMsgLog"/>.
/// </summary>
/// <returns>A task that completes once the broker has started.</returns>
private async Task CreateMQTTServer()
{
    if (mqttServer != null)
    {
        return;
    }

    var optionsBuilder = new MqttServerOptionsBuilder();
    optionsBuilder.WithConnectionValidator(c =>
    {
        // Ordinal comparison: client ids are identifiers, not linguistic text.
        // A null id is rejected instead of throwing inside the validator.
        if (c.ClientId == null || !c.ClientId.StartsWith("Eohi_", StringComparison.Ordinal))
        {
            c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
            return;
        }

        // NOTE(security): credentials are hard-coded in source; move them to
        // protected configuration before using this outside a demo.
        if (c.Username != "user" || c.Password != "123456")
        {
            c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
            return;
        }

        c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
    });

    // Binding to a specific IP address is left at the default. The original author
    // noted that WithDefaultEndpointBoundIPAddress(IPAddress.Parse("")) raised an
    // error (an empty string is not a parsable address).
    // Listening port.
    optionsBuilder.WithDefaultEndpointPort(1884);
    // The connection backlog is left at the library default
    // (optionsBuilder.WithConnectionBacklog(2000) would override it).

    // Direct cast: an unexpected implementation type fails here with a clear
    // InvalidCastException instead of a later NullReferenceException.
    var server = (MqttServer)new MqttFactory().CreateMqttServer();

    // Log every message that reaches the broker.
    server.ApplicationMessageReceived += (s, e) =>
    {
        // Built per invocation; a variable shared between invocations could be
        // overwritten if two messages are handled at the same time.
        string msg = @"发送消息的客户端id:" + e.ClientId + "\r\n"
            + "发送时间:" + DateTime.Now + "\r\n"
            + "发送消息的主题:" + e.ApplicationMessage.Topic + "\r\n"
            + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\r\n"
            + "————————————————–\r\n";
        WriteMsgLog(msg);
    };

    await server.StartAsync(optionsBuilder.Build());

    // Publish the instance only after it has started, so the timer and PubMessage
    // never see a broker that is not running yet.
    mqttServer = server;
}
#region 记录日志
// Serializes appends to Msglog.txt; the method can be called from several
// message-handler threads, and a second concurrent writer would fail to open the file.
private static readonly object msgLogLock = new object();

/// <summary>
/// Appends one entry, prefixed with the current local time, to Msglog.txt in the
/// application's base directory. The file is created if it does not exist.
/// </summary>
/// <param name="msg">Text of the log entry.</param>
private void WriteMsgLog(string msg)
{
    string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Msglog.txt");
    lock (msgLogLock)
    {
        File.AppendAllText(path, DateTime.Now.ToString() + " " + msg + Environment.NewLine);
    }
}
/// <summary>
/// Publishes a text payload on the given topic through the broker. Does nothing
/// while the broker has not been created.
/// </summary>
/// <param name="topic">Topic to publish on.</param>
/// <param name="msg">Payload text.</param>
private void PubMessage(string topic, string msg)
{
    if (mqttServer == null)
    {
        return;
    }

    lock (locker)
    {
        var builder = new MqttApplicationMessageBuilder();
        builder.WithTopic(topic);
        builder.WithPayload(msg);
        var appMessage = builder.Build();
        // The returned task is not awaited: publishing is fire-and-forget.
        mqttServer.PublishAsync(appMessage);
    }
}
// Serializes appends to ClientLinklog.txt so overlapping callers cannot collide
// on the file handle.
private static readonly object clientLinkLogLock = new object();

/// <summary>
/// Appends one entry to ClientLinklog.txt (the client connection log) in the
/// application's base directory. The file is created if it does not exist.
/// </summary>
/// <param name="msg">Text of the log entry.</param>
private void WriteClientLinkLog(string msg)
{
    string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "ClientLinklog.txt");
    lock (clientLinkLogLock)
    {
        File.AppendAllText(path, msg + Environment.NewLine);
    }
}
// 1 while a timer tick is being processed, 0 otherwise. System.Timers.Timer raises
// Elapsed on thread-pool threads, so a slow tick can overlap the next one.
private int clientScanBusy = 0;

/// <summary>
/// Timer callback: writes the currently connected clients to the connection log
/// and publishes the same information on the "ClientsCount" and "clientlink" topics.
/// Clients seen for the first time are reported as connected; previously seen
/// clients that are no longer connected are reported as offline.
/// </summary>
/// <param name="sender">The timer raising the event (unused).</param>
/// <param name="e">Elapsed event data (unused).</param>
private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
{
    if (mqttServer == null)
    {
        return;
    }

    // Skip this tick if the previous one is still running; subClientIDs is a plain
    // List and must not be mutated by two ticks at once.
    if (Interlocked.Exchange(ref clientScanBusy, 1) == 1)
    {
        return;
    }

    try
    {
        List<ConnectedMqttClient> subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();
        var clientids = new List<string>();

        if (subclients.Count > 0)
        {
            // NOTE(review): "Count - 1" presumably leaves out the desktop monitor,
            // which is itself an MQTT client - confirm.
            string connectedCount = (subclients.Count - 1).ToString();
            string subclientcount = @"客户端接入的总数为:" + connectedCount + "\r\n"
                + "——————————————————- \r\n";
            WriteClientLinkLog(subclientcount);
            PubMessage("ClientsCount", connectedCount);

            foreach (var item in subclients)
            {
                if (!subClientIDs.Contains(item.ClientId))
                {
                    subClientIDs.Add(item.ClientId);
                    string msg = @"连接客户端ID:" + item.ClientId + "\r\n"
                        + "连接时间:" + DateTime.Now + "\r\n"
                        + "协议版本:" + item.ProtocolVersion + "\r\n"
                        + "最后收到的非保持活包:" + item.LastNonKeepAlivePacketReceived + "\r\n"
                        + "最后收到的包:" + item.LastPacketReceived + "\r\n"
                        + "挂起的应用程序消息:" + item.PendingApplicationMessages + "\r\n"
                        + "————————————————" + "\r\n";
                    WriteClientLinkLog(msg);
                    PubMessage("clientlink", msg);
                }

                clientids.Add(item.ClientId);
            }

            // Cap on remembered ids. After a reset every connected client is
            // reported as newly connected on the next tick.
            if (subClientIDs.Count >= 2000)
            {
                subClientIDs.Clear();
            }
        }
        else
        {
            string subclientcount = @"暂无客户端接入!" + "\r\n"
                + "——————————————————– \r\n";
            WriteClientLinkLog(subclientcount);
        }

        // Remembered ids that are no longer connected have gone offline. This also
        // runs when nothing is connected, so the last client to leave is reported.
        foreach (string u in subClientIDs.Except(clientids).ToList())
        {
            string msgoutline = @"客户端下线ID:" + u + "\r\n"
                + "客户端下线时间:" + DateTime.Now.ToString() + "\r\n"
                + "———————————————————— \r\n";
            WriteClientLinkLog(msgoutline);
            subClientIDs.Remove(u);
            PubMessage("clientlink", msgoutline);
        }

        // An earlier, disabled variant collected the connect/offline messages into a
        // list and pushed them as JSON through SocketServer.connection.Send(...).
    }
    finally
    {
        Interlocked.Exchange(ref clientScanBusy, 0);
    }
}
// Serializes appends to ClientOutLineLog.txt so overlapping callers cannot collide
// on the file handle.
private static readonly object clientOutLineLogLock = new object();

/// <summary>
/// Appends one entry to ClientOutLineLog.txt (the client offline log) in the
/// application's base directory. The file is created if it does not exist.
/// </summary>
/// <param name="msg">Text of the log entry.</param>
public void WriteClientOutLineLog(string msg)
{
    string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "ClientOutLineLog.txt");
    lock (clientOutLineLogLock)
    {
        File.AppendAllText(path, msg + Environment.NewLine);
    }
}
/// <summary>
/// Creates and starts the TCP socket server on 127.0.0.1:9001 the first time it is
/// called. Later calls do nothing.
/// </summary>
private void CreateServerSocket()
{
    if (socketService != null)
    {
        // Already created. The original called itself again here, which never
        // terminates because socketService stays non-null.
        return;
    }

    socketService = new GodSharp.Sockets.SocketServer("127.0.0.1", 9001, ProtocolType.Tcp);
    socketService.Start();
    socketService.Listen(10);

    // The original also constructed a thread around an empty while(true) loop but
    // never started it. It is omitted: started as written it would only spin a CPU
    // core. A real send loop (e.g. client.Send(bytes) per connected client) would
    // go here.
}
#endregion
}
}
服务端桌面显示程序:
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTNetFrm
{
public partial class Form1 : Form
{
// Controller for the MqttNetServiceAddUserAndPassword service, set by GetServiceStatus.
private ServiceController ServiceController = null;
// Connection options of the monitor's own MQTT client.
private MqttClientOptions options = null;
// The monitor's MQTT client, created by LinkClientService.
private MqttClient mqttClient = null;

/// <summary>
/// Builds the form. The service status query and the broker connection are
/// deferred to the Load event.
/// </summary>
public Form1()
{
    InitializeComponent();

    // Both steps update labels, so they run once the form is loading: the window
    // handle exists by then and Invoke works. Blocking the constructor with
    // Task.Run(...).Wait() made Invoke throw before the handle existed, which the
    // connect routine then swallowed.
    Load += async (sender, e) =>
    {
        try
        {
            GetServiceStatus();
            await LinkClientService();
        }
        catch (Exception ex)
        {
            // An exception escaping an async void handler would terminate the app.
            Debug.WriteLine(ex);
        }
    };
}
/// <summary>
/// Returns an IPv4 address of the local machine.
/// </summary>
/// <returns>
/// The last IPv4 address reported for the local host name, or the loopback
/// address when the host has no IPv4 address.
/// </returns>
private string GetLocalIP()
{
    // LastOrDefault keeps the original choice (the last IPv4 entry wins) without
    // DefaultIfEmpty(), which put a null element into an empty list that was then
    // dereferenced.
    IPAddress lastIPv4 = Dns.GetHostAddresses(Dns.GetHostName())
        .LastOrDefault(a => a.AddressFamily == AddressFamily.InterNetwork);

    return (lastIPv4 ?? IPAddress.Loopback).ToString();
}
/// <summary>
/// Connects the monitor to the broker on this machine (port 1884) under a fresh
/// "Eohi_Frm_" client id, subscribes to the monitoring topics and shows the
/// outcome in the status label.
/// </summary>
/// <returns>A task that completes when the connection attempt has finished.</returns>
private async Task LinkClientService()
{
    try
    {
        options = new MqttClientOptions
        {
            ClientId = "Eohi_Frm_" + Guid.NewGuid().ToString(),
            CleanSession = true,
            ChannelOptions = new MqttClientTcpOptions
            {
                Server = GetLocalIP(),
                Port = 1884,
            },
            // NOTE(security): credentials are hard-coded and must match the broker's.
            Credentials = new MqttClientCredentials()
            {
                Username = "user",
                Password = "123456"
            }
        };

        // Direct cast: an unexpected implementation type fails here, not later as
        // a NullReferenceException.
        mqttClient = (MqttClient)new MqttFactory().CreateMqttClient();

        await mqttClient.ConnectAsync(options);
        but_submsg_Click();
        ShowServerStatus("连接正常,服务运行中…………");
    }
    catch (Exception ex)
    {
        // Report the failure instead of swallowing it silently.
        Debug.WriteLine(ex);
        ShowServerStatus("连接失败!" + ex.Message);
    }
}

// Sets the status label, marshalling to the UI thread when called from another thread.
private void ShowServerStatus(string text)
{
    if (lab_serverstatus.InvokeRequired)
    {
        lab_serverstatus.Invoke(new Action(() => { lab_serverstatus.Text = text; }));
    }
    else
    {
        lab_serverstatus.Text = text;
    }
}
/// <summary>
/// Subscribes the monitor to the "ClientsCount", "clientlink" and "msglog" topics
/// and routes incoming payloads to the client-count label and the two log boxes.
/// </summary>
private async void but_submsg_Click()
{
    if (mqttClient == null)
    {
        return;
    }

    // Attach the handler before subscribing so a message that arrives right after
    // the first subscription is not missed.
    mqttClient.ApplicationMessageReceived += (s, e) =>
    {
        // A message may carry no payload; decode that as an empty string.
        string msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);
        this.Invoke(new Action(() =>
        {
            // Short payloads are the client count.
            if (msg.Length <= 5)
            {
                lab_clientcount.Text = msg;
            }

            if (msg.Length > 10)
            {
                if (msg.StartsWith("连接", StringComparison.Ordinal))
                {
                    rtb_clientlog.AppendText(msg);
                }

                // NOTE(review): the original had no braces, so this append runs for
                // every long message, not only in an else branch. Kept as written;
                // confirm whether an else was intended.
                rtb_msglog.AppendText(msg);
            }
        }));
    };

    try
    {
        await mqttClient.SubscribeAsync(new TopicFilter("ClientsCount", MqttQualityOfServiceLevel.AtMostOnce));
        await mqttClient.SubscribeAsync(new TopicFilter("clientlink", MqttQualityOfServiceLevel.AtMostOnce));
        await mqttClient.SubscribeAsync(new TopicFilter("msglog", MqttQualityOfServiceLevel.AtMostOnce));
    }
    catch (Exception ex)
    {
        // This method is async void: an escaping exception would terminate the app.
        Debug.WriteLine(ex);
    }
}
/// <summary>
/// Looks up the installed service whose display name is
/// "MqttNetServiceAddUserAndPassword", remembers its controller and shows in the
/// status label whether it is running. Does nothing when it is not installed.
/// </summary>
private void GetServiceStatus()
{
    ServiceController service = ServiceController.GetServices()
        .FirstOrDefault(u => u.DisplayName == "MqttNetServiceAddUserAndPassword");
    if (service == null)
    {
        return;
    }

    if (ServiceController == null)
    {
        ServiceController = service;
    }

    string status = service.Status == ServiceControllerStatus.Running
        ? "服务运行中…………"
        : "服务已停止…………";

    // This method may run on a worker thread; controls must be updated on the
    // thread that owns them.
    if (lab_serverstatus.InvokeRequired)
    {
        lab_serverstatus.BeginInvoke(new Action(() => { lab_serverstatus.Text = status; }));
    }
    else
    {
        lab_serverstatus.Text = status;
    }
}
/// <summary>
/// Clears the log box of the selected tab: the client log when the first tab page
/// is selected, the message log otherwise.
/// </summary>
private void button2_Click(object sender, EventArgs e)
{
    if (tabControl1.SelectedTab == tabPage1)
    {
        rtb_clientlog.Text = "";
    }
    else
    {
        rtb_msglog.Text = "";
    }
}
// Load event handler with an empty body.
// NOTE(review): presumably wired up in the designer file - confirm before removing.
private void Form1_Load(object sender, EventArgs e)
{
}
}
}
客户端:
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Windows.Forms;
namespace MqttClientTest01
{
public partial class Form1 : Form
{
// The test client's MQTT connection, created by LinkClientService.
private MqttClient mqttClient = null;
// Retry timer used by LinkMqttNetService; never created while the set-up below
// stays disabled.
private System.Timers.Timer timer = null;
// Number of timer ticks on which no client existed yet.
private int CountLink = 0;
// Options of the most recent connection attempt.
private MqttClientOptions options = null;

/// <summary>
/// Builds the form. The timer-driven connection retry is disabled.
/// </summary>
public Form1()
{
    InitializeComponent();
    // Disabled: a 1-second auto-reset timer that calls LinkMqttNetService.
    //timer = new System.Timers.Timer();
    //timer.AutoReset = true;
    //timer.Interval = 1000;
    //timer.Elapsed += new ElapsedEventHandler(LinkMqttNetService);
}
/// <summary>
/// Timer callback for the (currently disabled) connection retry: counts ticks on
/// which no client exists and, after five of them, stops the timer and tells the
/// user to check the connection parameters.
/// </summary>
/// <param name="sender">The timer raising the event (unused).</param>
/// <param name="e">Elapsed event data (unused).</param>
private void LinkMqttNetService(object sender, ElapsedEventArgs e)
{
    if (mqttClient == null)
    {
        // RunAsync();
        CountLink++;
    }

    if (CountLink >= 5)
    {
        // Reset and stop before showing the dialog so further ticks cannot pile up
        // more message boxes while this one is open. The timer is null while its
        // set-up in the constructor is disabled.
        CountLink = 0;
        if (timer != null)
        {
            timer.Enabled = false;
        }

        MessageBox.Show("连接多次失败,请确认各参数是否正确!");
    }
}
// Click handler of the connect button: starts one connection attempt. The
// timer-driven retry (reset CountLink, enable and start the timer) is disabled.
private void but_linkserver_Click(object sender, EventArgs k) => LinkClientService();
/// <summary>
/// Connects to the broker using the server, port, user name and password entered
/// on the form, under a fresh "Eohi_" client id, and shows the outcome in the
/// status labels. After a successful connect the client reconnects automatically
/// whenever the connection drops.
/// </summary>
public async void LinkClientService()
{
    // Validate the port up front: a FormatException thrown from an async void
    // method would terminate the application.
    int port;
    if (!int.TryParse(txtb_serverport.Text.Trim(), out port))
    {
        ShowLinkStatus("连接失败!请检查ip/端口");
        return;
    }

    var clientOptions = new MqttClientOptions
    {
        ClientId = "Eohi_" + Guid.NewGuid().ToString(),
        CleanSession = true,
        ChannelOptions = new MqttClientTcpOptions
        {
            Server = txtb_serverip.Text.Trim(),
            Port = port,
        },
        Credentials = new MqttClientCredentials()
        {
            Username = tb_username.Text,
            Password = tb_userpwd.Text
        }
    };
    options = clientOptions;

    // Replace the current client first so a superseded client can recognise that
    // it is no longer the active one.
    MqttClient previous = mqttClient;
    var client = (MqttClient)new MqttFactory().CreateMqttClient();
    mqttClient = client;

    if (previous != null)
    {
        try
        {
            // Do not leave an earlier connection open after a new connect click.
            await previous.DisconnectAsync();
        }
        catch (Exception)
        {
            // The old client was already disconnected or unreachable; nothing to clean up.
        }
    }

    try
    {
        await client.ConnectAsync(clientOptions);
        ShowLinkStatus("连接成功!");

        client.Disconnected += async (s, e) =>
        {
            // Only the active client reconnects.
            if (!ReferenceEquals(mqttClient, client))
            {
                return;
            }

            // The original reconnected only when e.ClientWasConnected was false,
            // which can never hold for the first disconnect of an established
            // connection, so the reconnect code was dead.
            // NOTE(review): assumes a failed reconnect raises Disconnected again,
            // as in the MQTTnet reconnect sample - confirm for the version in use.
            await Task.Delay(TimeSpan.FromSeconds(5));
            try
            {
                await client.ConnectAsync(clientOptions);
                ShowLinkStatus("连接成功!");
            }
            catch (Exception ex)
            {
                ShowLinkStatus("连接失败!" + ex.Message);
            }
        };
    }
    catch (Exception)
    {
        ShowLinkStatus("连接失败!请检查ip/端口");
    }
}

// Shows a connection status text with the current time, marshalling to the UI
// thread when called from another thread (the Disconnected handler may be).
private void ShowLinkStatus(string status)
{
    if (IsDisposed)
    {
        return;
    }

    if (InvokeRequired)
    {
        Invoke(new Action(() => ShowLinkStatus(status)));
        return;
    }

    lab_linkstatus.Text = status;
    lab_linktimer.Text = DateTime.Now.ToString();
}
// TextChanged handler for the user-name box with an empty body.
// NOTE(review): presumably wired up in the designer file - confirm before removing.
private void tb_username_TextChanged(object sender, EventArgs e)
{
}
/// <summary>
/// Publishes the text of the message box on the entered topic as a retained
/// message with exactly-once QoS. Does nothing before a client has been created.
/// </summary>
private async void but_clientsend_Click(object sender, EventArgs e)
{
    if (mqttClient == null)
    {
        return;
    }

    var message = new MqttApplicationMessageBuilder()
        .WithTopic(txtb_msgtopic.Text.Trim())
        .WithPayload(rtb_pubmsg.Text.Trim())
        .WithExactlyOnceQoS()
        .WithRetainFlag()
        .Build();

    try
    {
        // Await the publish so a failure (for example, not connected) is reported
        // instead of being lost in an unobserved task.
        await mqttClient.PublishAsync(message);
    }
    catch (Exception ex)
    {
        MessageBox.Show(ex.Message);
    }
}
/// <summary>
/// Subscribes to the entered topic (QoS at-most-once) and shows every received
/// message in the subscription box.
/// </summary>
private async void but_submsg_Click(object sender, EventArgs k)
{
    if (mqttClient == null)
    {
        return;
    }

    // Detach before attaching so repeated clicks leave exactly one handler.
    // Adding a new lambda on every click printed each message once per click.
    mqttClient.ApplicationMessageReceived -= ShowReceivedMessage;
    mqttClient.ApplicationMessageReceived += ShowReceivedMessage;

    try
    {
        await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));
    }
    catch (Exception ex)
    {
        // async void: an escaping exception would terminate the application.
        MessageBox.Show(ex.Message);
    }
}

// Appends the details of one received message to the subscription box on the UI thread.
private void ShowReceivedMessage(object s, MqttApplicationMessageReceivedEventArgs e)
{
    // A message may carry no payload; decode that as an empty string.
    string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);
    this.Invoke(new Action(() =>
    {
        rtb_submsgclient.AppendText("ClientID=" + e.ClientId + "\n");
        rtb_submsgclient.AppendText($"+ Topic = {e.ApplicationMessage.Topic}" + "\n");
        rtb_submsgclient.AppendText($"+ Payload = {payload}" + "\n");
        rtb_submsgclient.AppendText($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}" + "\n");
        rtb_submsgclient.AppendText($"+ Retain = {e.ApplicationMessage.Retain}" + "\n");
    }));
}
/// <summary>
/// Clears the box that lists received messages.
/// </summary>
private void button1_Click(object sender, EventArgs e)
{
    rtb_submsgclient.Text = "";
}
}
}
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152869.html ;原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...