你好,我是田哥
最近除了忙于面试辅导、模拟面试以外,还在搞一件大事:充电桩项目。
充电桩项目肯定是和物联网相关的,聊到物联网又不得不聊的是MQTT协议
。
什么是MQTT
MQTT
,全称Message Queuing Telemetry Transport
,即消息MQTT
,即消息队列遥测传输,是一种基于客户端-服务器的消息发布/订阅传输协议。这种协议的设计思想是轻量、开放、简单和规范,因此易于实现。
MQTT
协议的这些特点使它在很多情况下都非常适用,特别是在受限的环境中,例如机器与机器(M2M)通信和物联网(IoT)。此外,对于需要通过带宽有限的资源受限网络进行数据传输的设备,如智能传感器、可穿戴设备等物联网(IoT)设备,使用MQTT进行数据传输是非常适合的。这主要是因为MQTT
拥有简单紧凑的架构和较小的代码占用空间,适用于低成本、低功耗的IoT微控制设备。
总的来说,MQTT协议是一种轻量级、易于实现且适用范围广泛的通信协议,特别适用于物联网设备的数据传输。
说明MQTT只是一种协议,既然是协议那就得有实现。
实现MQTT协议的第三方框架主要包括以下几个:
Paho MQTT
C库:这是一个用C语言实现的开源MQTT客户端库,主要用于在Linux环境下进行MQTT协议的实现。EMQX
:这是一个基于Erlang/OTP平台开发的开源物联网MQTT消息服务器,具有出色的软实时、低延时和分布式的特性。Qt MQTT
类库:Qt官方提供了两种开发MQTT程序的方式,一种是Qt官方提供的基于MQTT的封装,另一种是第三方(EMQ)开发的用于Qt调用MQTT的接口。
以上这些框架都可以用来实现MQTT协议,开发者可以根据实际需求选择适合自己的框架,本项目采用的是EMQX。
什么是EMQX
EMQX 是一款开源的大规模分布式 MQTT 消息服务器,功能丰富,专为物联网和实时通信应用而设计。EMQX 5.0 单集群支持 MQTT 并发连接数高达 1 亿条,单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息,同时保证毫秒级的低时延。
EMQX 支持多种协议,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保证各种网络环境和硬件设备的可访问性。EMQX 还提供了全面的 SSL/TLS 功能支持,比如双向认证以及多种身份验证机制,为物联网设备和应用程序提供可靠和高效的通信基础设施。
内置基于 SQL 的规则引擎,EMQX 可以实时提取、过滤、丰富和转换物联网数据。此外,EMQX 采用了无主分布式架构,以确保高可用性和水平扩展性,并提供操作友好的用户体验和出色的可观测性。
EMQX 拥有来自 50 多个国家的 20,000 多家企业用户,连接全球超过 1 亿台物联网设备,服务企业数字化、实时化、智能化转型。
architecture_image
MQTT
发布/订阅模式
发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。
MQTT
发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。
在 MQTT
中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。
MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。
- 发布者(Publisher) 负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
- 订阅者(Subscriber) 订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。
- 代理(Broker) 负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
- 主题(Topic)
主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠
/
进行分层,比如sensor/1/temperature
。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。 MQTT 还支持订阅者使用主题通配符一次订阅多个主题。
EMQX 的优势
超大规模:EMQX 5.0 单集群可支持 MQTT 并发连接数高达 1 亿条。
高性能:单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息。
低延时:近乎实时的信息传递,保证延迟在亚毫秒级。
全面支持 MQTT 5.0 标准:100% 符合 MQTT 5.0 和 3.x 标准,具有更好的可扩展性、安全性和可靠性。
高可用:通过无主节点分布式架构实现高可用和水平扩展性。
云原生:通过 Kubernetes Operator 和 Terraform,可以轻松地在企业内部和公共云中进行部署。
项目集成
本文的开发环境为:
- 构建工具:Maven
- IDE:IntelliJ IDEA
- Java 版本:JDK 8
添加以下依赖到项目 pom.xml
文件中。
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
下面是发布和订阅,
本文将使用 EMQX 提供的 免费公共 MQTT 服务器 创建。服务器接入信息如下:
- Broker: broker.emqx.io(中国用户可以使用 broker-cn.emqx.io)
- TCP Port: 1883
- SSL/TLS Port: 8883
也可以自己下载一个安装,并使用自己的。
下载地址:https://www.emqx.io/zh/downloads
发布
发布代码实现:
代码语言:javascript复制import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* 2023年11月15日 10:01
* 在线刷题 1200 题和1000 篇干货文章:<a href="https://woaijava.cc/">博客地址</a>
* 发布
*/
public class PublishSample {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
String clientid = "publish_client";
String content = "你好,MQTT";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
// 设置用户名和密码
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
// 连接
client.connect(options);
// 创建消息并设置 QoS
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 发布消息
client.publish(topic, message);
System.out.println("send content: " content);
System.out.println("topic: " topic);
System.out.println("Message published");
// 关闭连接
client.disconnect();
// 关闭客户端
client.close();
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
}
接着我们看看订阅实现。
代码语言:javascript复制import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* 2023年11月15日 10:01
* 在线刷题 1200 题和1000 篇干货文章:<a href="https://woaijava.cc/">博客地址</a>
* 订阅
*/
public class SubscribeSample {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";
String clientid = "subscribe_client";
int qos = 0;
try {
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
// 连接参数
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60);
// 设置回调
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " cause.getMessage());
}
public void messageArrived(String topic, MqttMessage message) {
System.out.println("topic: " topic);
System.out.println("Qos: " message.getQos());
System.out.println("received content: " new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" token.isComplete());
}
});
client.connect(options);
client.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
}
我们先启动订阅,让它等待发布,接着启动发布。
简单的图一个图,让大家更好的理解:
发布控制台输出:
代码语言:javascript复制send content: 你好,MQTT
topic: mqtt/test
Message published
最后订阅控制台输出:
.到这里我们的demo案例就搞定了。具体在充电桩中的应用情况充电桩源码:
源码地址:https://gitee.com/trsunmu/charge-station-single
好了,今天就分享到这里。关于充电桩的文档(属于学习圈子内容)已更新到第十三篇了。
充电桩项目文档还在持续更新中。
推荐
MySQL 开发规范,非常详细,建议收藏!
16k面试中的10个问题
从0开始搭建公司技术栈,yyds
简历写成这样,CTO会主动联系你
全程面试辅导,保驾护航!