消息通讯——springboot集成MQTT

2022-09-26 18:36:10 浏览数 (1)

MQTT基本使用查看上一篇文章:消息通讯——MQTT的入门和使用

springboot集成MQTT设计

springboot集成MQTT步骤

1. 引入pom依赖

代码语言:xml
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Boot embedded web container -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- FreeMarker template view support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!-- Lombok: the version is left to Spring Boot's dependency management (the
             "RELEASE" meta-version is deprecated in Maven and makes builds
             non-reproducible). Lombok is only needed at compile time, hence "provided". -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <!-- Eclipse Paho MQTT v3 client used directly by MyMQTTClient -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <!-- JUnit 4 is a test-only library; keep it off the runtime classpath -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- json-lib provides net.sf.json.JSONObject used by MqttController -->
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
        </dependency>

2. application.properties

代码语言:properties
# Web server port
server.port=8080
# Static resource locations
spring.resources.static-locations=classpath:/resources/,classpath:/static/,classpath:/templates/

### FreeMarker configuration
spring.freemarker.allow-request-override=false
# Template caching switch (set to false here)
spring.freemarker.cache=false
spring.freemarker.check-template-location=true
spring.freemarker.charset=UTF-8
spring.freemarker.content-type=text/html
spring.freemarker.expose-request-attributes=false
spring.freemarker.expose-session-attributes=false
spring.freemarker.expose-spring-macro-helpers=false
# Template file suffix
spring.freemarker.suffix=.ftl

## MQTT ##
# Broker URI, client id and credentials injected into MqttConfiguration via @Value
mqtt.host=tcp://127.0.0.1:1883
mqtt.clientId=mqttClient
mqtt.username=admin
mqtt.password=123456
# NOTE(review): these two values are handed to Paho's setConnectionTimeout /
# setKeepAliveInterval, which are documented in seconds -- confirm 1000 / 2000 are intended
mqtt.timeout=1000
mqtt.keepalive=2000
# Topic the backend subscribes to at startup and publishes to from MqttController
mqtt.topic1=ceshi

3. MqttConfiguration.java

代码语言:java
package cn.kt.mqttdemo2.config;


import cn.kt.mqttdemo2.mqtt.MyMQTTClient;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by tao.
 * Date: 2021/4/12 14:47
 * 描述:
 */
@Configuration
@Slf4j
public class MqttConfiguration {

    @Value("${mqtt.host}")
    String host;
    @Value("${mqtt.username}")
    String username;
    @Value("${mqtt.password}")
    String password;
    @Value("${mqtt.clientId}")
    String clientId;
    @Value("${mqtt.timeout}")
    int timeOut;
    @Value("${mqtt.keepalive}")
    int keepAlive;
    @Value("${mqtt.topic1}")
    String topic1;

    @Bean//注入spring
    public MyMQTTClient myMQTTClient() {
        MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);
        for (int i = 0; i < 10; i  ) {
            try {
                myMQTTClient.connect();
                myMQTTClient.subscribe(topic1, 1);
                return myMQTTClient;
            } catch (MqttException e) {
                log.error("MQTT connect exception,connect time = "   i);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return myMQTTClient;
    }
}

4. MyMQTTClient.java

代码语言:java
package cn.kt.mqttdemo2.mqtt;


import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin wrapper around a Paho {@link MqttClient} offering connect, publish, subscribe and
 * unsubscribe helpers.
 *
 * <p>The underlying {@link MqttClient} lives in a static field, so all instances of this class
 * share one broker connection; {@link MyMQTTCallback} reaches it through {@link #getClient()}.
 *
 * <p>Created by tao, 2021/4/12 14:46.
 */
public class MyMQTTClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTClient.class);

