MQTTnet 详解:在 VS2017 中通过 MQTTnet 创建 MQTT 服务端和客户端

2022-09-12 20:37:45 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

服务端:

using MQTTnet;

using MQTTnet.Server;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Net;

using System.ServiceProcess;

using System.Text;

using System.Threading.Tasks;

using System.Timers;

namespace MQTTNetService
{
    /// <summary>
    /// Windows service that hosts an MQTT broker (MQTTnet) on TCP port 1883.
    /// Every application message received by the broker is appended to
    /// <c>Msglog.txt</c>, and a 5-second timer appends newly seen client
    /// connections to <c>ClientLinklog.txt</c>. Both files live in the
    /// service's base directory.
    /// </summary>
    public partial class MQTTNetService : ServiceBase
    {
        // Serializes log-file appends: the message-received callback and the
        // timer callback do not run on a single thread, and two concurrent
        // appends to the same file would fail with a sharing violation.
        private readonly object logGate = new object();

        private MqttServer mqttServer = null;

        private System.Timers.Timer timer = null;

        // Client IDs that have already been written to the connection log, so
        // that each client is logged only once. The set is reset once it
        // holds 1000 entries.
        private HashSet<string> subClientIDs = new HashSet<string>();

        /// <summary>
        /// Initializes the service and prepares (but does not start) the timer
        /// that polls the broker for connected clients every 5 seconds.
        /// </summary>
        public MQTTNetService()
        {
            InitializeComponent();

            timer = new System.Timers.Timer();
            timer.AutoReset = true;
            timer.Interval = 5000;
            // The handler must be subscribed with +=; Elapsed is an event.
            timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);
            // The timer is started in OnStart, after the broker is running.
        }

        /// <summary>
        /// Starts the broker, then the client-polling timer.
        /// </summary>
        /// <param name="args">Service start arguments (unused).</param>
        protected override void OnStart(string[] args)
        {
            // Wait for the broker so that a start-up failure (for example the
            // port already being in use) is reported to the service control
            // manager instead of being lost in an async void method.
            CreateMQTTServer().GetAwaiter().GetResult();

            timer.Start();
        }

        /// <summary>
        /// Stops the timer and shuts the broker down.
        /// </summary>
        protected override void OnStop()
        {
            timer.Stop();

            var server = mqttServer;
            if (server != null)
            {
                // Clear the field first so a timer callback that is still in
                // flight does not pick up a broker that is being stopped.
                mqttServer = null;
                server.StopAsync().GetAwaiter().GetResult();
            }
        }

        /// <summary>
        /// Creates and starts the MQTT broker if it is not already running.
        /// </summary>
        /// <returns>A task that completes once the broker has started.</returns>
        private async Task CreateMQTTServer()
        {
            if (mqttServer != null)
            {
                return;
            }

            var optionsBuilder = new MqttServerOptionsBuilder();

            // Listening port.
            optionsBuilder.WithDefaultEndpointPort(1883);

            // NOTE(review): to bind to a specific address, the call presumably
            // belongs on the builder rather than on an options instance:
            //   optionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse("..."));
            // Confirm against the MQTTnet version in use.

            // Connection backlog can be tuned with
            //   optionsBuilder.WithConnectionBacklog(2000);

            var server = (MqttServer)new MqttFactory().CreateMqttServer();

            // Append every received application message to the message log.
            server.ApplicationMessageReceived += (s, e) =>
            {
                string msg = "发送消息的客户端id:" + e.ClientId + "\n"
                    + "发送时间:" + DateTime.Now + "\n"
                    + "发送消息的主题:" + e.ApplicationMessage.Topic + "\n"
                    + "发送的消息内容:" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + "\n"
                    + "————————————————–\n";

                WriteMsgLog(msg);
            };

            // Start with the options that were actually configured above.
            await server.StartAsync(optionsBuilder.Build());

            // Publish the broker to the timer callback only after it started.
            mqttServer = server;
        }

        #region Logging

        /// <summary>
        /// Appends a received-message entry to <c>Msglog.txt</c>.
        /// </summary>
        /// <param name="msg">Text of the log entry.</param>
        private void WriteMsgLog(string msg)
        {
            AppendLog("Msglog.txt", msg);
        }

        /// <summary>
        /// Appends a client-connection entry to <c>ClientLinklog.txt</c>.
        /// </summary>
        /// <param name="msg">Text of the log entry.</param>
        private void WriteClientLinkLog(string msg)
        {
            AppendLog("ClientLinklog.txt", msg);
        }

        /// <summary>
        /// Appends one timestamped line to a log file in the service's base
        /// directory, creating the file if it does not exist yet.
        /// </summary>
        /// <param name="fileName">File name relative to the base directory.</param>
        /// <param name="msg">Text written after the timestamp.</param>
        private void AppendLog(string fileName, string msg)
        {
            string path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, fileName);

