mqttnet消息推送与接收[通俗易懂]

mqttnet消息推送与接收[通俗易懂]创建windows服务网上有很多,不多述;服务端做好后一定要写bat安装卸载文件install.bat@echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中…………@echooff@title安装windows服务:MqttNetServiceAddUserAndPassword@sccreateMqttNetServiceAdd…

大家好,又见面了,我是你们的朋友全栈君。

创建windows服务网上有很多,不多述;

服务端做好后一定要写bat安装卸载文件

install.bat

@echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中…………
@echo off
@title 安装windows服务:MqttNetServiceAddUserAndPassword
@sc create MqttNetServiceAddUserAndPassword binPath=”%~dp0\MqttNetServiceAddUserAndPassword.exe”
@sc config MqttNetServiceAddUserAndPassword start= auto
@sc start MqttNetServiceAddUserAndPassword
@echo.MqttNetServiceAddUserAndPassword启动完毕
pause

//binPath=”%~dp0\MqttNetServiceAddUserAndPassword.exe”   当前路径,也可指定

delete.bat

@echo.服务MqttNetServiceAddUserAndPassword卸载中……….
@echo off
@sc stop MqttNetServiceAddUserAndPassword
@sc delete MqttNetServiceAddUserAndPassword
@echo off
@echo.MqttNetServiceAddUserAndPassword卸载完毕
@pause

服务端:

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

namespace MqttNetServiceAddUserAndPassword
{

    public partial class Service1 : ServiceBase
    {

        private readonly static object locker = new object();
        private MqttServer mqttServer = null;
        private System.Timers.Timer timer = null;

        private GodSharp.Sockets.SocketServer socketService = null;

 

        //此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多2000个清零;
        private List<string> subClientIDs = new List<string>();
        public Service1()
        {

            InitializeComponent();
            //创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中
            timer = new System.Timers.Timer();
            timer.AutoReset = true;
            timer.Enabled = true;
            timer.Interval = 5000;
            timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);

        }

        protected override void OnStart(string[] args)
        {

            //开启服务
            //CreateMQTTServer();

            Task.Run(CreateMQTTServer);

            if (timer.Enabled == false)
            {

                timer.Enabled = true;
                timer.Start();
            }
            //创建socket服务端
            //CreateServerSocket();
        //    SocketServer.StartSocketService();
        }

