mqttnet消息推送与接收[通俗易懂]

2022-09-12 20:35:19 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

创建windows服务网上有很多,不多述;

服务端做好后一定要写bat安装卸载文件

install.bat

@echo.请稍等,MqttNetServiceAddUserAndPassword服务安装启动中………… @echo off @title 安装windows服务:MqttNetServiceAddUserAndPassword @sc create MqttNetServiceAddUserAndPassword binPath=”%~dp0MqttNetServiceAddUserAndPassword.exe” @sc config MqttNetServiceAddUserAndPassword start= auto @sc start MqttNetServiceAddUserAndPassword @echo.MqttNetServiceAddUserAndPassword启动完毕 pause

//binPath=”%~dp0MqttNetServiceAddUserAndPassword.exe” 当前路径,也可指定

delete.bat

@echo.服务MqttNetServiceAddUserAndPassword卸载中………. @echo off @sc stop MqttNetServiceAddUserAndPassword @sc delete MqttNetServiceAddUserAndPassword @echo off @echo.MqttNetServiceAddUserAndPassword卸载完毕 @pause

服务端:

using MQTTnet; using MQTTnet.Protocol; using MQTTnet.Server; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.ServiceProcess; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Timers;

namespace MqttNetServiceAddUserAndPassword { public partial class Service1 : ServiceBase { private readonly static object locker = new object(); private MqttServer mqttServer = null; private System.Timers.Timer timer = null;

private GodSharp.Sockets.SocketServer socketService = null;

//此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多2000个清零; private List<string> subClientIDs = new List<string>(); public Service1() { InitializeComponent(); //创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中 timer = new System.Timers.Timer(); timer.AutoReset = true; timer.Enabled = true; timer.Interval = 5000; timer.Elapsed = new ElapsedEventHandler(GetSubClientSAndSetShow);

}

protected override void OnStart(string[] args) { //开启服务 //CreateMQTTServer();

Task.Run(CreateMQTTServer);

if (timer.Enabled == false) { timer.Enabled = true; timer.Start(); } //创建socket服务端 //CreateServerSocket(); // SocketServer.StartSocketService(); }

protected override void OnStop() { if (timer.Enabled == true) { timer.Enabled = false; timer.Stop(); } } /// <summary> /// 开启服务 /// </summary> private async Task CreateMQTTServer() { if (mqttServer == null) { var optionsBuilder = new MqttServerOptionsBuilder(); optionsBuilder.WithConnectionValidator(c => { if (c.ClientId.Length < 5 || !c.ClientId.StartsWith(“Eohi_”)) { c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; return; }

if (c.Username != “user” || c.Password != “123456”) { c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; return; } c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted; }); //指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。 //options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(“”)) //指定端口 optionsBuilder.WithDefaultEndpointPort(1884); //连接记录数,默认 一般为2000 //optionsBuilder.WithConnectionBacklog(2000); mqttServer = new MqttFactory().CreateMqttServer() as MqttServer; string msg = null; //将发送的消息加到日志 mqttServer.ApplicationMessageReceived = (s, e) => { msg = @”发送消息的客户端id:” e.ClientId “rn” “发送时间:” DateTime.Now “rn” “发送消息的主题:” e.ApplicationMessage.Topic “rn” “发送的消息内容:” Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) “rn” “————————————————–rn” ; WriteMsgLog(msg); }; await mqttServer.StartAsync(optionsBuilder.Build());

} } #region 记录日志 /// <summary> /// 消息记录日志 /// </summary> /// <param name=”msg”></param> private void WriteMsgLog(string msg) {

//string path = @”C:log.txt”;