            lock (logGate)
            {
                // append: true creates the file when it is missing.
                using (var writer = new StreamWriter(path, true))
                {
                    writer.WriteLine(DateTime.Now.ToString() + " " + msg);
                }
            }
        }

        /// <summary>
        /// Timer callback: writes one connection-log entry for every connected
        /// client that has not been logged since the last reset of the ID set.
        /// </summary>
        /// <param name="sender">The timer raising the event.</param>
        /// <param name="e">Elapsed event data (unused).</param>
        private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)
        {
            // Read the field once; OnStop may clear it concurrently.
            var server = mqttServer;
            if (server == null)
            {
                return;
            }

            var subclients = server.GetConnectedClientsAsync().GetAwaiter().GetResult();

            foreach (var item in subclients)
            {
                // Add returns false when the ID is already in the set.
                if (!subClientIDs.Add(item.ClientId))
                {
                    continue;
                }

                string msg = "接收客户端ID:" + item.ClientId + "\n"
                    + "接收时间:" + DateTime.Now + "\n"
                    + "协议版本:" + item.ProtocolVersion + "\n"
                    + "最后收到的非保持活包" + item.LastNonKeepAlivePacketReceived + "\n"
                    + "最后收到的包" + item.LastPacketReceived + "\n"
                    + "挂起的应用程序消息" + item.PendingApplicationMessages + "\n"
                    + "————————————————" + "\n";

                WriteClientLinkLog(msg);
            }

            // Keep the set bounded; after a reset, clients are logged again.
            if (subClientIDs.Count >= 1000)
            {
                subClientIDs.Clear();
            }
        }

        #endregion
    }
}

以上服务端没有对客户端的身份标识做任何校验:只要有客户端发起连接就会被接受,因此还不够完善。

客户端 1(接收端):.NET Core 控制台程序,仅用于简单测试。

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Text;

using System.Threading.Tasks;

namespace mqttclienttest0101
{
    /// <summary>
    /// Console test client: connects to the broker, subscribes to every topic
    /// ("#") and prints each received message. After the user presses Enter
    /// it publishes one retained test message and exits.
    /// </summary>
    class Program
    {
        static MqttClient mqttClient = null;

        /// <summary>
        /// Entry point: connect, wait for Enter, publish one test message.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // RunAsync handles its own exceptions, so this wait cannot throw
            // for connection problems; it only ensures the connection attempt
            // has finished before the prompt is shown.
            RunAsync().GetAwaiter().GetResult();

            Console.ReadLine();

            if (mqttClient != null)
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic("TopicTest")
                    .WithPayload("topictest001")
                    .WithExactlyOnceQoS()
                    .WithRetainFlag()
                    .Build();

                try
                {
                    // Wait for the publish: Main returns right afterwards and
                    // a fire-and-forget call could be cut off by process exit.
                    mqttClient.PublishAsync(message).GetAwaiter().GetResult();
                }
                catch (Exception exception)
                {
                    Console.WriteLine("### PUBLISHING FAILED ###" + Environment.NewLine + exception);
                }
            }

            Console.WriteLine("Hello World!");
        }

        /// <summary>
        /// Creates the MQTT client, wires up its event handlers and connects
        /// to the broker. Connection failures are written to the console.
        /// </summary>
        /// <returns>A task that completes after the first connection attempt.</returns>
        public static async Task RunAsync()
        {
            try
            {
                var options = new MqttClientOptions
                {
                    // Guid.NewGuid() yields a unique ID; new Guid() is always
                    // the all-zero GUID, so every instance would share one ID.
                    ClientId = Guid.NewGuid().ToString(),
                    CleanSession = true,
                    ChannelOptions = new MqttClientTcpOptions
                    {
                        Server = "127.0.1.1"
                    },
                };

                var factory = new MqttFactory();
                mqttClient = (MqttClient)factory.CreateMqttClient();

                mqttClient.ApplicationMessageReceived += (s, e) =>
                {
                    // A message may carry no payload at all.
                    var payload = e.ApplicationMessage.Payload ?? new byte[0];

                    Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                    Console.WriteLine($" Topic = {e.ApplicationMessage.Topic}");
                    Console.WriteLine($" Payload = {Encoding.UTF8.GetString(payload)}");
                    Console.WriteLine($" QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                    Console.WriteLine($" Retain = {e.ApplicationMessage.Retain}");
                    Console.WriteLine();
                };

                mqttClient.Connected += async (s, e) =>
                {
                    Console.WriteLine("### CONNECTED WITH SERVER ###");

                    // Subscribe to all topics.
                    await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());

                    Console.WriteLine("### SUBSCRIBED ###");
                };

                mqttClient.Disconnected += async (s, e) =>
                {
                    Console.WriteLine("### DISCONNECTED FROM SERVER ###");

                    // Back off before trying to reconnect.
                    await Task.Delay(TimeSpan.FromSeconds(5));

                    try
                    {
                        await mqttClient.ConnectAsync(options);
                    }
                    catch
                    {
                        Console.WriteLine("### RECONNECTING FAILED ###");
                    }
                };

                try
                {
                    await mqttClient.ConnectAsync(options);
                }
                catch (Exception exception)
                {
                    Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
                }

                Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");
            }
            catch (Exception exception)
            {
                Console.WriteLine(exception);
            }
        }
    }
}