        protected override void OnStop()
        {

            if (timer.Enabled == true)
            {

                timer.Enabled = false;
                timer.Stop();
            }
        }
        /// <summary>
        /// 开启服务
        /// </summary>
        private async Task CreateMQTTServer()
        {

            if (mqttServer == null)
            {

                var optionsBuilder = new MqttServerOptionsBuilder();
                optionsBuilder.WithConnectionValidator(c =>
                {

                    if (c.ClientId.Length < 5 || !c.ClientId.StartsWith(“Eohi_”))
                    {

                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                        return;
                    }

                    if (c.Username != “user” || c.Password != “123456”)
                    {

                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                        return;
                    }
                    c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                });
                //指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。
                //options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(“”))
                //指定端口
                optionsBuilder.WithDefaultEndpointPort(1884);
                //连接记录数,默认 一般为2000
                //optionsBuilder.WithConnectionBacklog(2000);
                mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;
                string msg = null;
             
                //将发送的消息加到日志                      
                mqttServer.ApplicationMessageReceived += (s, e) =>
                {

                    msg = @”发送消息的客户端id:” + e.ClientId + “\r\n”
                  + “发送时间:” + DateTime.Now + “\r\n”
                  + “发送消息的主题:” + e.ApplicationMessage.Topic + “\r\n”
                 + “发送的消息内容:” + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + “\r\n”
                 + “————————————————–\r\n”
                 ;
                    WriteMsgLog(msg);
                };
                await mqttServer.StartAsync(optionsBuilder.Build());

            }
        }
        #region 记录日志  
        /// <summary>  
        /// 消息记录日志  
        /// </summary>  
        /// <param name=”msg”></param>  
        private void WriteMsgLog(string msg)
        {

            //string path = @”C:\log.txt”;  

            //该日志文件会存在windows服务程序目录下  
            string path = AppDomain.CurrentDomain.BaseDirectory + “\\Msglog.txt”;
            FileInfo file = new FileInfo(path);
            if (!file.Exists)
            {

                FileStream fs;
                fs = File.Create(path);
                fs.Close();
            }
            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
            {

                using (StreamWriter sw = new StreamWriter(fs))
                {

                    sw.WriteLine(DateTime.Now.ToString() + ”   ” + msg);
                }
            }
        }
        private void PubMessage(string topic, string msg)
        {

            if (mqttServer != null)
            {

                lock (locker)
                {

                    var message = new MqttApplicationMessageBuilder();
                    message.WithTopic(topic);
                    message.WithPayload(msg);
                    mqttServer.PublishAsync(message.Build());
                }
            }
        }
        /// <summary>
        ///客户端链接日志           客户端接入
        /// </summary>
        /// <param name=”msg”></param>
        private void WriteClientLinkLog(string msg)
        {

            //string path = @”C:\log.txt”;  

            //该日志文件会存在windows服务程序目录下  
            string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientLinklog.txt”;
            FileInfo file = new FileInfo(path);
            if (!file.Exists)
            {

                FileStream fs;
                fs = File.Create(path);
                fs.Close();
            }

            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
            {

                using (StreamWriter sw = new StreamWriter(fs))
                {

                    sw.WriteLine(msg);
                }
            }
        }
        /// <summary>
        /// 通过定时器将客户端链接信息写入日志      
        /// </summary>
        /// <param name=”sender”></param>
        /// <param name=”e”></param>
        private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
        {

            // List<SetServiceM> dic = new List<SetServiceM>();   
            if (mqttServer != null)
            {

                List<ConnectedMqttClient> subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();
                if (subclients.Count > 0)
                {

                    string subclientcount = @”客户端接入的总数为:” + (subclients.Count – 1).ToString() + “\r\n”
                        + “——————————————————- \r\n”;
                    WriteClientLinkLog(subclientcount);
                    PubMessage(“ClientsCount”, (subclients.Count – 1).ToString());
                    List<string> clientids = new List<string>();
                    //连接客户端的个数
                    //   dic.Add(SetServiceM.SetService( “ClientCount”, subclients.Count.ToString()));
                    //   var dicclientlink = new Dictionary<string, string>();

                    foreach (var item in subclients)
                    {

                        if (!subClientIDs.Contains(item.ClientId))
                        {

                            subClientIDs.Add(item.ClientId);
                            string msg = @”连接客户端ID:” + item.ClientId + “\r\n”
                            + “连接时间:” + DateTime.Now + “\r\n”
                            + “协议版本:” + item.ProtocolVersion + “\r\n”
                            + “最后收到的非保持活包:” + item.LastNonKeepAlivePacketReceived + “\r\n”
                            + “最后收到的包:” + item.LastPacketReceived + “\r\n”
                            + “挂起的应用程序消息:” + item.PendingApplicationMessages + “\r\n”
                            + “————————————————” + “\r\n”;
                            WriteClientLinkLog(msg);
                            PubMessage(“clientlink”, msg);
                            //    mqttServer.PublishAsync(“clientlink”, msg);
                            //    dicclientlink.Add(item.ClientId, msg);
                        }
                        clientids.Add(item.ClientId);
                    }
                    if (subClientIDs.Count >= 2000)
                    {

                        subClientIDs.Clear();
                    }
                    var exceptlist = subClientIDs.Except(clientids).ToList();
                    //  var dicclientoutline = new Dictionary<string, string>();
                    if (exceptlist.Count > 0)
                    {

                        exceptlist.ForEach(u =>
                        {

                            string msgoutline = @”客户端下线ID:” + u + “\r\n”
                       + “客户端下线时间:” + DateTime.Now.ToString() + “\r\n”
                       + “———————————————————— \r\n”
                       ;
                            WriteClientLinkLog(msgoutline);
                            subClientIDs.Remove(u);
                            PubMessage(“clientlink”, msgoutline);
                        //     mqttServer.PublishAsync(“clientlink”, msgoutline);
                        // dicclientoutline.Add(“OutLineID_” + u, msgoutline);
                    });
                    }
                    连接客户端的id
                    //dic.Add(SetServiceM.SetService(“clientlink”, JsonConvert.SerializeObject(dicclientlink)));
                    客户端下线的时间
                    //dic.Add(SetServiceM.SetService(“clientoutline”, JsonConvert.SerializeObject(dicclientoutline)));
                    //SocketServer.connection.Send(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dic)));
                }
                else
                {

                    string subclientcount = @”暂无客户端接入!” + “\r\n”
                     + “——————————————————– \r\n”;
                    WriteClientLinkLog(subclientcount);
                }
            }
        }
        /// <summary>
        /// 客户端下线时间
        /// </summary>
        /// <param name=”msg”></param>
        public void WriteClientOutLineLog(string msg)
        {

            string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientOutLineLog.txt”;
            FileInfo file = new FileInfo(path);
            if (!file.Exists)
            {

                FileStream fs = File.Create(path);
                fs.Close();
            }
            using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))
            {

                using (StreamWriter sw = new StreamWriter(fs))
                {

                    sw.WriteLine(msg);
                }
            }
        }
        //windows服务里的服务端
        private void CreateServerSocket()
        {

            if (socketService == null)
            {

                // IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(“127.0.0.1”), 9001);
                socketService = new GodSharp.Sockets.SocketServer(“127.0.0.1”, 9001, ProtocolType.Tcp);  //Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                socketService.Start();
                socketService.Listen(10);
                Thread thread = new Thread(new ThreadStart(new Action(() =>
                {

                    while (true)
                    {

                    //  socketClient = socketService.Clients[0];
                    // string data = “sql|” ; //在这里封装数据,通常是自己定义一种数据结构,如struct data{sql;result}
                    // client.Send(Encoding.Default.GetBytes(msg));
                }
                })));
            }
            else
            {

                CreateServerSocket();
            }
        }

        #endregion
    }

}

