大家好,又见面了,我是你们的朋友全栈君。
服务端:
代码语言:javascript复制 class Program
{
private static MqttServer mqttServer = null;
static void Main(string[] args)
{
MqttNetTrace.TraceMessagePublished = MqttNetTrace_TraceMessagePublished;
new Thread(StartMqttServer).Start();
while (true)
{
var inputString = Console.ReadLine().ToLower().Trim();
if (inputString == "exit")
{
mqttServer?.StopAsync();
Console.WriteLine("MQTT服务已停止!");
break;
}
else if (inputString == "clients")
{
foreach (var item in mqttServer.GetConnectedClients())
{
Console.WriteLine($"客户端标识:{item.ClientId},协议版本:{item.ProtocolVersion}");
}
}
else
{
Console.WriteLine($"命令[{inputString}]无效!");
}
}
}
private static void StartMqttServer()
{
if (mqttServer == null)
{
try
{
var options = new MqttServerOptions
{
ConnectionValidator = p =>
{
if (p.ClientId == "c001")
{
if (p.Username != "u001" || p.Password != "p001")
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}
}
return MqttConnectReturnCode.ConnectionAccepted;
}
};
mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer;
mqttServer.ApplicationMessageReceived = MqttServer_ApplicationMessageReceived;
mqttServer.ClientConnected = MqttServer_ClientConnected;
mqttServer.ClientDisconnected = MqttServer_ClientDisconnected;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return;
}
}
mqttServer.StartAsync();
Console.WriteLine("MQTT服务启动成功!");
}
private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
{
Console.WriteLine($"客户端[{e.Client.ClientId}]已连接,协议版本:{e.Client.ProtocolVersion}");
}
private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
{
Console.WriteLine($"客户端[{e.Client.ClientId}]已断开连接!");
}
private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} 负荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");
}
private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
{
/*Console.WriteLine($">> 线程ID:{e.ThreadId} 来源:{e.Source} 跟踪级别:{e.Level} 消息: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
}*/
}
}
客户端
代码语言:javascript复制 class Program
{
private static MqttClient mqttClient = null;
static string topic = "测试1/测试2/测试3";
static void Main(string[] args)
{
mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
mqttClient.ApplicationMessageReceived = MqttClient_ApplicationMessageReceived;
mqttClient.Connected = MqttClient_Connected;
mqttClient.Disconnected = MqttClient_Disconnected;
var options = new MqttClientTcpOptions
{
Server = "127.0.0.1",
ClientId = Guid.NewGuid().ToString().Substring(0, 5),
UserName = "u001",
Password = "p001",
CleanSession = true
};
mqttClient.ConnectAsync(options);
Console.ReadLine();
Console.WriteLine("Hello World!");
}
private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
Console.WriteLine($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
}
private static void MqttClient_Connected(object sender, EventArgs e)
{
Console.WriteLine("已连接到MQTT服务器!" Environment.NewLine);
mqttClient.SubscribeAsync(new List<TopicFilter> {
new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
});
Console.WriteLine($"已订阅[{topic}]主题" Environment.NewLine);
//开始发布消息
string inputString = "你好么";
var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
mqttClient.PublishAsync(appMsg);
}
private static void MqttClient_Disconnected(object sender, EventArgs e)
{
Console.WriteLine("已断开MQTT连接!" Environment.NewLine);
}
}
源码下载地址:MQTT
MQTT Windows 端验证程序: http://mqttfx.jensd.de/index.php/download
https://download.csdn.net/download/kesshei/10906614
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152900.html原文链接:https://javaforall.cn