前言
在上一篇文章 《Java使用modbus4j实现ModbusTCP通信》 中我们介绍了Java与Modbus协议的TCP通信,本文讲解一下如何用Java实现对当下最流行的物联网协议之一的MQTT协议进行通信。
MQTT
MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
特点:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
- 对负载内容屏蔽的消息传输;
- 使用 TCP/IP 提供网络连接;
- 有三种消息发布服务质量:
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
EMQX
EMQX 是一个「无限连接,任意集成,随处运行」大规模分布式物联网接入平台。
EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。附下载地址: https://www.emqx.com/zh/try?product=enterprise 可以自行下载对应版本运行
优势:
- 海量连接:单节点支持 500 万 MQTT 设备连接,集群可水平扩展至支持 1 亿并发的 MQTT 连接。
- 高可靠:弹性伸缩,无单点故障。内置 RocksDB 可靠地持久化 MQTT 消息,确保无数据损失。
- 数据安全:端到端数据加密(支持国密),细粒度访问控制,保障数据安全,满足企业合规需求。
- 多协议:支持 MQTT、HTTP、QUIC、WebSocket、LwM2M/CoAP 或专有协议连接任何设备。
- 高性能:单节点支持每秒实时接收、处理与分发数百万条的 MQTT 消息。毫秒级消息交付时延。
- 易运维:图形化配置、操作与管理,实时监测运行状态。支持 MQTT 跟踪进行端到端问题分析。
Mria 集群架构
支持全新的 Mria 集群架构,在此架构下 EMQX 水平扩展性得到指数级提升,单个集群可以轻松支持 1 亿 MQTT 连接,这使得 EMQX 5.0 成为目前全球最具扩展性的 MQTT Broker。
在构建满足用户业务需求的更大规模集群的同时,Mria 架构还能够降低大规模部署下的脑裂风险以及脑裂后的影响,以提供更加稳定可靠的物联网数据接入服务。
具体可以查看官方文档: https://docs.emqx.com/zh/enterprise/v5.1/deploy/cluster/create-cluster.html
MQTTX
MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。
主要功能
- 采用聊天界面设计,使得操作更加简单明了
- 跨平台兼容,支持在 Windows,macOS,Linux 系统上运行
- 100% 兼容 MQTT v5.0,v3.1.1 和 v3.1 协议
- 订阅的 MQTT 主题支持自定义颜色标签
- 支持单向和双向 SSL 认证,同时支持 CA 和自签名证书
- 支持通过 WebSocket 连接 MQTT 服务器
- 支持 Hex, Base64, JSON, Plaintext 等 Payload 格式转换
- 自定义脚本支持模拟 MQTT 发布/订阅测试
- 提供完整的日志记录功能
- 多语言支持:简体中文、英语、日语、土耳其语及匈牙利语 ???????? ???????? ???????? ???????? ????????
- 自由切换 Light、Dark、Night 三种主题模式
Java代码实现
引入maven
代码语言:javascript复制 <!-- mqtt -->
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.16</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
config配置文件
新建 config.properties 文件,把连接EMQX的信息写在配置文件里,以便获取。也可以写在项目的yml配置文件里
代码语言:javascript复制#用户名
mqtt.username=admin
#连接emqx密码
mqtt.password=xxxxxx
#是否清除会话
mqtt.cleanSession=true
#服务地址
mqtt.serverURI=tcp://192.168.1.22
#端口
mqtt.port = 1883
#客户端id
mqtt.clientId=xxxx
#mqtt.clientId=123456
#订阅topic
mqtt.service.subscribe.topic = xxx/xxx/ ,$SYS/brokers/ /clients/ /disconnected,$SYS/brokers/ /clients/ /connected
#发送topic
mqtt.facility.subscribe.topic = xxx/xxx
Service类
新建service核心类,主要用于连接EMQX并注册、订阅。
代码语言:javascript复制package com.sss.common.core.protocol.mqtt.server;
import com.sss.common.core.protocol.strategy.FacilityDataProcess;
import com.sss.common.utils.properties.ResourceBundleUtil;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
import java.util.Arrays;
@Configuration
@EnableScheduling
public class MqttService {
@Autowired
@Qualifier("FacilityDataProcess")
private FacilityDataProcess facilityDataProcess;
public static final String HOST = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.serverURI") ;
public static final String PORT = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.port") ;
public static final String TOPIC = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.service.subscribe.topic") ;
private static final String CLIENT_ID = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.clientId") ;
private static final String USER_NAME = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.username") ;
private static final String PASSWORD = ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.password") ;
private static final Boolean CLEAN_SESSION = "true".equalsIgnoreCase(ResourceBundleUtil.MQTT_LAMBDA.apply("mqtt.cleanSession"));
private static MqttClient client;
private static final Logger log = LoggerFactory.getLogger(MqttService.class);
/**
* 构造函数
* @throws MqttException 异常信息
*/
public MqttService() throws MqttException {
// MemoryPersistence设置
client = new MqttClient(HOST ":" PORT, CLIENT_ID, new MemoryPersistence());
}
@PostConstruct
public void init(){
connect();
}
/**
* 连接EMQ X服务器
*/
private void connect() {
log.info("HOST: " HOST);
log.info("PORT: " PORT);
log.info("TOPIC: " TOPIC);
log.info("CLIENT_ID: " CLIENT_ID);
log.info("USER_NAME: " USER_NAME);
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(USER_NAME);
options.setPassword(PASSWORD.toCharArray());
// 设置超时时间
options.setConnectionTimeout(20);
// 设置会话心跳时间
options.setKeepAliveInterval(30);
// 重连
options.setAutomaticReconnect(true);
// 设置是否清除会话
options.setCleanSession(CLEAN_SESSION);
try {
client.setCallback(new MessageCallback(facilityDataProcess));
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void subscribe(){
try{
// 订阅消息
String[] topic1 = TOPIC.split(",") ;
int[] qos = new int[topic1.length];
// 循环将所有主题的Qos设置为1
Arrays.fill(qos, 1);
client.subscribe(topic1, qos);
}catch (Exception e) {
e.printStackTrace();
}
}
/**
* 消息发送
* @param message byte
* @param topic 主题
* @param qos qos
*/
public static void sendMessage(byte[] message,String topic,Integer qos){
MqttMessage mess = new MqttMessage();
mess.setQos(qos);
mess.setRetained(false);
mess.setPayload(message);
try {
client.publish(topic, mess);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 消息发送
* @param message 消息体
* @param topic 主题
* @param qos qos
*/
public static void sendMessage(String message,String topic,Integer qos){
MqttMessage mess = new MqttMessage();
mess.setQos(qos);
mess.setRetained(false);
mess.setPayload(message.getBytes());
try {
client.publish(topic, mess);
} catch (Exception e) {
e.printStackTrace();
}
}
}
回调消息处理类
新建callback类,这是一个回调类,用于监听异步发生的事件回调,接收消息并处理。
代码语言:javascript复制package com.sss.common.core.protocol.mqtt.server;
import cn.hutool.core.thread.ThreadUtil;
import com.sss.common.core.protocol.mqtt.handler.MessageHandler;
import com.sss.common.core.protocol.strategy.FacilityDataProcess;
import lombok.SneakyThrows;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.concurrent.ExecutorService;
public class MessageCallback implements MqttCallbackExtended {
private final FacilityDataProcess facilityDataProcess ;
public MessageCallback(FacilityDataProcess facilityDataProcess) {
super();
this.facilityDataProcess = facilityDataProcess ;
}
@Override
/**
* 与服务器的连接丢失时,将调用此方法。
* @param arg0 :失去连接的原因
*/
public void connectionLost(Throwable arg0) {
System.err.println("断开连接的原因是:" arg0);
}
@SneakyThrows
@Override
/**
* 在完成消息传递并收到所有确认后调用
* @param token :与消息关联的传递令牌
*/
public void deliveryComplete(IMqttDeliveryToken token) {
// delivery 传送OK
System.out.print("delivery 传送OK:");
System.out.println(token.getResponse());
}
@Override
/**
* 从服务器收到消息时,将调用此方法
*/
public void messageArrived(String topic, MqttMessage message) {
try{
//在这里处理设备发过来的消息
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void connectComplete(boolean arg0, String arg1) {
// 连接成功后,重新订阅自己的主题
MqttService.subscribe();
}
}
最后
一个简易的Java MQTT服务端就搭好了,此时可以启动EMQX和MQTTX客户端进行测试。
参考: https://docs.emqx.com/zh/enterprise/v5.1/