服务端桌面显示程序:

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.ServiceProcess;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MQTTNetFrm
{

    public partial class Form1 : Form
    {

        private ServiceController ServiceController = null;
        private MqttClientOptions options = null;
        private MqttClient mqttClient = null;

        public Form1()
        {

            InitializeComponent();

   
            new Thread(new ThreadStart(GetServiceStatus)).Start();

          Task.Run(LinkClientService).Wait();
        }
        /// <summary>
        /// 获取当前ip地址
        /// </summary>
        /// <returns></returns>
        private  string GetLocalIP()
        {

            string ip = null;
          var iplist = Dns.GetHostAddresses(Dns.GetHostName()).DefaultIfEmpty().ToList();
            iplist.ForEach(u =>
            {

                if (u.AddressFamily == AddressFamily.InterNetwork)
                    ip= u.ToString();
            });
            return ip;
        }
        private async Task LinkClientService()
        {

            var m = “Eohi_Frm_” + Guid.NewGuid().ToString();
            options = new MqttClientOptions
            {

                ClientId = m,
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {

                    Server = GetLocalIP(),
                    Port = 1884,
                },
                Credentials = new MqttClientCredentials()
                {

                    Username = “user”,
                    Password = “123456”
                }

            };
            var factory = new MqttFactory();
            mqttClient = factory.CreateMqttClient() as MqttClient;
            try
            {

                await mqttClient.ConnectAsync(options);
                but_submsg_Click();
                this.Invoke(new Action(() => { lab_serverstatus.Text = “连接正常,服务运行中…………”; }));
            }
            catch (Exception ex)
            {

            }

        }
        private async void but_submsg_Click()
        {

            if (mqttClient != null)
            {

                await mqttClient.SubscribeAsync(new TopicFilter(“ClientsCount”, MqttQualityOfServiceLevel.AtMostOnce));
                await mqttClient.SubscribeAsync(new TopicFilter(“clientlink”, MqttQualityOfServiceLevel.AtMostOnce));
                await mqttClient.SubscribeAsync(new TopicFilter(“msglog”, MqttQualityOfServiceLevel.AtMostOnce));
                mqttClient.ApplicationMessageReceived += (s, e) =>
                {

                    this.Invoke(new Action(() =>
                    {

                        var msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                        if (msg.Length<=5)
                        {

                            lab_clientcount.Text = msg;
                        }
                        if (msg.Length>10)
                        {

                            if (msg.StartsWith(“连接”)    )
                                rtb_clientlog.AppendText(msg);
                            rtb_msglog.AppendText(msg);
                        }
   

                    }));
                };

            }
        }
        private void GetServiceStatus()
        {

            ServiceController[] serviceControllers = ServiceController.GetServices();
            if (serviceControllers.Length > 0)
            {

                serviceControllers.ToList().ForEach(u =>
                {

                    if (u.DisplayName == “MqttNetServiceAddUserAndPassword”)
                    {

                        if (ServiceController == null)
                        {

                            ServiceController = u;
                        }
                        if (u.Status == ServiceControllerStatus.Running)
                        {

                            lab_serverstatus.Text = “服务运行中…………”;
                        }
                        else
                        {

                            lab_serverstatus.Text = “服务已停止…………”;
                        }
                    }
                });
            }
        }
        private void button2_Click(object sender, EventArgs e)
        {

            if (tabControl1.SelectedTab == tabPage1)
            {

                rtb_clientlog.Text = “”;
            }
            else
            {

                rtb_msglog.Text = “”;
            }
        }

        private void Form1_Load(object sender, EventArgs e)
        {

        }
    }

}