//该日志文件会存在windows服务程序目录下 string path = AppDomain.CurrentDomain.BaseDirectory “\Msglog.txt”; FileInfo file = new FileInfo(path); if (!file.Exists) { FileStream fs; fs = File.Create(path); fs.Close(); } using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write)) { using (StreamWriter sw = new StreamWriter(fs)) { sw.WriteLine(DateTime.Now.ToString() ” ” msg); } } } private void PubMessage(string topic, string msg) { if (mqttServer != null) { lock (locker) { var message = new MqttApplicationMessageBuilder(); message.WithTopic(topic); message.WithPayload(msg); mqttServer.PublishAsync(message.Build()); } } } /// <summary> ///客户端链接日志 客户端接入 /// </summary> /// <param name=”msg”></param> private void WriteClientLinkLog(string msg) { //string path = @”C:log.txt”;

//该日志文件会存在windows服务程序目录下 string path = AppDomain.CurrentDomain.BaseDirectory “\ClientLinklog.txt”; FileInfo file = new FileInfo(path); if (!file.Exists) { FileStream fs; fs = File.Create(path); fs.Close(); }

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write)) { using (StreamWriter sw = new StreamWriter(fs)) { sw.WriteLine(msg); } } } /// <summary> /// 通过定时器将客户端链接信息写入日志 /// </summary> /// <param name=”sender”></param> /// <param name=”e”></param> private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e) { // List<SetServiceM> dic = new List<SetServiceM>(); if (mqttServer != null) { List<ConnectedMqttClient> subclients = mqttServer.GetConnectedClientsAsync().Result.ToList(); if (subclients.Count > 0) { string subclientcount = @”客户端接入的总数为:” (subclients.Count – 1).ToString() “rn” “——————————————————- rn”; WriteClientLinkLog(subclientcount); PubMessage(“ClientsCount”, (subclients.Count – 1).ToString()); List<string> clientids = new List<string>(); //连接客户端的个数 // dic.Add(SetServiceM.SetService( “ClientCount”, subclients.Count.ToString())); // var dicclientlink = new Dictionary<string, string>();

