Emqx简介
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由: 1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。 2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。 3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。 4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
官方文档:https://docs.emqx.cn/broker/v4.3/getting-started/install.html
MQTT是什么?
MQTT全称消息队列遥测传输 (Message Queuing Telemetry Transport)。其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。
MQTT实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分: (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload); (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
Emqx安装
官方网站:https://www.emqx.cn/
安装步骤
- 下载地址:https://www.emqx.cn/downloads#broker
- 解压程序包
- 启动 EMQ X Broker 进入到emqx解压后目录,进入bin目录,执行其下的命令脚本 #启动emqx emqx start #查看emqx状态 emqx status #停止 EMQ X Broker emqx stop
- 卸载 EMQ X Broker 直接删除 EMQ X 目录即可
Emqx Dashboard插件
Emqx自带dashboard插件:通过Dashboard,你可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。 除此之外,EMQ X Dashboard 还提供了规则引擎的可视化操作界面,同时集成了一个简易的 MQTT 客户端工具供用户测试使用。 当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是admin,密码是 public。
MQTT 设计了的3 QoS 等级
QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。 QoS 1:消息传递至少 1 次。 QoS 2:消息仅传送一次。
需要开放的端口
Emqx使用
java使用mqtt
使用步骤如下
- 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
- 订阅者(App.java)
package cn.kt.mtqqdemo.mqtt;
/**
* Created by tao.
* Date: 2021/4/12 13:57
* 描述:
*/
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class App {
public static void main(String[] args) {
try {
//apollo地址
String HOST = "tcp://127.0.0.1:1883";
//要订阅的主题
String TOPIC1 = "ceshi";
//指你Apollo中的用户名密码
String userName = "admin";
String pwd = "123456";
String clientid = UUID.randomUUID().toString().replace("-", "");
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接对象
MqttConnectOptions options = new MqttConnectOptions();
//设置连接参数
//清除session回话
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
//超时设置
options.setConnectionTimeout(10);
//心跳保持时间
options.setKeepAliveInterval(20);
//遗嘱:当该客户端端口连接时,会向whb主题发布一条信息
options.setWill("nick", "我挂了,你加油".getBytes(), 1, true);
//监听对象:自己创建
client.setCallback(new PushCallback());
//打开连接
client.connect(options);
//设置消息级别
int[] Qos = {1};
//订阅主题
String[] topics = {TOPIC1};
client.subscribe(topics, Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
- 发布者(sendOut.java)
package cn.kt.mtqqdemo.mqtt;
/**
* Created by tao.
* Date: 2021/4/12 14:09
* 描述:
*/
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class SendOut {
//tcp://MQTT安装的服务器地址:MQTT定义的端口号
String HOST = "tcp://127.0.0.1:1883";
//定义一个主题
public static final String TOPIC = "ceshi";
// public static final String TOPIC = "abc";
//定义MQTT的ID,可以在MQTT服务配置中指定
private static final String clientid = "server1";
private MqttMessage message;
public static final String TOPIC1 = "topic1";
public static final String userName = "admin";
public static final String pwd = "123456";
public MqttClient client;
private MqttTopic topic;
public SendOut() {
try {
client = new MqttClient(HOST, clientid, new MemoryPersistence());
connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
//发布消息
public void publish(MqttTopic topic, MqttMessage message) throws MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
//打印发送状态
System.out.println("message is published completely!" token.isComplete());
}
//建立连接:参数与订阅端相似
private void connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
client.setCallback(new PushCallback());
client.connect(options);
}
public static void main(String[] args) throws MqttException, UnsupportedEncodingException {
SendOut service = new SendOut();
Scanner sc = new Scanner(System.in);
service.topic = service.client.getTopic(TOPIC);
service.message = new MqttMessage();
//确保被收到一次
service.message.setQos(1);
service.message.setPayload("干嘛这么想不开,要在脸上贴个输字".getBytes("UTF-8"));
service.publish(service.topic, service.message);
}
}
- 订阅消息回调(OnMessageCallback.java)
package cn.kt.mtqqdemo.mqtt;
/**
* Created by tao.
* Date: 2021/4/12 13:58
* 描述:
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" topic);
System.out.println("接收消息Qos:" message.getQos());
System.out.println("接收消息内容:" new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" token.isComplete());
}
}
- 发布消息回调(PushCallback.java)
package cn.kt.mtqqdemo.mqtt;
/**
* Created by tao.
* Date: 2021/4/12 14:01
* 描述:
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {
//连接丢失:一般用与重连
public void connectionLost(Throwable throwable) {
System.out.println("丢失连接");
}
//消息到达:指收到消息
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " topic);
System.out.println("接收消息Qos : " message.getQos());
System.out.println("接收消息内容 : " new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
//(发布)publish后会执行到这里,发送状态
System.out.println("deliveryComplete---------"
token.isComplete());
}
}
测试效果
- 发布者
- 订阅者
js使用mqtt
引入mqttws31.js
可以下载: 链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw 提取码:siwg
也可以用对应的cdn 地址
代码语言:javascript复制<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
代码如下
代码语言:javascript复制<!DOCTYPE html >
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http - equiv="X-UA-Compatible" content="ie=edge">
<title> Document </title>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="nofollow noopener" rel="stylesheet">
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
<script src="./js/mqttws31.js" type="text/javascript"></script>
<style>
#contentList li {
word-break: break-all;
word-wrap: break-word;
}
</style>
</head>
<body>
<div style="width: 900px;margin: 50px auto;">
<div class="form-group">
<label>评论人:</label>
<input type="text" class="form-control" id="user">
</div>
<div class="form-group">
<label>评论内容:</label>
<textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
</div>
<div class="form-group">
<input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
</div>
<div>
<ul id="contentList" class="list-group">
<!-- <li class="list-group-item">
<span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
</li> -->
</ul>
</div>
</div>
<script>
// http://192.168.3.181/
var hostname = '192.168.3.181',
port = 8083,
clientId = 'client-' generateUUID(),
timeout = 1000,
keepAlive = 2000,
cleanSession = false,
ssl = false,
userName = 'Nick',
password = '12356',
topic = 'ceshi';
client = new Paho.MQTT.Client(hostname, port, clientId);
//建立客户端实例
var options = {
invocationContext: {
host: hostname,
port: port,
path: client.path,
clientId: clientId
},
timeout: timeout,
keepAliveInterval: keepAlive,
cleanSession: cleanSession,
useSSL: ssl,
userName: userName,
password: password,
onSuccess: onConnect,
onFailure: function(e) {
console.log(e);
}
};
client.connect(options);
//连接服务器并注册连接成功处理事件
function onConnect() {
console.log("onConnected");
client.subscribe(topic);
}
client.onConnectionLost = onConnectionLost;
//注册连接断开处理事件
client.onMessageArrived = onMessageArrived;
//注册消息接收处理事件
function onConnectionLost(responseObject) {
console.log(responseObject);
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" responseObject.errorMessage);
console.log("连接已断开");
}
}
//收到消息时处理事件
function onMessageArrived(message) {
var msg = message.payloadString;
var obj = JSON.parse(msg);
console.log("收到消息:" obj);
/*
<li class="list-group-item">
<span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
</li>
*/
$('#contentList').append($(`<li class="list-group-item" > <span class="badge">评论人:` obj.name `,时间:` obj.time `</span>` obj.content `</li>`));
}
//点击发送按钮事件
function send() {
var name = document.getElementById("user").value;
var content = document.getElementById("content").value;
console.log('name :>> ', name);
console.log('content :>> ', content);
var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
var getConment = {
name: name,
content: content,
time: time,
}
if (name) {
var str = getConment;
message = new Paho.MQTT.Message(JSON.stringify(str));
message.destinationName = topic;
client.send(message);
document.getElementById("content").value = "";
document.getElementById("user").value = "";
}
}
//生成UUID
function generateUUID() {
var d = new Date().getTime();
if (window.performance && typeof window.performance.now === "function") {
d = performance.now(); //use high-precision timer if available
}
var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = (d Math.random() * 16) % 16 | 0;
d = Math.floor(d / 16);
return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
});
return uuid;
}
//date时间格式化
Date.prototype.Format = function(fmt) {
var o = {
"M ": this.getMonth() 1, //月份
"d ": this.getDate(), //日
"h ": this.getHours(), //小时
"m ": this.getMinutes(), //分
"s ": this.getSeconds(), //秒
"q ": Math.floor((this.getMonth() 3) / 3), //季度
"S": this.getMilliseconds() //毫秒
};
if (/(y )/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() "").substr(4 - RegExp.$1.length));
for (var k in o)
if (new RegExp("(" k ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" o[k]).substr(("" o[k]).length)));
return fmt;
}
</script>
</body>
</html>
测试效果
页面效果
java 连接mqtt订阅者收到消息