客户端:

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Windows.Forms;

namespace MqttClientTest01
{

    public partial class Form1 : Form
    {

        private MqttClient mqttClient = null;
        private System.Timers.Timer timer = null;
        private int CountLink = 0;
        private MqttClientOptions options = null;
        public Form1()
        {

            InitializeComponent();
            创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中
            //timer = new System.Timers.Timer();
            //timer.AutoReset = true;
            //timer.Interval = 1000;
            //timer.Elapsed += new ElapsedEventHandler(LinkMqttNetService);
        }

        private void LinkMqttNetService(object sender, ElapsedEventArgs e)
        {

            if (mqttClient == null)
            {

                //   RunAsync();
                CountLink++;
            }
            if (CountLink >= 5)
            {

                MessageBox.Show(“连接多次失败,请确认各参数是否正确!”);
                CountLink = 0;
                timer.Enabled = false;
            }
        }
        private void but_linkserver_Click(object sender, EventArgs k)
        {

            LinkClientService();
            //CountLink = 0;
            //timer.Enabled = true;
            //timer.Start();
        }
        /// <summary>
        /// 链接客户端
        /// </summary>
        public async  void LinkClientService()
        {

            var m = “Eohi_” + Guid.NewGuid().ToString();
            options = new MqttClientOptions
            {

                ClientId = m,
                CleanSession = true,
                ChannelOptions = new MqttClientTcpOptions
                {

                    Server = txtb_serverip.Text.Trim(),
                    Port = Convert.ToInt32(txtb_serverport.Text.Trim()),
                },
                Credentials = new MqttClientCredentials()
                {

                    Username = tb_username.Text,
                    Password = tb_userpwd.Text
                }

            };
            var factory = new MqttFactory();
            mqttClient =  factory.CreateMqttClient() as MqttClient;
            try
            {

                await mqttClient.ConnectAsync(options);
                this.Invoke(new Action(() =>
                {

                    lab_linkstatus.Text = “连接成功!”;
                    lab_linktimer.Text = DateTime.Now.ToString();
                }));
                mqttClient.Disconnected += async (s, e) =>
                {

                    if (e.ClientWasConnected==false)
                    {

                        try
                        {

                            await mqttClient.ConnectAsync(options);
                            this.Invoke(new Action(() =>
                            {

                                lab_linkstatus.Text = “连接成功!”;
                                lab_linktimer.Text = DateTime.Now.ToString();
                            }));
                        }
                        catch (Exception ex)
                        {

                            lab_linkstatus.Text = “连接失败!”+ex.Message;
                            lab_linktimer.Text = DateTime.Now.ToString();
                        }

                    }
                };
            }
            catch (Exception ex)
            {

                lab_linkstatus.Text = “连接失败!请检查ip/端口” ;
                lab_linktimer.Text = DateTime.Now.ToString();
            }
 
        }