    /** Shared Paho client; {@code null} until {@link #connect()} has been called once. */
    private static MqttClient client;

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MyMQTTClient.client = client;
    }

    private final String host;
    private final String username;
    private final String password;
    private final String clientId;
    private final int timeout;
    private final int keepalive;

    /**
     * @param host      broker URI, e.g. {@code tcp://127.0.0.1:1883}
     * @param username  user name sent to the broker
     * @param password  password sent to the broker
     * @param clientId  MQTT client identifier
     * @param timeOut   value passed to {@link MqttConnectOptions#setConnectionTimeout(int)}
     * @param keepAlive value passed to {@link MqttConnectOptions#setKeepAliveInterval(int)}
     */
    public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepalive = keepAlive;
    }

    /**
     * Builds the connect options used by {@link #connect()}. Clean session is disabled.
     *
     * @param username  user name; skipped when {@code null}
     * @param password  password; skipped when {@code null}
     * @param timeout   connection timeout handed to Paho
     * @param keepalive keep-alive interval handed to Paho
     * @return the populated options
     */
    public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
        MqttConnectOptions options = new MqttConnectOptions();
        if (username != null) {
            options.setUserName(username);
        }
        if (password != null) {
            // Guarded so that a missing password no longer fails with a NullPointerException.
            options.setPassword(password.toCharArray());
        }
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setCleanSession(false);
        return options;
    }

    /**
     * Connects to the broker, creating the shared {@link MqttClient} (with in-memory
     * persistence and a {@link MyMQTTCallback}) on first use. An already connected client
     * is disconnected first and then connected again.
     *
     * @throws MqttException if creating, disconnecting or connecting the client fails
     */
    public void connect() throws MqttException {
        if (client == null) {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            client.setCallback(new MyMQTTCallback(MyMQTTClient.this));
        }
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
        if (client.isConnected()) {
            client.disconnect();
        }
        client.connect(mqttConnectOptions);
        // No exception was thrown, so the connection succeeded.
        LOGGER.info("MQTT connect success");
    }

    /**
     * Publishes a non-retained message with QoS 0.
     *
     * @param pushMessage message text
     * @param topic       target topic
     */
    public void publish(String pushMessage, String topic) {
        publish(pushMessage, topic, 0, false);
    }

    /**
     * Publishes a message and waits up to one second for the delivery to complete.
     * Failures are logged; nothing is thrown to the caller.
     *
     * @param pushMessage message text, sent as UTF-8 bytes
     * @param topic       target topic
     * @param qos         quality of service level
     * @param retained    whether the broker should retain the message
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttClient mqttClient = MyMQTTClient.getClient();
        if (mqttClient == null) {
            LOGGER.error("MQTT client is not initialised, message for topic {} dropped", topic);
            return;
        }
        MqttMessage message = new MqttMessage();
        // Explicit charset: the platform default is not guaranteed to be UTF-8.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        if (null == mqttTopic) {
            LOGGER.error("topic is not exist");
            // Previously execution continued and dereferenced the null topic.
            return;
        }
        // Publishing is serialised on purpose: the original author reports that concurrent
        // publish calls from several threads can deadlock.
        synchronized (this) {
            try {
                // publish() only queues the message; the token tracks the actual delivery.
                MqttDeliveryToken token = mqttTopic.publish(message);
                token.waitForCompletion(1000L);
            } catch (MqttException e) {
                LOGGER.error("Failed to publish to topic {}", topic, e);
            }
        }
    }

    /**
     * Subscribes to a topic with QoS 0.
     *
     * @param topic topic filter
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }

    /**
     * Subscribes to a topic. Failures are logged; nothing is thrown to the caller.
     *
     * @param topic topic filter
     * @param qos   maximum quality of service level
     */
    public void subscribe(String topic, int qos) {
        MqttClient mqttClient = MyMQTTClient.getClient();
        if (mqttClient == null) {
            LOGGER.error("MQTT client is not initialised, cannot subscribe to topic {}", topic);
            return;
        }
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            LOGGER.error("Failed to subscribe to topic {}", topic, e);
        }
    }


    /**
     * Unsubscribes from a topic. Does nothing except log when the client is not connected.
     *
     * @param topic topic name
     */
    public void cleanTopic(String topic) {
        if (client != null && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                LOGGER.error("Failed to unsubscribe from topic {}", topic, e);
            }
        } else {
            LOGGER.warn("取消订阅失败!");
        }
    }
}

5. MyMQTTCallback.java

代码语言:java
package cn.kt.mqttdemo2.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Paho callback registered by {@link MyMQTTClient}: reconnects after a lost connection and
 * logs every received message.
 *
 * <p>Created by tao, 2021/4/12 14:46.
 */
