Java使用EMQX实现MQTT通信

2023-07-20 15:34:12 浏览数 (2)

前言

在上一篇文章 《Java使用modbus4j实现ModbusTCP通信》 中我们介绍了Java与Modbus协议的TCP通信,本文讲解一下如何用Java实现对当下最流行的物联网协议之一的MQTT协议进行通信。

MQTT

MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

特点:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
  • 对负载内容屏蔽的消息传输;
  • 使用 TCP/IP 提供网络连接;
  • 有三种消息发布服务质量:
  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

EMQX

EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。

EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行

优势:

  • 海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。
  • 高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。
  • 数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。
  • 多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。
  • 高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。
  • 易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。

Mria 集群架构​

支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。

在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。

具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html

MQTTX

MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。

主要功能

  • 采用聊天界面设计,使得操作更加简单明了
  • 跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
  • 100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
  • 订阅的 MQTT 主题支持自定义颜色标签
  • 支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
  • 支持通过 WebSocket 连接 MQTT 服务器
  • 支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
  • 自定义脚本支持模拟 MQTT 发布/订阅测试
  • 提供完整的日志记录功能
  • 多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ???????? ???????? ???????? ???????? ????????
  • 自由切换 Light、Dark、Night 三种主题模式

Java代码实现

引入maven

代码语言:javascript复制
 <!-- mqtt -->
 <dependency>
    <groupId>org.fusesource.mqtt-client</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>1.16</version>
 </dependency>
 <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
 </dependency>

config配置文件

新建 config.properties 文件,把连接EMQX的信息写在配置文件里,以便获取。也可以写在项目的yml配置文件里

代码语言:javascript复制
#用户名
mqtt.username=admin

#连接emqx密码
mqtt.password=xxxxxx
#是否清除会话
mqtt.cleanSession=true
#服务地址
mqtt.serverURI=tcp://192.168.1.22
#端口
mqtt.port = 1883
#客户端id
mqtt.clientId=xxxx
#mqtt.clientId=123456
#订阅topic
mqtt.service.subscribe.topic = xxx/xxx/ ,$SYS/brokers/ /clients/ /disconnected,$SYS/brokers/ /clients/ /connected
#发送topic
mqtt.facility.subscribe.topic = xxx/xxx

Service类

新建service核心类,主要用于连接EMQX并注册、订阅。

代码语言:javascript复制
package com.sss.common.core.protocol.mqtt.server;

import com.sss.common.core.protocol.strategy.FacilityDataProcess;
import com.sss.common.utils.properties.ResourceBundleUtil;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

import javax.annotation.PostConstruct;
import java.util.Arrays;

@Configuration
@EnableScheduling
public class MqttService {
    @Autowired
    @Qualifier("FacilityDataProcess")
    private FacilityDataProcess facilityDataProcess;

    public static final String HOST = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.serverURI") ;
    public static final String PORT = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.port") ;
    public static final String TOPIC = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.service.subscribe.topic")  ;
    private static final String CLIENT_ID = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.clientId")  ;
    private static final String USER_NAME = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.username")  ;
    private static final String PASSWORD = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.password")  ;
    private static final Boolean CLEAN_SESSION = "true".equalsIgnoreCase(ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.cleanSession"));

    private static MqttClient client;
    private static final Logger log = LoggerFactory.getLogger(MqttService.class);

    /**
     * 构造函数
     * @throws MqttException 异常信息
     */
    public MqttService() throws MqttException {
        // MemoryPersistence设置
        client = new MqttClient(HOST ":" PORT, CLIENT_ID, new MemoryPersistence());
    }

    @PostConstruct
    public void init(){
    	connect();
    }

    /**
     *  连接EMQ X服务器
     */
    private void connect() {
        log.info("HOST:                           " HOST);
        log.info("PORT:                           " PORT);
        log.info("TOPIC:                           " TOPIC);
        log.info("CLIENT_ID:                           " CLIENT_ID);
        log.info("USER_NAME:                           " USER_NAME);

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(USER_NAME);
        options.setPassword(PASSWORD.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(20);
        // 设置会话心跳时间
        options.setKeepAliveInterval(30);
        // 重连
        options.setAutomaticReconnect(true);
        // 设置是否清除会话
        options.setCleanSession(CLEAN_SESSION);
        try {
            client.setCallback(new MessageCallback(facilityDataProcess));
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void subscribe(){
    	try{
	    	 // 订阅消息
    		String[] topic1 = TOPIC.split(",")  ;
    		int[] qos = new int[topic1.length];
            // 循环将所有主题的Qos设置为1
            Arrays.fill(qos, 1);
	        client.subscribe(topic1, qos);
    	}catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 消息发送
     * @param message byte
     * @param topic 主题
     * @param qos  qos
     */
    public static void sendMessage(byte[] message,String topic,Integer qos){
    	MqttMessage mess = new MqttMessage();
    	mess.setQos(qos);
    	mess.setRetained(false);
    	mess.setPayload(message);
    	try {
			client.publish(topic, mess);
		} catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 消息发送
     * @param message 消息体
     * @param topic 主题
     * @param qos  qos
     */
    public static void sendMessage(String message,String topic,Integer qos){
    	MqttMessage mess = new MqttMessage();
    	mess.setQos(qos);
    	mess.setRetained(false);
    	mess.setPayload(message.getBytes());
    	try {
			client.publish(topic, mess);
		} catch (Exception e) {
            e.printStackTrace();
        }
    }
}

回调消息处理类

新建callback类,这是一个回调类,用于监听异步发生的事件回调,接收消息并处理。

代码语言:javascript复制
package com.sss.common.core.protocol.mqtt.server;

import cn.hutool.core.thread.ThreadUtil;
import com.sss.common.core.protocol.mqtt.handler.MessageHandler;
import com.sss.common.core.protocol.strategy.FacilityDataProcess;
import lombok.SneakyThrows;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.concurrent.ExecutorService;


public class MessageCallback implements MqttCallbackExtended {

	private final FacilityDataProcess facilityDataProcess ;

	public MessageCallback(FacilityDataProcess facilityDataProcess) {
		super();
		this.facilityDataProcess = facilityDataProcess ;
	}

	@Override
	/**
	 * 与服务器的连接丢失时,将调用此方法。
	 * @param arg0 :失去连接的原因
	 */
	public void connectionLost(Throwable arg0) {
		System.err.println("断开连接的原因是:" arg0);
	}

	@SneakyThrows
	@Override
	/**
	 * 在完成消息传递并收到所有确认后调用
	 * @param token :与消息关联的传递令牌
	*/
	public void deliveryComplete(IMqttDeliveryToken token) {
		//   delivery 传送OK
		System.out.print("delivery 传送OK:");
		System.out.println(token.getResponse());
	}

	@Override
	/**
	 * 从服务器收到消息时,将调用此方法
	 */
	public void messageArrived(String topic, MqttMessage message) {
		try{
			//在这里处理设备发过来的消息
		}catch(Exception e){
			e.printStackTrace();
		}
	}

	@Override
	public void connectComplete(boolean arg0, String arg1) {
		// 连接成功后,重新订阅自己的主题
		MqttService.subscribe();
	}
}

最后

一个简易的Java MQTT服务端就搭好了,此时可以启动EMQX和MQTTX客户端进行测试。

参考: https://docs.emqx.com/zh/enterprise/v5.1/

0 人点赞