        private void tb_username_TextChanged(object sender, EventArgs e)
        {

        }

        private void but_clientsend_Click(object sender, EventArgs e)
        {

            if (mqttClient != null)
            {

                var message = new MqttApplicationMessageBuilder();
                message.WithTopic(txtb_msgtopic.Text.Trim());
                message.WithPayload(rtb_pubmsg.Text.Trim());
                message.WithExactlyOnceQoS();
                message.WithRetainFlag();
                mqttClient.PublishAsync(message.Build());
            }
        }
        private async void but_submsg_Click(object sender, EventArgs k)
        {

            if (mqttClient != null)
            {

                await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));
                mqttClient.ApplicationMessageReceived += (s, e) =>
                {

                    this.Invoke(new Action(() =>
                    {

                        rtb_submsgclient.AppendText(“ClientID=” + e.ClientId + “\n”);
                        rtb_submsgclient.AppendText($”+ Topic = {e.ApplicationMessage.Topic}” + “\n”);
                        rtb_submsgclient.AppendText($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload) + “\n”}”);
                        rtb_submsgclient.AppendText($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}” + “\n”);
                        rtb_submsgclient.AppendText($”+ Retain = {e.ApplicationMessage.Retain}” + “\n”);

                    }));

                };

            }
        }

        private void button1_Click(object sender, EventArgs e)
        {

            rtb_submsgclient.Text = “”;
        }
    }
}mqttnet消息推送与接收[通俗易懂]mqttnet消息推送与接收[通俗易懂]

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152869.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • javaweb权限管理简单实现_javaweb用户权限管理

    javaweb权限管理简单实现_javaweb用户权限管理推荐最新技术springboot版权限管理(java后台通用权限管理系统(springboot)),采用最新技术架构,功能强大!注:由于该项目比较老,所以没有采用maven管理,建议下载springboot权限管理系统,对学习和使用会更有帮助。springboot权限管理系统介绍地址:https://blog.csdn.net/zwx19921215/article/details/978……………

  • React安装:

    React安装:

  • react路由守卫(路由拦截)

    react路由守卫(路由拦截)react不同于vue,通过在路由里设置meta元字符实现路由拦截。在使用Vue,框架提供了路由守卫功能,用来在进入某个路有前进行一些校验工作,如果校验失败,就跳转到404或者登陆页面,比如Vue中的beforeEnter函数:…router.beforeEach(async(to,from,next)=>{consttoPath=to.path;constfromPath=from.path;})…react实现路由拦截的基

  • 机器学习常见的采样方法[通俗易懂]

    机器学习常见的采样方法[通俗易懂]我们在训练模型的过程,都会经常进行数据采样,为了就是让我们的模型可以更好的去学习数据的特征,从而让效果更佳。但这是比较浅层的理解,更本质上,数据采样就是对随机现象的模拟,根据给定的概率分布从而模拟一个

  • 大学生英语竞赛常考词汇汇总_全国大学生英语竞赛难吗

    大学生英语竞赛常考词汇汇总_全国大学生英语竞赛难吗abbreviation节略,缩写,缩短abidevi遵守,vt忍受absent不在意的abolishvt废除,取消abstracta理论上的,n抽象accessory同谋a附属的accord调和,符合,协议acknowledgevt承认,告知收到acquaintvt使认识,使了解adherevi黏附,追随,坚持adjoinvt贴近,毗连,靠近adjustablea.可调整的,可校准的administrationn局(或署、处等)

  • 搭建Eurake集群

    搭建Eurake集群eureka作为SpringCloud的服务发现与注册中心,在整个的微服务体系中,处于核心位置。单一的eureka服务,显然不能满足高可用的实际生产环境,这就要求我们配置一个能够应对各种突发情况,具有较强容灾能力的eureka集群服务。其实我们只需要在部署时候,对eureka的配置文件做相应的修改,运行即可。在项目中,创建三个名字分别为eureka01,eureka02,eureka03的eur…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号