public class MyMQTTCallback implements MqttCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTCallback.class);

    /** Pause between two reconnect attempts, in milliseconds. */
    private static final long RECONNECT_DELAY_MILLIS = 1000L;

    private final MyMQTTClient myMQTTClient;

    public MyMQTTCallback(MyMQTTClient myMQTTClient) {
        this.myMQTTClient = myMQTTClient;
    }

    /**
     * Called when the connection is lost. Keeps calling {@code reconnect()} on the shared
     * client, once per second, until it reports being connected or this thread is interrupted.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        LOGGER.error("连接断开,下面做重连...", throwable);
        long reconnectTimes = 1;
        while (true) {
            try {
                if (MyMQTTClient.getClient().isConnected()) {
                    LOGGER.warn("mqtt reconnect success end");
                    return;
                }
                LOGGER.warn("mqtt reconnect times = {} try again...", reconnectTimes++);
                MyMQTTClient.getClient().reconnect();
            } catch (MqttException e) {
                LOGGER.error("mqtt reconnect attempt failed", e);
            }
            try {
                Thread.sleep(RECONNECT_DELAY_MILLIS);
            } catch (InterruptedException e1) {
                // Honour the interrupt: without this the loop could never be stopped.
                Thread.currentThread().interrupt();
                LOGGER.warn("mqtt reconnect loop interrupted, giving up");
                return;
            }
        }
    }

    /**
     * Called for every message received on a subscribed topic; logs topic and payload.
     *
     * @param s           topic the message arrived on
     * @param mqttMessage the received message; its payload is decoded as UTF-8
     * @throws Exception declared by the callback interface; not thrown here
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        LOGGER.info("接收消息主题 : {},接收消息内容 : {}", s,
                new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Called after a published message has been delivered. Intentionally a no-op.
     *
     * @param iMqttDeliveryToken token of the completed delivery
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Nothing to do; enable logging of iMqttDeliveryToken.isComplete() here when debugging.
    }
}

6. MqttMsg.java

代码语言:java
package cn.kt.mqttdemo2.domain;

/**
 * Mutable bean describing one chat/comment message: author name, content and a
 * formatted timestamp. All properties default to the empty string.
 *
 * <p>Created by tao, 2021/5/19 15:22.
 */
public class MqttMsg {
    private String name = "";
    private String content = "";
    private String time = "";

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    /**
     * @return a debug representation of the form
     *         {@code MqttMsg{name='..', content='..', time='..'}}
     */
    @Override
    public String toString() {
        return "MqttMsg{" +
                "name='" + name + '\'' +
                ", content='" + content + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}

7. MqttController.java

代码语言:java
package cn.kt.mqttdemo2.controller;

import cn.kt.mqttdemo2.domain.MqttMsg;
import cn.kt.mqttdemo2.mqtt.MyMQTTClient;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;

/**
 * Created by tao.
 * Date: 2021/4/21 15:59
 * 描述:
 */
@Controller
public class MqttController {

    @Autowired
    private MyMQTTClient myMQTTClient;

    @Value("${mqtt.topic1}")
    private String topic1;

    @RequestMapping("/mqtt")
    public String mqttClint() {
        return "test.html";
    }

    Queue<String> msgQueue = new LinkedList<String>();
    int count = 1;

    /*@PostMapping("/getMsg")
    @ResponseBody
    public void mqttMsg(MqttMsg mqttMsg) {
        System.out.println("***************"   mqttMsg.getName()   ":"   mqttMsg.getContent()   "****************");
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = df.format(new Date());
        mqttMsg.setTime(time);

        JSONObject json = JSONObject.fromObject(mqttMsg);
        String sendMsg = json.toString();
        System.out.println(sendMsg);
        System.out.println("时间戳"   new Date().getTime());

        //发布消息
        myMQTTClient.publish(sendMsg, topic1);
    }*/