foreach (var item in subclients) { if (!subClientIDs.Contains(item.ClientId)) { subClientIDs.Add(item.ClientId); string msg = @”连接客户端ID:” item.ClientId “rn” “连接时间:” DateTime.Now “rn” “协议版本:” item.ProtocolVersion “rn” “最后收到的非保持活包:” item.LastNonKeepAlivePacketReceived “rn” “最后收到的包:” item.LastPacketReceived “rn” “挂起的应用程序消息:” item.PendingApplicationMessages “rn” “————————————————” “rn”; WriteClientLinkLog(msg); PubMessage(“clientlink”, msg); // mqttServer.PublishAsync(“clientlink”, msg); // dicclientlink.Add(item.ClientId, msg); } clientids.Add(item.ClientId); } if (subClientIDs.Count >= 2000) { subClientIDs.Clear(); } var exceptlist = subClientIDs.Except(clientids).ToList(); // var dicclientoutline = new Dictionary<string, string>(); if (exceptlist.Count > 0) {

exceptlist.ForEach(u => { string msgoutline = @”客户端下线ID:” u “rn” “客户端下线时间:” DateTime.Now.ToString() “rn” “———————————————————— rn” ; WriteClientLinkLog(msgoutline); subClientIDs.Remove(u); PubMessage(“clientlink”, msgoutline); // mqttServer.PublishAsync(“clientlink”, msgoutline); // dicclientoutline.Add(“OutLineID_” u, msgoutline); }); } 连接客户端的id //dic.Add(SetServiceM.SetService(“clientlink”, JsonConvert.SerializeObject(dicclientlink))); 客户端下线的时间 //dic.Add(SetServiceM.SetService(“clientoutline”, JsonConvert.SerializeObject(dicclientoutline))); //SocketServer.connection.Send(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dic))); } else { string subclientcount = @”暂无客户端接入!” “rn” “——————————————————– rn”; WriteClientLinkLog(subclientcount); } } } /// <summary> /// 客户端下线时间 /// </summary> /// <param name=”msg”></param> public void WriteClientOutLineLog(string msg) { string path = AppDomain.CurrentDomain.BaseDirectory “\ClientOutLineLog.txt”; FileInfo file = new FileInfo(path); if (!file.Exists) { FileStream fs = File.Create(path); fs.Close(); } using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write)) { using (StreamWriter sw = new StreamWriter(fs)) { sw.WriteLine(msg); } } } //windows服务里的服务端 private void CreateServerSocket() { if (socketService == null) { // IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(“127.0.0.1”), 9001); socketService = new GodSharp.Sockets.SocketServer(“127.0.0.1”, 9001, ProtocolType.Tcp); //Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); socketService.Start(); socketService.Listen(10); Thread thread = new Thread(new ThreadStart(new Action(() => { while (true) { // socketClient = socketService.Clients[0]; // string data = “sql|” ; //在这里封装数据,通常是自己定义一种数据结构,如struct data{sql;result} // client.Send(Encoding.Default.GetBytes(msg)); } }))); } else { CreateServerSocket(); } }

#endregion }

}

服务端桌面显示程序:

using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Diagnostics; using System.Drawing; using System.Linq; using System.Net; using System.Net.NetworkInformation; using System.Net.Sockets; using System.ServiceProcess; using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms;

namespace MQTTNetFrm { public partial class Form1 : Form { private ServiceController ServiceController = null; private MqttClientOptions options = null; private MqttClient mqttClient = null;

public Form1() { InitializeComponent();

new Thread(new ThreadStart(GetServiceStatus)).Start();

Task.Run(LinkClientService).Wait(); } /// <summary> /// 获取当前ip地址 /// </summary> /// <returns></returns> private string GetLocalIP() { string ip = null; var iplist = Dns.GetHostAddresses(Dns.GetHostName()).DefaultIfEmpty().ToList(); iplist.ForEach(u => { if (u.AddressFamily == AddressFamily.InterNetwork) ip= u.ToString(); }); return ip; } private async Task LinkClientService() { var m = “Eohi_Frm_” Guid.NewGuid().ToString(); options = new MqttClientOptions { ClientId = m, CleanSession = true, ChannelOptions = new MqttClientTcpOptions { Server = GetLocalIP(), Port = 1884, }, Credentials = new MqttClientCredentials() { Username = “user”, Password = “123456” }

}; var factory = new MqttFactory(); mqttClient = factory.CreateMqttClient() as MqttClient; try { await mqttClient.ConnectAsync(options); but_submsg_Click(); this.Invoke(new Action(() => { lab_serverstatus.Text = “连接正常,服务运行中…………”; })); } catch (Exception ex) {

}

} private async void but_submsg_Click() { if (mqttClient != null) { await mqttClient.SubscribeAsync(new TopicFilter(“ClientsCount”, MqttQualityOfServiceLevel.AtMostOnce)); await mqttClient.SubscribeAsync(new TopicFilter(“clientlink”, MqttQualityOfServiceLevel.AtMostOnce)); await mqttClient.SubscribeAsync(new TopicFilter(“msglog”, MqttQualityOfServiceLevel.AtMostOnce)); mqttClient.ApplicationMessageReceived = (s, e) => { this.Invoke(new Action(() => { var msg = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); if (msg.Length<=5) { lab_clientcount.Text = msg; } if (msg.Length>10) { if (msg.StartsWith(“连接”) ) rtb_clientlog.AppendText(msg); rtb_msglog.AppendText(msg); }

})); };

} } private void GetServiceStatus() { ServiceController[] serviceControllers = ServiceController.GetServices(); if (serviceControllers.Length > 0) { serviceControllers.ToList().ForEach(u => { if (u.DisplayName == “MqttNetServiceAddUserAndPassword”) { if (ServiceController == null) { ServiceController = u; } if (u.Status == ServiceControllerStatus.Running) { lab_serverstatus.Text = “服务运行中…………”; } else { lab_serverstatus.Text = “服务已停止…………”; } } }); } } private void button2_Click(object sender, EventArgs e) { if (tabControl1.SelectedTab == tabPage1) { rtb_clientlog.Text = “”; } else { rtb_msglog.Text = “”; } }

private void Form1_Load(object sender, EventArgs e) {

} }

}

