消息通讯——MQTT的入门和使用

2022-09-26 18:36:41 浏览数 (1)

Emqx简介

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由: 1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。 2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。 3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。 4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。

官方文档:https://docs.emqx.cn/broker/v4.3/getting-started/install.html

MQTT是什么?

MQTT全称消息队列遥测传输 (Message Queuing Telemetry Transport)。其主要提供了订阅/发布两种消息模式,更为简约、轻量,易于使用,特别适合于受限环境(带宽低、网络延迟高、网络通信不稳定)的消息分发,属于物联网(Internet of Thing)的一个标准传输协议。

MQTT实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分: (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload); (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

Emqx安装

官方网站:https://www.emqx.cn/

安装步骤

  1. 下载地址:https://www.emqx.cn/downloads#broker
  2. 解压程序包
  3. 启动 EMQ X Broker 进入到emqx解压后目录,进入bin目录,执行其下的命令脚本 #启动emqx emqx start #查看emqx状态 emqx status #停止 EMQ X Broker emqx stop
  4. 卸载 EMQ X Broker 直接删除 EMQ X 目录即可

Emqx Dashboard插件

Emqx自带dashboard插件:通过Dashboard,你可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。 除此之外,EMQ X Dashboard 还提供了规则引擎的可视化操作界面,同时集成了一个简易的 MQTT 客户端工具供用户测试使用。 当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是admin,密码是 public

MQTT 设计了的3 QoS 等级

QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。 QoS 1:消息传递至少 1 次。 QoS 2:消息仅传送一次。

需要开放的端口

Emqx使用

java使用mqtt

使用步骤如下
  1. 导入依赖
代码语言:javascript复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
  1. 订阅者(App.java)
代码语言:javascript复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 13:57
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.UUID;
public class App {
    public static void main(String[] args) {
        try {
            //apollo地址
            String HOST = "tcp://127.0.0.1:1883";
            //要订阅的主题
            String TOPIC1 = "ceshi";
            //指你Apollo中的用户名密码
            String userName = "admin";
            String pwd = "123456";
            String clientid = UUID.randomUUID().toString().replace("-", "");
            MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接对象
            MqttConnectOptions options = new MqttConnectOptions();
            //设置连接参数
            //清除session回话
            options.setCleanSession(false);
            options.setUserName(userName);
            options.setPassword(pwd.toCharArray());
            //超时设置
            options.setConnectionTimeout(10);
            //心跳保持时间
            options.setKeepAliveInterval(20);
            //遗嘱:当该客户端端口连接时,会向whb主题发布一条信息
            options.setWill("nick", "我挂了,你加油".getBytes(), 1, true);
            //监听对象:自己创建
            client.setCallback(new PushCallback());
            //打开连接
            client.connect(options);
            //设置消息级别
            int[] Qos = {1};
            //订阅主题
            String[] topics = {TOPIC1};
            client.subscribe(topics, Qos);

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
  1. 发布者(sendOut.java)
代码语言:javascript复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 14:09
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class SendOut {
    //tcp://MQTT安装的服务器地址:MQTT定义的端口号
    String HOST = "tcp://127.0.0.1:1883";
    //定义一个主题
    public static final String TOPIC = "ceshi";
    //    public static final String TOPIC = "abc";
    //定义MQTT的ID,可以在MQTT服务配置中指定
    private static final String clientid = "server1";
    private MqttMessage message;
    public static final String TOPIC1 = "topic1";
    public static final String userName = "admin";
    public static final String pwd = "123456";
    public MqttClient client;
    private MqttTopic topic;
    public SendOut() {
        try {
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    //发布消息
    public void publish(MqttTopic topic, MqttMessage message) throws MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        //打印发送状态
        System.out.println("message is published completely!"   token.isComplete());
    }

    //建立连接:参数与订阅端相似
    private void connect() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(pwd.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        client.setCallback(new PushCallback());
        client.connect(options);
    }

    public static void main(String[] args) throws MqttException, UnsupportedEncodingException {
        SendOut service = new SendOut();
        Scanner sc = new Scanner(System.in);
        service.topic = service.client.getTopic(TOPIC);
        service.message = new MqttMessage();
        //确保被收到一次
        service.message.setQos(1);
        service.message.setPayload("干嘛这么想不开,要在脸上贴个输字".getBytes("UTF-8"));
        service.publish(service.topic, service.message);
    }
}
  1. 订阅消息回调(OnMessageCallback.java)
代码语言:javascript复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 13:58
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:"   topic);
        System.out.println("接收消息Qos:"   message.getQos());
        System.out.println("接收消息内容:"   new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------"   token.isComplete());
    }
}
  1. 发布消息回调(PushCallback.java)
代码语言:javascript复制
package cn.kt.mtqqdemo.mqtt;
/**
 * Created by tao.
 * Date: 2021/4/12 14:01
 * 描述:
 */
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PushCallback implements MqttCallback {

    //连接丢失:一般用与重连
    public void connectionLost(Throwable throwable) {
        System.out.println("丢失连接");
    }
    //消息到达:指收到消息
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("接收消息主题 : "   topic);
        System.out.println("接收消息Qos : "   message.getQos());
        System.out.println("接收消息内容 : "   new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        //(发布)publish后会执行到这里,发送状态
        System.out.println("deliveryComplete---------"
                  token.isComplete());
    }
}
测试效果
  1. 发布者
  1. 订阅者

js使用mqtt

引入mqttws31.js

可以下载: 链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw 提取码:siwg

也可以用对应的cdn 地址

代码语言:javascript复制
<!-- For the plain library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- For the minified library-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
代码如下
代码语言:javascript复制
<!DOCTYPE html >
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http - equiv="X-UA-Compatible" content="ie=edge">
    <title> Document </title>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="nofollow noopener"  rel="stylesheet">
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
    <script src="./js/mqttws31.js" type="text/javascript"></script>
    <style>
        #contentList li {
            word-break: break-all;
            word-wrap: break-word;
        }
    </style>
</head>

<body>
    <div style="width: 900px;margin: 50px auto;">
        <div class="form-group">
            <label>评论人:</label>
            <input type="text" class="form-control" id="user">
        </div>

        <div class="form-group">
            <label>评论内容:</label>
            <textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
        </div>

        <div class="form-group">
            <input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
        </div>

        <div>
            <ul id="contentList" class="list-group">
                <!-- <li class="list-group-item">
                    <span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
                </li> -->
            </ul>
        </div>
    </div>

    <script>
        // http://192.168.3.181/
        var hostname = '192.168.3.181',
            port = 8083,
            clientId = 'client-'   generateUUID(),
            timeout = 1000,
            keepAlive = 2000,
            cleanSession = false,
            ssl = false,
            userName = 'Nick',
            password = '12356',
            topic = 'ceshi';
        client = new Paho.MQTT.Client(hostname, port, clientId);
        //建立客户端实例
        var options = {
            invocationContext: {
                host: hostname,
                port: port,
                path: client.path,
                clientId: clientId
            },
            timeout: timeout,
            keepAliveInterval: keepAlive,
            cleanSession: cleanSession,
            useSSL: ssl,
            userName: userName,
            password: password,
            onSuccess: onConnect,
            onFailure: function(e) {
                console.log(e);
            }
        };
        client.connect(options);

        //连接服务器并注册连接成功处理事件
        function onConnect() {
            console.log("onConnected");
            client.subscribe(topic);
        }

        client.onConnectionLost = onConnectionLost;

        //注册连接断开处理事件
        client.onMessageArrived = onMessageArrived;

        //注册消息接收处理事件
        function onConnectionLost(responseObject) {
            console.log(responseObject);
            if (responseObject.errorCode !== 0) {
                console.log("onConnectionLost:"   responseObject.errorMessage);
                console.log("连接已断开");
            }
        }

        //收到消息时处理事件
        function onMessageArrived(message) {
            var msg = message.payloadString;
            var obj = JSON.parse(msg);
            console.log("收到消息:"   obj);
            /*
            <li class="list-group-item">
                    <span class="badge">评论人: {{ item.user }} 时间:{{item.time}}</span> {{ item.content }}
                </li>
            */
            $('#contentList').append($(`<li class="list-group-item" > <span class="badge">评论人:`   obj.name   `,时间:`   obj.time   `</span>`   obj.content   `</li>`));
        }

        //点击发送按钮事件
        function send() {
            var name = document.getElementById("user").value;
            var content = document.getElementById("content").value;
            console.log('name :>> ', name);
            console.log('content :>> ', content);
            var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
            var getConment = {
                name: name,
                content: content,
                time: time,
            }
            if (name) {
                var str = getConment;
                message = new Paho.MQTT.Message(JSON.stringify(str));
                message.destinationName = topic;
                client.send(message);
                document.getElementById("content").value = "";
                document.getElementById("user").value = "";
            }
        }

        //生成UUID
        function generateUUID() {
            var d = new Date().getTime();
            if (window.performance && typeof window.performance.now === "function") {
                d  = performance.now(); //use high-precision timer if available
            }
            var uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
                var r = (d   Math.random() * 16) % 16 | 0;
                d = Math.floor(d / 16);
                return (c == 'x' ? r : (r & 0x3 | 0x8)).toString(16);
            });
            return uuid;
        }
        //date时间格式化
        Date.prototype.Format = function(fmt) {
            var o = {
                "M ": this.getMonth()   1, //月份
                "d ": this.getDate(), //日
                "h ": this.getHours(), //小时
                "m ": this.getMinutes(), //分
                "s ": this.getSeconds(), //秒
                "q ": Math.floor((this.getMonth()   3) / 3), //季度
                "S": this.getMilliseconds() //毫秒
            };
            if (/(y )/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear()   "").substr(4 - RegExp.$1.length));
            for (var k in o)
                if (new RegExp("("   k   ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00"   o[k]).substr((""   o[k]).length)));
            return fmt;
        }
    </script>
</body>
</html>
测试效果

页面效果

java 连接mqtt订阅者收到消息

0 人点赞