    @PostMapping("/getMsg")
    @ResponseBody
    public synchronized void mqttMsg(MqttMsg mqttMsg) {
        System.out.println("队列元素数量:"   msgQueue.size());
        System.out.println("***************"   mqttMsg.getName()   ":"   mqttMsg.getContent()   "****************");

        //时间格式化
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = df.format(new Date());
        mqttMsg.setTime(time);

        mqttMsg.setContent(mqttMsg.getContent()   "——后台编号:"   count);
        count  ;

        //map转json
        JSONObject json = JSONObject.fromObject(mqttMsg);
        String sendMsg = json.toString();
        System.out.println(sendMsg);

        //队列添加元素
        boolean flag = msgQueue.offer(sendMsg);
        if (flag) {
            //发布消息
            myMQTTClient.publish(msgQueue.poll(), topic1);
            System.out.println("时间戳"   new Date().getTime());
        }
        System.out.println("队列元素数量:"   msgQueue.size());
    }
}

8. mqttws31.js

可以从网盘下载(链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw 提取码:siwg),也可以直接使用对应的 CDN 地址:

代码语言:html
<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>

9. test.html

代码语言:html
<!DOCTYPE html >
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title> Document </title>
    <!-- Bootstrap styles; a single rel="stylesheet" is required for the sheet to be applied -->
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
    <!-- Paho MQTT-over-WebSocket client, served from the application's static resources -->
    <script src="/mqttws31.js" type="text/javascript"></script>
    <style>
        /* Let long comment text wrap inside the list items */
        #contentList li {
            word-break: break-all;
            word-wrap: break-word;
        }
    </style>
</head>

<body>
<div style="width: 900px;margin: 50px auto;">
    <!-- Author name input, read by send() via id "user" -->
    <div class="form-group">
        <label>评论人:</label>
        <input type="text" class="form-control" id="user">
    </div>

    <!-- Comment text input, read by send() via id "content" -->
    <div class="form-group">
        <label>评论内容:</label>
        <textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
    </div>

    <!-- Posts the form values once -->
    <div class="form-group">
        <input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
    </div>
    <!-- Fires a burst of generated test messages -->
    <div class="form-group">
        <input type="button" value="连发测试" class="btn btn-primary" onclick="sendTest()">
    </div>

    <!-- Messages received over MQTT are appended to this list by onMessageArrived() -->
    <div>
        <ul id="contentList" class="list-group">
            <!-- Shape of one list entry, kept for reference:
            <li class="list-group-item">
                <span class="badge">Commenter: {{ item.user }} Time: {{item.time}}</span> {{ item.content }}
            </li> -->
        </ul>
    </div>
</div>

