如何用Java实现一个基于MQTT协议的发布订阅示例

2021-10-25 10:12:50 浏览数 (1)

1 MQTT协议概述


      根据百度百科定义,MQTT是ISO 标准下基于发布/订阅模式的协议。它基于TCP/IP协议,具有轻量、简单、开放和易于实现的特点。该协议广泛应用于机器间通信,即物联网领域。它是为硬件性能低下的远程设备以及网络状况不佳的情况下而设计协议,为此,它需要一个消息中间件(MQTT Broker,可以理解为Server端。)MQTT协议这些特点使它适用范围非常广泛。

     MQTT协议通信示意图如下所示:

    其中的消息发布和订阅需要通过中间代理MQTT Broker实现,而MQTT Client则可以发布消息,也可以订阅消息。换句话说,消息的发布者和订阅者都是客户端,消息代理是服务器(例如EMQTT、Mosquitto、Apollo等),消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(Payload)两部分。Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容。而Payload可以理解为消息的内容,是指订阅者具体要使用的内容。另外,MQTT协议是一个分布式通信协议,消息传递需要关心数据的质量,它有三个值分别是: QoS 0:“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。 QoS 1:“至少一次”,确保消息到达,但消息重复可能会发生。 QoS 2:“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

2 Mosquitto概述


      Mosquitto是一个开源消息代理(Broker),实现了MQTT协议版本3.1和3.1.1。它非常轻量,适用于低功耗单板计算机到完整服务器的所有设备。Mosquitto项目还提供了用于实现MQTT客户端的C库以及非常受欢迎的mosquitto_pub和mosquitto_sub命令行MQTT客户端。首先安装从官网 http://mosquitto.org/download/ 下载对应的安装软件,并安装:

   安装后,可以启动该服务,默认端口为1883。如下图所示:

另外,官网还提供MQTT客户端工具,可以非常方便的提供测试,下载地址为http://www.eclipse.org/paho/components/tool 可以下载org.eclipse.paho.mqtt.utility-1.0.0.jar这个Jar包,进行双击运行:

3 MQTT Java实现


      首先保证Mosquitto Broker运行,否则无法进行客户端的消息发布和订阅。首先需要用maven下载客户端库,代码如下:

代码语言:javascript复制
<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.5</version>
</dependency>

     下面给出发布者核心代码:

代码语言:javascript复制
package com.example.demo.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttProducer {

    private MqttClient mqttClient;

    public MqttProducer(String SERVER_URI,String CLIENT_ID){
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            mqttClient = new MqttClient(SERVER_URI, CLIENT_ID,persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "  SERVER_URI);
            mqttClient.connect(connOpts);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
    public void send(String topic, int qos, boolean retained, String payload) {
        if (mqttClient == null){
            return;
        }
        try {
            mqttClient.publish(topic, payload.getBytes(), qos, retained);
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }
}

  MqttClient是MQTT客户端的抽象,它需要指定Broker地址,如"tcp://localhost:1883",同时需要指定一个唯一的客户端ID,在发布消息时,mqttClient.publish(topic, payload.getBytes(), qos, retained),需要指定主题topic,内容payload,以及质量Qos等。下面再给出订阅者代码:

代码语言:javascript复制
package com.example.demo.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttSubscriber {
    
    private MqttClient mqttClient;

    public MqttSubscriber(String SERVER_URI,String CLIENT_ID){
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            mqttClient = new MqttClient(SERVER_URI, CLIENT_ID,persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            mqttClient.connect(connOpts);

        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
    public void subscribe(String topic) {
        if (mqttClient == null){
            return;
        }
        try {
            mqttClient.subscribe(topic);
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("连接丢失");
                }

                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println(topic);
                    System.out.println(mqttMessage.toString());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("delivery isComplete:"   iMqttDeliveryToken.isComplete());
                }
            });
        } catch (MqttException e) {
            System.out.println(e.getMessage());
        }
    }
}

项目启动代码如下:

代码语言:javascript复制
package com.example.demo.mqtt;

public class Test {
    public static void main(String[] args) {
        String serverURI="tcp://localhost:1883";
        String clientID="demo_mqtt";
        MqttProducer mqttProducer = new MqttProducer(serverURI, clientID);
        String msg ="";
        while (true){
            msg = "time:"   System.currentTimeMillis();
            mqttProducer.send("topic/msg02", 1, true, msg);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
/////////////////////////////////
package com.example.demo.mqtt;

public class Test2 {
    public static void main(String[] args) {
        String serverURI="tcp://localhost:1883";
        String clientID="demo_mqtt000";
        MqttSubscriber mqttSubscriber = new MqttSubscriber(serverURI, clientID);
        mqttSubscriber.subscribe("topic/msg02");
    }
}

运行后,控制台打印如下:

代码语言:javascript复制
topic/msg02
time:1634979250907
topic/msg02
time:1634979251908
topic/msg02
time:1634979252908
topic/msg02
time:1634979253909
//.......................

0 人点赞