物联网项目:充电桩项目实战~

2023-11-15 15:02:17 浏览数 (2)

你好,我是田哥

最近除了忙于面试辅导、模拟面试以外,还在搞一件大事:充电桩项目。

充电桩项目肯定是和物联网相关的,聊到物联网又不得不聊的是MQTT协议

什么是MQTT

MQTT,全称Message Queuing Telemetry Transport,即消息MQTT,即消息队列遥测传输,是一种基于客户端-服务器的消息发布/订阅传输协议。这种协议的设计思想是轻量、开放、简单和规范,因此易于实现。

MQTT协议的这些特点使它在很多情况下都非常适用,特别是在受限的环境中,例如机器与机器(M2M)通信和物联网(IoT)。此外,对于需要通过带宽有限的资源受限网络进行数据传输的设备,如智能传感器、可穿戴设备等物联网(IoT)设备,使用MQTT进行数据传输是非常适合的。这主要是因为MQTT拥有简单紧凑的架构和较小的代码占用空间,适用于低成本、低功耗的IoT微控制设备。

总的来说,MQTT协议是一种轻量级、易于实现且适用范围广泛的通信协议,特别适用于物联网设备的数据传输

说明MQTT只是一种协议,既然是协议那就得有实现。

实现MQTT协议的第三方框架主要包括以下几个:

  1. Paho MQTT C库:这是一个用C语言实现的开源MQTT客户端库,主要用于在Linux环境下进行MQTT协议的实现。
  2. EMQX:这是一个基于Erlang/OTP平台开发的开源物联网MQTT消息服务器,具有出色的软实时、低延时和分布式的特性。
  3. Qt MQTT类库:Qt官方提供了两种开发MQTT程序的方式,一种是Qt官方提供的基于MQTT的封装,另一种是第三方(EMQ)开发的用于Qt调用MQTT的接口。

以上这些框架都可以用来实现MQTT协议,开发者可以根据实际需求选择适合自己的框架,本项目采用的是EMQX。

什么是EMQX

EMQX 是一款开源的大规模分布式 MQTT 消息服务器,功能丰富,专为物联网和实时通信应用而设计。EMQX 5.0 单集群支持 MQTT 并发连接数高达 1 亿条,单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息,同时保证毫秒级的低时延。

EMQX 支持多种协议,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保证各种网络环境和硬件设备的可访问性。EMQX 还提供了全面的 SSL/TLS 功能支持,比如双向认证以及多种身份验证机制,为物联网设备和应用程序提供可靠和高效的通信基础设施。

内置基于 SQL 的规则引擎,EMQX 可以实时提取、过滤、丰富和转换物联网数据。此外,EMQX 采用了无主分布式架构,以确保高可用性和水平扩展性,并提供操作友好的用户体验和出色的可观测性。

EMQX 拥有来自 50 多个国家的 20,000 多家企业用户,连接全球超过 1 亿台物联网设备,服务企业数字化、实时化、智能化转型。

architecture_image

MQTT 发布/订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。

MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

  • 发布者(Publisher) 负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
  • 订阅者(Subscriber) 订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。
  • 代理(Broker) 负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
  • 主题(Topic) 主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。 MQTT 还支持订阅者使用主题通配符一次订阅多个主题。

EMQX 的优势

超大规模:EMQX 5.0 单集群可支持 MQTT 并发连接数高达 1 亿条。

高性能:单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息。

低延时:近乎实时的信息传递,保证延迟在亚毫秒级。

全面支持 MQTT 5.0 标准:100% 符合 MQTT 5.0 和 3.x 标准,具有更好的可扩展性、安全性和可靠性。

高可用:通过无主节点分布式架构实现高可用和水平扩展性。

云原生:通过 Kubernetes Operator 和 Terraform,可以轻松地在企业内部和公共云中进行部署。

项目集成

本文的开发环境为:

  • 构建工具:Maven
  • IDE:IntelliJ IDEA
  • Java 版本:JDK 8

添加以下依赖到项目 pom.xml 文件中。

代码语言:javascript复制
<dependencies>
   <dependency>
       <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
       <version>1.2.5</version>
   </dependency>
</dependencies>

下面是发布和订阅,

本文将使用 EMQX 提供的 免费公共 MQTT 服务器 创建。服务器接入信息如下:

  • Broker: broker.emqx.io(中国用户可以使用 broker-cn.emqx.io
  • TCP Port: 1883
  • SSL/TLS Port: 8883

也可以自己下载一个安装,并使用自己的。

下载地址:https://www.emqx.io/zh/downloads

发布

发布代码实现:

代码语言:javascript复制
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * 2023年11月15日 10:01
 * 在线刷题 1200 题和1000 篇干货文章:<a href="https://woaijava.cc/">博客地址</a>
 * 发布
 */
public class PublishSample {

    public static void main(String[] args) {

        String broker = "tcp://broker.emqx.io:1883";
        String topic = "mqtt/test";
        String username = "emqx";
        String password = "public";
        String clientid = "publish_client";

        String content = "你好,MQTT";
        int qos = 0;

        try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置用户名和密码
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            // 连接
            client.connect(options);
            // 创建消息并设置 QoS
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            // 发布消息
            client.publish(topic, message);

            System.out.println("send content: "   content);
            System.out.println("topic: "   topic);
            System.out.println("Message published");

            // 关闭连接
            client.disconnect();
            // 关闭客户端
            client.close();
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}

接着我们看看订阅实现。

代码语言:javascript复制
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * 2023年11月15日 10:01
 * 在线刷题 1200 题和1000 篇干货文章:<a href="https://woaijava.cc/">博客地址</a>
 * 订阅
 */

public class SubscribeSample {
    public static void main(String[] args) {
        String broker = "tcp://broker.emqx.io:1883";
        String topic = "mqtt/test";
        String username = "emqx";
        String password = "public";
        String clientid = "subscribe_client";
        int qos = 0;

        try {
            MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            // 设置回调
            client.setCallback(new MqttCallback() {

                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost: "   cause.getMessage());
                }

                public void messageArrived(String topic, MqttMessage message) {
                    System.out.println("topic: "   topic);
                    System.out.println("Qos: "   message.getQos());
                    System.out.println("received content: "   new String(message.getPayload()));

                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete---------"   token.isComplete());
                }

            });
            client.connect(options);
            client.subscribe(topic, qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

我们先启动订阅,让它等待发布,接着启动发布。

简单的图一个图,让大家更好的理解:

发布控制台输出:

代码语言:javascript复制
send content: 你好,MQTT
topic: mqtt/test
Message published

最后订阅控制台输出:

.到这里我们的demo案例就搞定了。具体在充电桩中的应用情况充电桩源码:

源码地址:https://gitee.com/trsunmu/charge-station-single

好了,今天就分享到这里。关于充电桩的文档(属于学习圈子内容)已更新到第十三篇了。

充电桩项目文档还在持续更新中。

推荐

MySQL 开发规范,非常详细,建议收藏!

16k面试中的10个问题

从0开始搭建公司技术栈,yyds

简历写成这样,CTO会主动联系你

全程面试辅导,保驾护航!

0 人点赞