MQTT基本使用查看上一篇文章:消息通讯——MQTT的入门和使用
springboot集成MQTT设计
springboot集成MQTT步骤
1. 引入pom依赖
代码语言:javascript复制 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--Spring boot Web容器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--FreeMarker模板视图依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
</dependency>
2. application.properties
代码语言:javascript复制# 应用服务 WEB 访问端口
server.port=8080
# 配置静态资源路径
spring.resources.static-locations=classpath:/resources/,classpath:/static/,classpath:/templates/
### FreeMarker 配置
spring.freemarker.allow-request-override=false
#Enable template caching.启用模板缓存。
spring.freemarker.cache=false
spring.freemarker.check-template-location=true
spring.freemarker.charset=UTF-8
spring.freemarker.content-type=text/html
spring.freemarker.expose-request-attributes=false
spring.freemarker.expose-session-attributes=false
spring.freemarker.expose-spring-macro-helpers=false
#设置面板后缀
spring.freemarker.suffix=.ftl
## MQTT##
mqtt.host=tcp://127.0.0.1:1883
mqtt.clientId=mqttClient
mqtt.username=admin
mqtt.password=123456
mqtt.timeout=1000
mqtt.keepalive=2000
mqtt.topic1=ceshi
3. MqttConfiguration.java
代码语言:javascript复制package cn.kt.mqttdemo2.config;
import cn.kt.mqttdemo2.mqtt.MyMQTTClient;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by tao.
* Date: 2021/4/12 14:47
* 描述:
*/
@Configuration
@Slf4j
public class MqttConfiguration {
@Value("${mqtt.host}")
String host;
@Value("${mqtt.username}")
String username;
@Value("${mqtt.password}")
String password;
@Value("${mqtt.clientId}")
String clientId;
@Value("${mqtt.timeout}")
int timeOut;
@Value("${mqtt.keepalive}")
int keepAlive;
@Value("${mqtt.topic1}")
String topic1;
@Bean//注入spring
public MyMQTTClient myMQTTClient() {
MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);
for (int i = 0; i < 10; i ) {
try {
myMQTTClient.connect();
myMQTTClient.subscribe(topic1, 1);
return myMQTTClient;
} catch (MqttException e) {
log.error("MQTT connect exception,connect time = " i);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
return myMQTTClient;
}
}
4. MyMQTTClient.java
代码语言:javascript复制package cn.kt.mqttdemo2.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by tao.
* Date: 2021/4/12 14:46
* 描述:
*/
public class MyMQTTClient {
private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTClient.class);
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MyMQTTClient.client = client;
}
private String host;
private String username;
private String password;
private String clientId;
private int timeout;
private int keepalive;
public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
this.host = host;
this.username = username;
this.password = password;
this.clientId = clientId;
this.timeout = timeOut;
this.keepalive = keepAlive;
}
/**
* 设置mqtt连接参数
*
* @param username
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setCleanSession(false);
return options;
}
/**
* 连接mqtt服务端,得到MqttClient连接对象
*/
public void connect() throws MqttException {
if (client == null) {
client = new MqttClient(host, clientId, new MemoryPersistence());
client.setCallback(new MyMQTTCallback(MyMQTTClient.this));
}
MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
if (!client.isConnected()) {
client.connect(mqttConnectOptions);
} else {
client.disconnect();
client.connect(mqttConnectOptions);
}
LOGGER.info("MQTT connect success");//未发生异常,则连接成功
}
/**
* 发布,默认qos为0,非持久化
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic) {
publish(pushMessage, topic, 0, false);
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic, int qos, boolean retained) {
MqttMessage message = new MqttMessage();
message.setPayload(pushMessage.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);
if (null == mqttTopic) {
LOGGER.error("topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
try {
token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
token.waitForCompletion(1000L);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 订阅某个主题,qos默认为0
*
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 0);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
MyMQTTClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消订阅主题
*
* @param topic 主题名称
*/
public void cleanTopic(String topic) {
if (client != null && client.isConnected()) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
} else {
System.out.println("取消订阅失败!");
}
}
}
5. MyMQTTCallback.java
代码语言:javascript复制package cn.kt.mqttdemo2.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by tao.
* Date: 2021/4/12 14:46
* 描述:
*/
public class MyMQTTCallback implements MqttCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTCallback.class);
private MyMQTTClient myMQTTClient;
public MyMQTTCallback(MyMQTTClient myMQTTClient) {
this.myMQTTClient = myMQTTClient;
}
/**
* 丢失连接,可在这里做重连
* 只会调用一次
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
LOGGER.error("连接断开,下面做重连...");
long reconnectTimes = 1;
while (true) {
try {
if (MyMQTTClient.getClient().isConnected()) {
LOGGER.warn("mqtt reconnect success end");
return;
}
LOGGER.warn("mqtt reconnect times = {} try again...", reconnectTimes );
MyMQTTClient.getClient().reconnect();
} catch (MqttException e) {
LOGGER.error("", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
//e1.printStackTrace();
}
}
}
/**
* @param s
* @param mqttMessage
* @throws Exception
* subscribe后得到的消息会执行到这里面
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
//System.out.println("我收到消息了!!!");
LOGGER.info("接收消息主题 : {},接收消息内容 : {}", s, new String(mqttMessage.getPayload()));
}
/**
* 消息到达后
* subscribe后,执行的回调函数
*
* @param s
* @param mqttMessage
* @throws Exception
*/
/**
* publish后,配送完成后回调的方法
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// LOGGER.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
}
}
6. MqttMsg.java
代码语言:javascript复制package cn.kt.mqttdemo2.domain;
/**
* Created by tao.
* Date: 2021/5/19 15:22
* 描述:
*/
public class MqttMsg {
private String name = "";
private String content = "";
private String time = "";
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
@Override
public String toString() {
return "MqttMsg{"
"name='" name '''
", content='" content '''
", time='" time '''
'}';
}
}
7. MqttController.java
代码语言:javascript复制package cn.kt.mqttdemo2.controller;
import cn.kt.mqttdemo2.domain.MqttMsg;
import cn.kt.mqttdemo2.mqtt.MyMQTTClient;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
/**
* Created by tao.
* Date: 2021/4/21 15:59
* 描述:
*/
@Controller
public class MqttController {
@Autowired
private MyMQTTClient myMQTTClient;
@Value("${mqtt.topic1}")
private String topic1;
@RequestMapping("/mqtt")
public String mqttClint() {
return "test.html";
}
Queue<String> msgQueue = new LinkedList<String>();
int count = 1;
/*@PostMapping("/getMsg")
@ResponseBody
public void mqttMsg(MqttMsg mqttMsg) {
System.out.println("***************" mqttMsg.getName() ":" mqttMsg.getContent() "****************");
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = df.format(new Date());
mqttMsg.setTime(time);
JSONObject json = JSONObject.fromObject(mqttMsg);
String sendMsg = json.toString();
System.out.println(sendMsg);
System.out.println("时间戳" new Date().getTime());
//发布消息
myMQTTClient.publish(sendMsg, topic1);
}*/
@PostMapping("/getMsg")
@ResponseBody
public synchronized void mqttMsg(MqttMsg mqttMsg) {
System.out.println("队列元素数量:" msgQueue.size());
System.out.println("***************" mqttMsg.getName() ":" mqttMsg.getContent() "****************");
//时间格式化
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = df.format(new Date());
mqttMsg.setTime(time);
mqttMsg.setContent(mqttMsg.getContent() "——后台编号:" count);
count ;
//map转json
JSONObject json = JSONObject.fromObject(mqttMsg);
String sendMsg = json.toString();
System.out.println(sendMsg);
//队列添加元素
boolean flag = msgQueue.offer(sendMsg);
if (flag) {
//发布消息
myMQTTClient.publish(msgQueue.poll(), topic1);
System.out.println("时间戳" new Date().getTime());
}
System.out.println("队列元素数量:" msgQueue.size());
}
}
8. mqttws31.js
可以下载: 链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw 提取码:siwg 也可以用对应的cdn 地址
代码语言:javascript复制<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
9. test.html
代码语言:javascript复制<!DOCTYPE html >
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http - equiv="X-UA-Compatible" content="ie=edge">
<title> Document </title>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="nofollow noopener" rel="stylesheet">
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
<script src="/mqttws31.js" type="text/javascript"></script>
<style>
#contentList li {
word-break: break-all;
word-wrap: break-word;
}
</style>
</head>
<body>
<div style="width: 900px;margin: 50px auto;">
<div class="form-group">
<label>评论人:</label>
<input type="text" class="form-control" id="user">
</div>
<div class="form-group">
<label>评论内容:</label>
<textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
</div>
<div class="form-group">
<input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
</div>
<div class="form-group">
<input type="button" value="连发测试" class="btn btn-primary" onclick="sendTest()">
</div>
<div>
<ul id="contentList" class="list-group">
<!-- <li class="list-group-item">
<span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
</li> -->
</ul>
</div>
</div>
<script>
var hostname = '127.0.0.1',
port = 8083,
clientId = 'client-' generateUUID(),
timeout = 1000,
keepAlive = 2000,
cleanSession = false,
ssl = false,
userName = 'Nick',
password = '12356',
topic = 'ceshi';
client = new Paho.MQTT.Client(hostname, port, clientId);
//建立客户端实例
var options = {
invocationContext: {
host: hostname,
port: port,
path: client.path,
clientId: clientId
},
timeout: timeout,
keepAliveInterval: keepAlive,
cleanSession: cleanSession,
useSSL: ssl,
userName: userName,
password: password,
onSuccess: onConnect,
onFailure: function (e) {
console.log(e);
}
};
client.connect(options);
//连接服务器并注册连接成功处理事件
function onConnect() {
console.log("onConnected");
client.subscribe(topic);
}
client.onConnectionLost = onConnectionLost;
//注册连接断开处理事件
client.onMessageArrived = onMessageArrived;
//注册消息接收处理事件
function onConnectionLost(responseObject) {
console.log(responseObject);
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:" responseObject.errorMessage);
console.log("连接已断开");
}
}
//收到消息时处理事件
function onMessageArrived(message) {
var msg = message.payloadString;
console.log("收到消息:" msg);
console.log("收到消息时间戳:" new Date().getTime());
var obj = JSON.parse(msg);
/*
<li class="list-group-item">
<span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
</li>
*/
$('#contentList').append($(`<li class="list-group-item" > <span class="badge">评论人:` obj.name `,时间:` obj.time `</span>` obj.content `</li>`));
}
//点击发送按钮事件
function send() {
var name = document.getElementById("user").value;
var content = document.getElementById("content").value;
console.log('name :>> ', name);
console.log('content :>> ', content);
var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
console.log('time :>> ', time);
console.log("发送前时间戳:" new Date().getTime());
if (name) {
$.ajax({
type: "post",
url: "/getMsg",
data: {
name: name,
content: content,
time: time
},
dataType: "json"
});
document.getElementById("content").value = "";
document.getElementById("user").value = "";
}
}
//生成UUID
function generateUUID() {
var d = new Date().getTime();
if (window.performance && typeof window.performance.now === "function") {
d = performance.now(); //use high-precision timer if available
}
var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
var r = (d Math.random() * 16) % 16 | 0;
d = Math.floor(d / 16);
return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
});
return uuid;
}
//date时间格式化
Date.prototype.Format = function (fmt) {
var o = {
"M ": this.getMonth() 1, //月份
"d ": this.getDate(), //日
"h ": this.getHours(), //小时
"m ": this.getMinutes(), //分
"s ": this.getSeconds(), //秒
"q ": Math.floor((this.getMonth() 3) / 3), //季度
"S": this.getMilliseconds() //毫秒
};
if (/(y )/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() "").substr(4 - RegExp.$1.length));
for (var k in o)
if (new RegExp("(" k ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" o[k]).substr(("" o[k]).length)));
return fmt;
}
function sendTest() {
for (var i = 1; i < 100; i ) {
var name = "ceshi" i;
var content = "测试内容" i;
var time;
time = new Date().getTime();
$.ajax({
type: "post",
url: "/getMsg",
data: {
name: name,
content: content,
time: time
},
dataType: "json"
});
}
}
</script>
</body>
</html>
集成后效果
客户端页面
后台处理
在其他的页面客户端也收到了订阅消息
demo源代码
链接:https://pan.baidu.com/s/1UtU_iAEI-DcSfsK8Z_rvxA 提取码:lmyq