客户端:

using MQTTnet; using MQTTnet.Client; using MQTTnet.Protocol; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Timers; using System.Windows.Forms;

namespace MqttClientTest01 {

public partial class Form1 : Form { private MqttClient mqttClient = null; private System.Timers.Timer timer = null; private int CountLink = 0; private MqttClientOptions options = null; public Form1() { InitializeComponent(); 创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中 //timer = new System.Timers.Timer(); //timer.AutoReset = true; //timer.Interval = 1000; //timer.Elapsed = new ElapsedEventHandler(LinkMqttNetService); }

private void LinkMqttNetService(object sender, ElapsedEventArgs e) { if (mqttClient == null) { // RunAsync(); CountLink ; } if (CountLink >= 5) { MessageBox.Show(“连接多次失败,请确认各参数是否正确!”); CountLink = 0; timer.Enabled = false; } } private void but_linkserver_Click(object sender, EventArgs k) { LinkClientService(); //CountLink = 0; //timer.Enabled = true; //timer.Start(); } /// <summary> /// 链接客户端 /// </summary> public async void LinkClientService() { var m = “Eohi_” Guid.NewGuid().ToString(); options = new MqttClientOptions { ClientId = m, CleanSession = true, ChannelOptions = new MqttClientTcpOptions { Server = txtb_serverip.Text.Trim(), Port = Convert.ToInt32(txtb_serverport.Text.Trim()), }, Credentials = new MqttClientCredentials() { Username = tb_username.Text, Password = tb_userpwd.Text }

}; var factory = new MqttFactory(); mqttClient = factory.CreateMqttClient() as MqttClient; try { await mqttClient.ConnectAsync(options); this.Invoke(new Action(() => { lab_linkstatus.Text = “连接成功!”; lab_linktimer.Text = DateTime.Now.ToString(); })); mqttClient.Disconnected = async (s, e) => { if (e.ClientWasConnected==false) { try { await mqttClient.ConnectAsync(options); this.Invoke(new Action(() => { lab_linkstatus.Text = “连接成功!”; lab_linktimer.Text = DateTime.Now.ToString(); })); } catch (Exception ex) { lab_linkstatus.Text = “连接失败!” ex.Message; lab_linktimer.Text = DateTime.Now.ToString(); }

} }; } catch (Exception ex) { lab_linkstatus.Text = “连接失败!请检查ip/端口” ; lab_linktimer.Text = DateTime.Now.ToString(); } }

private void tb_username_TextChanged(object sender, EventArgs e) {

}

private void but_clientsend_Click(object sender, EventArgs e) { if (mqttClient != null) { var message = new MqttApplicationMessageBuilder(); message.WithTopic(txtb_msgtopic.Text.Trim()); message.WithPayload(rtb_pubmsg.Text.Trim()); message.WithExactlyOnceQoS(); message.WithRetainFlag(); mqttClient.PublishAsync(message.Build()); } } private async void but_submsg_Click(object sender, EventArgs k) { if (mqttClient != null) { await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce)); mqttClient.ApplicationMessageReceived = (s, e) => { this.Invoke(new Action(() => { rtb_submsgclient.AppendText(“ClientID=” e.ClientId “n”); rtb_submsgclient.AppendText(” QoS = {e.ApplicationMessage.QualityOfServiceLevel}” “n”); rtb_submsgclient.AppendText(” Retain = {e.ApplicationMessage.Retain}” “n”);

}));

};

} }

private void button1_Click(object sender, EventArgs e) { rtb_submsgclient.Text = “”; } } }

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152869.html原文链接:https://javaforall.cn

0 人点赞