大家好,又见面了,我是你们的朋友全栈君。
一、什么是 MQTT ?
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。MQTT 是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为 OASIS 规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的 IoT 场景。 github上还发现了一个项目,可以直接看协议:MQTT协议中文版
二、MQTTnet
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MQTT 开源库还有 MqttDotNet、nMQTT、M2MQTT 等,不过这几个里面,目前star最多的还是MQTTnet(编辑时间2018.5.9)。
三、创建项目并导入类库
在解决方案在右键单击-选择“管理解决方案的 NuGet 程序包”-在“浏览”选项卡下面搜索 MQTTnet,为服务端项目和客户端项目都安装上 MQTTnet 库。示例中使用的是2.7.5.0版本,不同版本最低要求的.net版本或其它支持,在NuGet选中MQTTNet,右侧可以看到具体描述。
客户端简单Demo可以见官方文档:https://github.com/chkr1011/MQTTnet/wiki/Client 本文示例程序github:https://github.com/landbroken/MQTTLearning
评论有不知道怎么用示例程序的,简单解释一下。目录一开始建的有点问题,调试稍微麻烦一点,懒得改了。
- 1、clone或download项目
- 2、vs2015或以上版本打开MQTTLearning/MqttServerTest/MqttServerTest.sln
- 3、服务器端示例在SeverTest项目里面
- 4、客户端示例在ClientTest项目里面 调试: 方法1)是vs里两个项目设为同时启动; 方法2)一端用生成的exe启动,一端在vs里用debug启动
一般可以直接打开的,万一vs有路径依赖问题:
- 1、如果项目路径依赖有问题,删掉重新添加一遍SeverTest.csproj和ClientTest.csproj
- 2、如果MQTTnet 库引用路径有问题,删掉从packages里面重新引用一遍,或者nuget里面添加一下
3.1 服务器
直接跑一下示例程序就知道了 下面简单解释一下主要部分。
3.1.1 初始化并启动服务器
首先,初始化并启动服务器
1、这里是异步启动,用的2.7.5版本的库
代码语言:javascript复制Task.Run(async () => {
await StartMqttServer_2_7_5(); });
2、配置设置 WithDefaultEndpointPort是设置使用的端口,协议里默认是用1883,不过调试我改成8222了。 WithConnectionValidator是用于连接验证,验证client id,用户名,密码什么的。示例没用数据库,随便写死了两个值。 还有其他配置选项,比如加密协议,可以在官方文档里看看,示例就是先简单能用。
代码语言:javascript复制// Configure MQTT server.
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpointPort(8222)
.WithConnectionValidator(ValidatingMqttClients())
;
3、添加事件触发 ApplicationMessageReceived 是服务器接收到消息时触发的事件,可用来响应特定消息。 ClientConnected 是客户端连接成功时触发的事件。 ClientDisconnected 是客户端断开连接时触发的事件。
代码语言:javascript复制mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.ApplicationMessageReceived = MqttServer_ApplicationMessageReceived;
mqttServer.ClientConnected = MqttServer_ClientConnected;
mqttServer.ClientDisconnected = MqttServer_ClientDisconnected;
异步启动mqtt服务器,实际项目可能要加一个启动失败处理
代码语言:javascript复制Task.Run(async () => {
await mqttServer.StartAsync(optionsBuilder.Build()); });
示例里面为了简单调试玩的,写了几个触发命令,在启动后,输入对于字符串即可触发。
字符串 | 效果 |
---|---|
exit | 关闭mqtt服务 |
hello: | 发送topic为topic/hello的消息,payload为冒号后面的数据 |
control: | 发送topic为topic/control的消息,payload为冒号后面的数据 |
subscribe: | 订阅topic为冒号后面的消息 |
3.1.2 消息发送
mqtt的消息包含topic和payload两部分。topic就是消息主题(类型),用于另外一端判断这个消息是干什么用的。payload就是实际想要发送的数据。 WithTopic给一个topic。 WithPayload给一个msg。 WithAtMostOnceQoS设置QoS,至多1次。也可以设为别的。 PublishAsync异步发送出去。
代码语言:javascript复制string topic = "topic/hello";
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(msg)
.WithAtMostOnceQoS()
.WithRetainFlag()
.Build();
await mqttServer.PublishAsync(message);
3.1.3 完整源码
代码语言:javascript复制using MQTTnet;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MqttServerTest
{
class Program
{
private static IMqttServer mqttServer = null;
private static List<string> connectedClientId = new List<string>();
static void Main(string[] args)
{
Task.Run(async () => {
await StartMqttServer_2_7_5(); });
// Write all trace messages to the console window.
MqttNetGlobalLogger.LogMessagePublished = MqttNetTrace_TraceMessagePublished;
//2.4.0版本
//MqttNetTrace.TraceMessagePublished = MqttNetTrace_TraceMessagePublished;
//new Thread(StartMqttServer).Start();
while (true)
{
if (mqttServer==null)
{
Console.WriteLine("Please await mqttServer.StartAsync()");
Thread.Sleep(1000);
continue;
}
var inputString = Console.ReadLine().ToLower().Trim();
if (inputString == "exit")
{
Task.Run(async () => {
await EndMqttServer_2_7_5(); });
Console.WriteLine("MQTT服务已停止!");
break;
}
else if (inputString == "clients")
{
var connectedClients = mqttServer.GetConnectedClientsAsync();
Console.WriteLine($"客户端标识:");
//2.4.0
//foreach (var item in mqttServer.GetConnectedClients())
//{
// Console.WriteLine($"客户端标识:{item.ClientId},协议版本:{item.ProtocolVersion}");
//}
}
else if (inputString.StartsWith("hello:"))
{
string msg = inputString.Substring(6);
Topic_Hello(msg);
}
else if (inputString.StartsWith("control:"))
{
string msg = inputString.Substring(8);
Topic_Host_Control(msg);
}
else if (inputString.StartsWith("subscribe:"))
{
string msg = inputString.Substring(10);
Subscribe(msg);
}
else
{
Console.WriteLine($"命令[{inputString}]无效!");
}
Thread.Sleep(100);
}
}
private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
{
Console.WriteLine($"客户端[{e.Client.ClientId}]已连接,协议版本:{e.Client.ProtocolVersion}");
connectedClientId.Add(e.Client.ClientId);
}
private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
Console.WriteLine($"客户端[{e.Client.ClientId}]已断开连接!");
connectedClientId.Remove(e.Client.ClientId);
}
private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"客户端[{e.ClientId}]>>");
Console.WriteLine($" Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($" Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
Console.WriteLine($" QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($" Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine();
}
private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
{
var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
if (e.TraceMessage.Exception != null)
{
trace = Environment.NewLine e.TraceMessage.Exception.ToString();
}
Console.WriteLine(trace);
}
#region 2.7.5
private static async Task StartMqttServer_2_7_5()
{
if (mqttServer == null)
{
// Configure MQTT server.
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpointPort(8222)
.WithConnectionValidator(ValidatingMqttClients())
;
// Start a MQTT server.
mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.ApplicationMessageReceived = MqttServer_ApplicationMessageReceived;
mqttServer.ClientConnected = MqttServer_ClientConnected;
mqttServer.ClientDisconnected = MqttServer_ClientDisconnected;
Task.Run(async () => {
await mqttServer.StartAsync(optionsBuilder.Build()); });
//mqttServer.StartAsync(optionsBuilder.Build());
Console.WriteLine("MQTT服务启动成功!");
}
}
private static async Task EndMqttServer_2_7_5()
{
if (mqttServer!=null)
{
await mqttServer.StopAsync();
}
else
{
Console.WriteLine("mqttserver=null");
}
}
private static Action<MqttConnectionValidatorContext> ValidatingMqttClients()
{
// Setup client validator.
var options =new MqttServerOptions();
options.ConnectionValidator = c =>
{
Dictionary<string, string> c_u = new Dictionary<string, string>();
c_u.Add("client001", "username001");
c_u.Add("client002", "username002");
Dictionary<string, string> u_psw = new Dictionary<string, string>();
u_psw.Add("username001", "psw001");
u_psw.Add("username002", "psw002");
if (c_u.ContainsKey(c.ClientId) && c_u[c.ClientId] == c.Username)
{
if (u_psw.ContainsKey(c.Username) && u_psw[c.Username] == c.Password)
{
c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
}
else
{
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}
else
{
c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}
};
return options.ConnectionValidator;
}
private static void Usingcertificate(ref MqttServerOptions options)
{
var certificate = new X509Certificate(@"C:certstesttest.cer", "");
options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
var aes = new System.Security.Cryptography.AesManaged();
}
#endregion
#region Topic
private static async void Topic_Hello(string msg)
{
string topic = "topic/hello";
//2.4.0版本的
//var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
//mqttClient.PublishAsync(appMsg);
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(msg)
.WithAtMostOnceQoS()
.WithRetainFlag()
.Build();
await mqttServer.PublishAsync(message);
}
private static async void Topic_Host_Control(string msg)
{
string topic = "topic/host/control";
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(msg)
.WithAtMostOnceQoS()
.WithRetainFlag(false)
.Build();
await mqttServer.PublishAsync(message);
}
/// <summary>
/// 替指定的clientID订阅指定的内容
/// </summary>
/// <param name="topic"></param>
private static void Subscribe(string topic)
{
List<TopicFilter> topicFilter = new List<TopicFilter>();
topicFilter.Add(new TopicFilterBuilder()
.WithTopic(topic)
.WithAtMostOnceQoS()
.Build());
//给"client001"订阅了主题为topicFilter的payload
mqttServer.SubscribeAsync("client001", topicFilter);
Console.WriteLine($"Subscribe:[{"client001"}],Topic:{topic}");
}
#endregion
}
}
3.2 客户端
未完待续
参考文献
1、《使用 MQTTnet 快速实现 MQTT 通信》:链接 这篇文章是vs2017 .net core mqttnet2.4.0的,目前库已经更新了好几个版本,如果用最新版的不能直接运行文章里的程序。 2、MQTT官网 3、开源库地址:MQTTnet 4、开源库对应文档:https://github.com/chkr1011/MQTTnet/wiki/Client
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152873.html原文链接:https://javaforall.cn