客户端 2(发送端):WinForms 程序。

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Drawing;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Windows.Forms;

namespace MqttClientTest01
{
    /// <summary>
    /// WinForms test client: connects to an MQTT broker using the address and
    /// port entered on the form, publishes the text from the form as a
    /// retained message, and subscribes to a topic entered on the form.
    /// </summary>
    public partial class Form1 : Form
    {
        private MqttClient mqttClient = null;

        public Form1()
        {
            InitializeComponent();
        }

        // Intentionally empty event handler.
        private void Form1_Load(object sender, EventArgs e)
        {
        }

        /// <summary>
        /// "Connect" button: starts connecting to the broker.
        /// </summary>
        private void but_linkserver_Click(object sender, EventArgs e)
        {
            RunAsync();
        }

        /// <summary>
        /// Starts a connection attempt using the server address and port from
        /// the form. Returns immediately; the outcome is shown in
        /// <c>lab_linkstatus</c> once the attempt finishes.
        /// </summary>
        public void RunAsync()
        {
            // ConnectToServerAsync reports every failure on the form and never
            // faults, so the returned task does not need to be observed.
            ConnectToServerAsync();
        }

        /// <summary>
        /// Connects to the broker and updates the status labels with the
        /// actual result of the connection attempt.
        /// </summary>
        /// <returns>A task that completes when the attempt has finished.</returns>
        private async Task ConnectToServerAsync()
        {
            try
            {
                int port;
                if (!int.TryParse(txtb_serverport.Text.Trim(), out port) || port < 1 || port > 65535)
                {
                    lab_linkstatus.Text = "连接失败:端口号无效";
                    return;
                }

                var options = new MqttClientOptions
                {
                    ClientId = Guid.NewGuid().ToString(),
                    CleanSession = true,
                    ChannelOptions = new MqttClientTcpOptions
                    {
                        Server = txtb_serverip.Text.Trim(),
                        Port = port,
                        BufferSize = 20 * 400,
                    },
                };

                // Drop a connection left over from an earlier click so it is
                // not leaked when the field is replaced below.
                var previous = mqttClient;
                if (previous != null && previous.IsConnected)
                {
                    await previous.DisconnectAsync();
                }

                var client = (MqttClient)new MqttFactory().CreateMqttClient();

                // Await the connection: a non-null Task only means the attempt
                // has started, not that it succeeded.
                await client.ConnectAsync(options);

                mqttClient = client;
                lab_linkstatus.Text = "连接成功!";
                lab_linktimer.Text = DateTime.Now.ToString();
            }
            catch (Exception exception)
            {
                lab_linkstatus.Text = "连接失败:" + exception.Message;
            }
        }

        // Intentionally empty event handler.
        private void tb_username_TextChanged(object sender, EventArgs e)
        {
        }

        /// <summary>
        /// "Send" button: publishes the message text to the topic entered on
        /// the form, with QoS 2 and the retain flag set.
        /// </summary>
        private async void but_clientsend_Click(object sender, EventArgs e)
        {
            if (mqttClient == null)
            {
                return;
            }

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(txtb_msgtopic.Text.Trim())
                .WithPayload(rtb_pubmsg.Text.Trim())
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            try
            {
                await mqttClient.PublishAsync(message);
            }
            catch (Exception exception)
            {
                // async void handler: an exception escaping here would be
                // unhandled, so report it to the user instead.
                MessageBox.Show("发送失败:" + exception.Message);
            }
        }

        /// <summary>
        /// "Subscribe" button: subscribes to the topic entered on the form
        /// with QoS 0 (at most once).
        /// </summary>
        private async void but_submsg_Click(object sender, EventArgs k)
        {
            if (mqttClient == null)
            {
                return;
            }

            // NOTE(review): no ApplicationMessageReceived handler is attached
            // in this form, so received messages are not displayed anywhere.

            try
            {
                await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));
            }
            catch (Exception exception)
            {
                MessageBox.Show("订阅失败:" + exception.Message);
            }
        }
    }
}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152866.html 原文链接:https://javaforall.cn

0 人点赞