<script>
    // Connection settings for the browser-side Paho MQTT client.
    // NOTE(review): port 8083 is presumably the broker's WebSocket listener, and
    // timeout / keepAlive are in seconds in the Paho JS client -- confirm these values.
    var hostname = '127.0.0.1',
        port = 8083,
        clientId = 'client-' + generateUUID(), // unique id per page load
        timeout = 1000,
        keepAlive = 2000,
        cleanSession = false,
        ssl = false,
        userName = 'Nick',
        password = '12356',
        topic = 'ceshi';

    // Create the client instance (declared explicitly instead of leaking an implicit global).
    var client = new Paho.MQTT.Client(hostname, port, clientId);

    var options = {
        invocationContext: {
            host: hostname,
            port: port,
            path: client.path,
            clientId: clientId
        },
        timeout: timeout,
        keepAliveInterval: keepAlive,
        cleanSession: cleanSession,
        useSSL: ssl,
        userName: userName,
        password: password,
        onSuccess: onConnect,
        onFailure: function (e) {
            console.log(e);
        }
    };

    // Connect to the broker; onConnect runs once the connection is established.
    client.connect(options);

    /**
     * Success callback handed to client.connect(): logs the event and
     * subscribes this page to the configured topic.
     */
    function onConnect() {
        var subscribedTopic = topic;
        console.log("onConnected");
        client.subscribe(subscribedTopic);
    }

    // Register the connection-lost handler.
    client.onConnectionLost = onConnectionLost;

    // Register the message-arrived handler.
    client.onMessageArrived = onMessageArrived;

    /**
     * Called by the Paho client when the connection drops. Logs the response
     * object and, for a non-zero error code, the error message.
     *
     * @param responseObject object with errorCode and errorMessage
     */
    function onConnectionLost(responseObject) {
        console.log(responseObject);
        if (responseObject.errorCode !== 0) {
            console.log("onConnectionLost:" + responseObject.errorMessage);
            console.log("连接已断开");
        }
    }

    /**
     * Called for every message received on the subscribed topic. Parses the
     * JSON payload ({name, time, content}) and appends it to #contentList.
     *
     * The payload comes from other clients and is untrusted, so the values are
     * inserted as text nodes rather than concatenated into an HTML string.
     *
     * @param message Paho message; its payloadString is expected to be JSON
     */
    function onMessageArrived(message) {
        var msg = message.payloadString;
        console.log("收到消息:" + msg);
        console.log("收到消息时间戳:" + new Date().getTime());

        var obj;
        try {
            obj = JSON.parse(msg);
        } catch (e) {
            // Ignore payloads that are not valid JSON instead of throwing inside the callback.
            console.log(e);
            return;
        }
        if (obj === null || typeof obj !== "object") {
            return;
        }

        var badge = $('<span class="badge"></span>').text("评论人:" + obj.name + ",时间:" + obj.time);
        var item = $('<li class="list-group-item"></li>')
            .append(" ")
            .append(badge)
            .append(document.createTextNode("" + obj.content));
        $('#contentList').append(item);
    }

    /**
     * Click handler of the "post comment" button: reads the name and content
     * fields and, when a name was entered, posts them to /getMsg and clears
     * both fields. Nothing is sent when the name is empty.
     */
    function send() {
        var name = document.getElementById("user").value;
        var content = document.getElementById("content").value;
        console.log('name :>> ', name);
        console.log('content :>> ', content);
        var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
        console.log('time :>> ', time);
        console.log("发送前时间戳:" + new Date().getTime());

        if (name) {
            $.ajax({
                type: "post",
                url: "/getMsg",
                data: {
                    name: name,
                    content: content,
                    time: time
                },
                dataType: "json"
            });
            document.getElementById("content").value = "";
            document.getElementById("user").value = "";
        }
    }

    /**
     * Generates a random identifier in UUID version-4 layout
     * (xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx), seeded from the current time
     * and Math.random(). Not cryptographically secure.
     *
     * @returns {string} 36-character lowercase hexadecimal UUID string
     */
    function generateUUID() {
        var d = new Date().getTime();
        // Mix in the high-precision timer when the environment provides one.
        if (typeof performance !== "undefined" && typeof performance.now === "function") {
            d += performance.now();
        }
        var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
            var r = (d + Math.random() * 16) % 16 | 0;
            d = Math.floor(d / 16);
            // 'x' takes any hex digit; 'y' is restricted to 8, 9, a or b (variant bits).
            return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
        });
        return uuid;
    }

    /**
     * Formats this date according to a pattern string.
     *
     * Supported tokens: y (year), M (month), d (day), h (hour, 0-23),
     * m (minute), s (second), q (quarter), S (millisecond). A token written
     * once inserts the plain number; written twice or more it is zero-padded
     * to two digits. Only the first run of each token is replaced.
     *
     * @param {string} fmt pattern, e.g. "yyyy-MM-dd hh:mm:ss"
     * @returns {string} the formatted date
     */
    Date.prototype.Format = function (fmt) {
        var fields = {
            "M+": this.getMonth() + 1, // month
            "d+": this.getDate(), // day of month
            "h+": this.getHours(), // hour
            "m+": this.getMinutes(), // minute
            "s+": this.getSeconds(), // second
            "q+": Math.floor((this.getMonth() + 3) / 3), // quarter
            "S": this.getMilliseconds() // millisecond
        };
        var yearMatch = /(y+)/.exec(fmt);
        if (yearMatch) {
            // "yyyy" keeps all four digits, "yy" keeps the last two, and so on.
            fmt = fmt.replace(yearMatch[1], (this.getFullYear() + "").substr(4 - yearMatch[1].length));
        }
        for (var k in fields) {
            var tokenMatch = new RegExp("(" + k + ")").exec(fmt);
            if (tokenMatch) {
                var value = "" + fields[k];
                fmt = fmt.replace(tokenMatch[1], (tokenMatch[1].length == 1) ? value : ("00" + value).substr(value.length));
            }
        }
        return fmt;
    }


    /**
     * Click handler of the burst-test button: posts 99 generated messages
     * (numbered 1 to 99) to /getMsg without waiting for the responses.
     */
    function sendTest() {

        for (var i = 1; i < 100; i++) {
            var name = "ceshi" + i;
            var content = "测试内容" + i;
            var time;
            time = new Date().getTime();
            $.ajax({
                type: "post",
                url: "/getMsg",
                data: {
                    name: name,
                    content: content,
                    time: time
                },
                dataType: "json"
            });
        }
    }
</script>
</body>
</html>

集成后效果

客户端页面

后台处理

在其他的页面客户端也收到了订阅消息

demo源代码

链接:https://pan.baidu.com/s/1UtU_iAEI-DcSfsK8Z_rvxA 提取码:lmyq

0 人点赞