Java物联网开发(二) —— 开源百万级分布式 MQTT 消息服务器EMQX

2021-08-12 10:53:05 浏览数 (1)

开源百万级分布式 MQTT 消息服务器EMQX

  • 一. 是什么
    • 1. 简介
    • 2. 分类
    • 3. EMQ X 消息服务器功能列表
  • 二. 安装
    • 1. 安装方式
      • rpm安装
      • docker安装
      • 免安装
    • 2. 目录结构
    • 3. 使用websocket实现消息的收发
  • 三. 使用emqx基础功能
    • 1. Dashboard
    • 2. 认证
      • 认证介绍
        • 认证方式
        • 匿名认证
        • 密码加盐规则与哈希方法
        • EMQ X 身份认证流程
        • 认证结果
      • 认证操作案例
        • username 认证
        • Client ID 认证
        • http认证
  • 四. 使用emqx高级功能
    • 1. 客户端SDK
      • sdk-paho
      • MQTT.js
    • 2. 日志与追踪
      • 日志级别
      • 运行时修改日志级别
      • 日志追踪
      • 日志文件和日志滚动
    • 3. ACL 发布订阅权限控制
      • 功能搭建
      • 测试结果
    • 4. WebHook插件
    • 5. 管理监控API的使用
    • 6. 保留消息
    • 7. 共享订阅
      • 带群组的共享订阅
      • 不带群组的共享订阅消息
      • 均衡策略与派发 Ack 配置
    • 10. 代理订阅
      • 静态/内置代理订阅
      • 基于Webhook和API实现动态代理订阅
    • 11. 主题重写
    • 12. 黑名单配置
    • 13. 速率限制
    • 14. 消息重传
    • 15. 系统调优
  • 五. 规则引擎
    • 介绍及原理
    • SQL语句
    • 规则引擎接收案例

一. 是什么

1. 简介

MQTT属于是物联网的通信协议,在MQTT协议中有两大角色:客户端(发布者/订阅者),服务端(Mqtt broker);针对客户端和服务端需要有遵循该协议的的具体实现,EMQ/EMQ X就是MQTT Broker的一种实现。

EMQ X 是开源百万级分布式 MQTT 消息服务器(MQTT Messaging Broker),用于支持各种接入标准 MQTT协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联网设备的数据采集,和对设备的操作和控制 EMQ官网传送门(本博客基于emq x 4.1产品编写)

2. 分类

EMQ X 公司主要提供三个产品,可在官网首页产品导航查看每一种产品;主要体现在支持的连接数量、产品功能和商业服务等方面的区别:

  • EMQ X Broker:EMQ X 开源版,完整支持 MQTT V3.1.1/V5.0 协议规范,完整支持 TCP、TLS、 WebSocket 连接,支持百万级连接和分布式集群架构;LDAP, MySQL, Redis, MongoDB 等扩展插件集成,支持插件模式扩展服务器功能;支持跨 Linux、Windows、macOS 平台安装,支持公有云、私有云、K8S/容器部署
  • EMQ X Enterprise:EMQ X 企业版,在开源版基础上,支持物联网主流协议 MQTT、MQTT-SN、 CoAP/LwM2M、HTTP、WebSocket 一站式设备接入;JT-808/GBT-32960 等行业协议支持,基于 TCP/UDP私有协议的旧网设备接入兼容,多重安全机制与认证鉴权;高并发软实时消息路由;强大灵活的内置规则引擎;企业服务与应用集成;多种数据库持久化支持;消息变换桥接转发 Kafka;管理监控中心
  • EMQ X Platform:EMQ X 平台版,EMQ X Platform 是面向千万级超大型 IoT 网络和应用,全球首选电信级物联网终端接入解决方案。千万级大容量;多物联网协议;电信级高可靠;卓越 5G 网络支持;跨云跨IDC 部署;兼容历史系统;完善的咨询服务(从咨询到运维)

产品功能对比图

3. EMQ X 消息服务器功能列表

虽然EMQ X Enterprise, EMQ X Platform性能更加强大, EMQ X Enterprise 使用说明传送门 但是收费. 本着学习的原则选择的是性能稍差的EMQ X Broker

  • 完整的 MQTT V3.1/V3.1.1 及 V5.0 协议规范支持 QoS0, QoS1, QoS2 消息支持 持久会话与离线消息支持 Retained 消息支持 Last Will 消息支持
  • TCP/SSL 连接支持
  • MQTT/WebSocket/SSL 支持
  • HTTP 消息发布接口支持
  • $SYS/# 系统主题支持
  • 客户端在线状态查询与订阅支持
  • 客户端 ID 或 IP 地址认证支持
  • 用户名密码认证支持
  • LDAP、Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成
  • 浏览器 Cookie 认证
  • 基于客户端 ID、IP 地址、用户名的访问控制 (ACL)
  • 多服务器节点集群 (Cluster)
  • 支持 manual、mcast、dns、etcd、k8s 等多种集群发现方式
  • 网络分区自动愈合
  • 消息速率限制
  • 连接速率限制
  • 按分区配置节点
  • 多服务器节点桥接 (Bridge)
  • MQTT Broker 桥接支持
  • Stomp 协议支持
  • MQTT-SN 协议支持
  • CoAP 协议支持
  • LwM2M 协议支持
  • Stomp/SockJS 支持
  • 延时 Publish ($delay/topic)
  • Flapping 检测
  • 黑名单支持
  • 共享订阅 ($share/:group/topic)
  • TLS/PSK 支持
  • 规则引擎 空动作 (调试) 消息重新发布 桥接数据到 MQTT Broker 检查 (调试) 发送数据到 Web 服务

二. 安装

EMQX支持的安装方式多种多样, docker安装, rpm安装, zip安装 点击跳转到安装地址

1. 安装方式

rpm安装

下面首先演示rpm安装

代码语言:javascript复制
# rpm安装
## rpm安装指定版本的emqx
rpm -ivh emqx-centos7-v4.0.5.x86_64.rpm
## 查询emqx安装是否成功
rpm -qa | grep emqx
## 启动emqx
empx start 
empx restart
## 查看emqx运行状态
emqx_ctl status
## 关闭emqx
emqx stop 
## 卸载 emqx
rpm -e emqx


# 访问emqx
## 访问地址
http://emqx安装ip:18083/
## 默认用户名:admin,默认密码:public

图1: 安装命令截图

图2: 登陆emqx后的Dashboard

docker安装

需要安装docker, 如果之前没有安装请跳转至 docker教程第二章

代码语言:javascript复制
# docker安装
## emqx版本查看地址
https://hub.docker.com/r/emqx/emqx/tags?page=1&ordering=last_updated
## 拉取指定版本镜像
docker pull emqx/emqx:v4.0.5
## 将该镜像生成对应容器并运行
docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.5


# 访问emqx
## 访问地址
http://emqx安装ip:18083/
## 默认用户名:admin,默认密码:public

免安装

解压zip包, 进入 bin目录下直接运行相关命令即可 bin文件夹文件截图

安装命令

代码语言:javascript复制
# bin目录下运行
[root@docker01 bin]# ./emqx start
EMQ X Broker v4.0.5 is started successfully!

[root@docker01 bin]# ./emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx 4.0.5 is running

[root@docker01 bin]# ./emqx stop


# 访问emqx
## 访问地址
http://emqx安装ip:18083/
## 默认用户名:admin,默认密码:public

2. 目录结构

不同安装方式得到的 EMQ X 其目录结构会有所不同,具体如下:

  • bin 目录 emqx、emqx.cmd:EMQ X 的可执行文件 emqx_ctl、emqx_ctl.cmd:EMQ X 管理命令的可执行文件
  • etc 目录 EMQ X 通过 etc 目录下配置文件进行设置,主要配置文件包括
  • data 目录 EMQ X 将运行数据存储在 data 目录下,主要的文件包括: configs/app.*.config EMQ X 读取 etc/emqx.conf 和 etc/plugins/*.conf 中的配置后,转换为 Erlang 原生配置文件格式,并在运行时读取其中的配置。 loaded_plugins : 记录了 EMQ X 默认启动的插件列表,可以修改此文件以增删默认启动的插件。loaded_plugins 中启动项格式为 {<Plugin Name>, <Enabled>}.<Enabled> 字段为布尔类型,EMQ X 会在启动时根据 的值判断是否需要启动该插件。

Mnesia 数据库是 Erlang 内置的一个分布式 DBMS,可以直接存储 Erlang 的各种数据结构 EMQ X 使用 Mnesia 数据库存储自身运行数据,例如告警记录、规则引擎已创建的资源和规则、Dashbaord用户信息等数据,这些数据都将被存储在 mnesia 目录下,因此一旦删除该目录,将导致 EMQ X 丢失所有业务数据。可以通过 emqx_ctl mnesia 命令查询 EMQ X 中 Mnesia 数据库的系统信息。

  • log 目录 emqx.log.:EMQ X 运行时产生的日志文件 crash.dump:EMQ X 的崩溃转储文件,可以通过 etc/emqx.conf 修改配置。 erlang.log.:以 emqx start 方式后台启动 EMQ X 时,控制台日志的副本文件。

3. 使用websocket实现消息的收发

使用基于emqx的图形化ui中的websocket实现图片的收发 请求地址: http://emqx服务器ip:18083/#/websocket

  1. 创建连接
  1. 订阅以 testtopic/ 开头的消息
  1. 向主题名为 testtopic/1 的主题发送消息
  1. 这样订阅了该主题客户端就能够收到相关消息

三. 使用emqx基础功能

1. Dashboard

  1. 功能展示截图
  1. 功能描述 这些功能将在后续的使用中介绍到
  1. 需要注意的是可以在admin->settings->Language 中设置当前语言为中文

2. 认证

认证介绍

身份认证是大多数应用的重要组成部分,MQTT 协议支持用户名密码认证,启用身份认证能有效阻止非法客户端的连接。 EMQ X 中的认证指的是当一个客户端连接到 EMQ X 的时候,通过服务器端的配置来控制客户端连接服务器的权限。

EMQ X 的认证支持包括两个层面:

  • MQTT 协议本身在 CONNECT 报文中指定用户名和密码,EMQ X 以插件形式支持基于 Username、ClientID、HTTP、JWT、LDAP 及各类数据库如 MongoDB、MySQL、PostgreSQL、Redis 等多种形式的认证。
  • 在传输层上,TLS 可以保证使用客户端证书的客户端到服务器的身份验证,并确保服务器向客户端验证服务器证书。也支持基于 PSK 的 TLS/DTLS 认证。
认证方式

EMQ X 支持使用内置数据源(文件、内置数据库)、JWT、外部主流数据库和自定义 HTTP API 作为身份认证数据源。 连接数据源、进行认证逻辑通过插件实现的,每个插件对应一种认证方式,使用前需要启用相应的插件。 客户端连接时插件通过检查其 username/clientid 和 password 是否与指定数据源的信息一致来实现对客户端的身份认证。

EMQ X 支持的认证方式:

  • 内置数据源 Mnesia (用户名/Client ID)认证 使用配置文件与 EMQ X 内置数据库提供认证数据源,通过 HTTP API 进行管理,足够简单轻量。
  • 外部数据库 可以存储大量数据,同时方便与外部设备管理系统集成。 LDAP 认证 MySQL 认证 PostgreSQL 认证 Redis 认证 MongoDB 认证
  • 其他 HTTP 认证 JWT 认证

注意

  1. 更改插件配置后需要重启插件才能生效,部分认证鉴权插件包含 ACL 功能。
  2. 各种认证方式配置详情请点击 这里 查看
匿名认证

EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQ X。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQ X 将根据匿名认证启用情况决定是否允许客户端连接。 生产环境中请禁用匿名认证。

代码语言:javascript复制
# 进入 etc/emqx.conf

# 配置是否开启匿名认证(默认为true)
## Value: true | false
allow_anonymous = false

配置之后, 可以发现再次使用websocket 直接连接就会失败, 必须要输入指定的用户名和密码
密码加盐规则与哈希方法

EMQ X 多数认证插件中可以启用哈希方法,数据源中仅保存密码密文,保证数据安全。 启用哈希方法时,用户可以为每个客户端都指定一个 salt(盐)并配置加盐规则,数据库中存储的密码是按照加盐规则与哈希方法处理后的密文。

以 MySQL 认证为例:加盐规则与哈希方法配置

代码语言:javascript复制
# etc/plugins/emqx_auth_mysql.conf

## 不加盐,仅做哈希处理
auth.mysql.password_hash = sha256

## salt 前缀:使用 sha256 加密 salt   密码 拼接的字符串
auth.mysql.password_hash = salt,sha256

## salt 后缀:使用 sha256 加密 密码   salt 拼接的字符串
auth.mysql.password_hash = sha256,salt

## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20
EMQ X 身份认证流程
  1. 根据配置的认证 SQL 结合客户端传入的信息,查询出密码(密文)和 salt(盐)等认证数据,没有查询结果时,认证将终止并返回 ignore 结果
  2. 根据配置的加盐规则与哈希方法计算得到密文,没有启用哈希方法则跳过此步
  3. 将数据库中存储的密文与当前客户端计算的到的密文进行比对,比对成功则认证通过,否则认证失败 PostgreSQL 认证功能逻辑图:
认证结果

任何一种认证方式最终都会返回一个结果:

  • 认证成功:经过比对客户端认证成功
  • 认证失败:经过比对客户端认证失败,数据源中密码与当前密码不一致
  • 忽略认证(ignore):当前认证方式中未查找到认证数据,无法显式判断结果是成功还是失败,交由认证链下一认证方式或匿名认证来判断

认证操作案例

username 认证

Username 认证使用配置文件预设客户端用户名与密码,支持通过 HTTP API 管理认证数据。 Username 认证不依赖外部数据源,使用上足够简单轻量。使用这种认证方式前需要开启插件,我们可以在Dashboard里找到这个插件并开启。

Client ID 认证

配置和测试流程同username认证

http认证

开启http认证, 通过客户端来进行认证, emqx通过客户端返回的响应码来判断请求成功或失败 响应码可见 认证结果 这一部分介绍

  • HTTP 请求方法为 GET 时,请求参数将以 URL 查询字符串的形式传递;POST、PUT 请求则将请求参数以普通表单形式提交(content-type 为 x-www-form-urlencoded)。
  • 你可以在认证请求中使用以下占位符,请求时 EMQ X 将自动填充为客户端信息: %u:用户名 %c:Client ID %a:客户端 IP 地址 %r:客户端接入协议 %P:明文密码 %p:客户端端口 %C:TLS 证书公用名(证书的域名或子域名),仅当 TLS 连接时有效 %d:TLS 证书 subject,仅当 TLS 连接时有效
  • 推荐使用 POST 与 PUT 方法,使用 GET 方法时明文密码可能会随 URL 被记录到传输过程中的服务器日志中。

四. 使用emqx高级功能

1. 客户端SDK

在实际项目中我们要针对接MQTT消息代理服务端,从而向其发布消息、订阅消息等来完成我们自己的业务逻辑的开发。EMQX针对不同的客户端语言都提供了不同的SDK工具包,可以在官网上查看并下下载

sdk-paho

基于Java语言开发的sdk工具, 通过该sdk来实现对emqx的操作 官方文档及源码地址 https://github.com/eclipse/paho.mqtt.java

实现步骤

MQTT.js

MQTT.js是MQTT协议的客户端库,用JavaScript编写,适用于node.js和浏览器。 GitHub项目地址:https://github.com/mqttjs/MQTT.js

下面将演示基于sdk-matt.js实现消息的收发

  1. 在sdk-paho创建的springboot项目的resources/static 目录下, 创建html文件, 类型为html5, 名称为 index
  2. 编写 index.html文件代码 作用是: 创建三个div, 第一个div监听客户端收到的数据包. 第二个div显示每隔2s客户端发送的数据, 第三个div显示消息到达客户端的消息
代码语言:javascript复制
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>mqtt.js测试</title>
    <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>

    <script src="https://unpkg.com/mqtt/dist/mqtt.min.js" ></script>
    <style>
        div{
            width: 300px;
            height: 300px;
            float: left;
            border: 1px solid red;
        }
    </style>
    <script type="text/javascript">
        $(function () {

            //定义连接选项对象
            const  options = {
                clean: true, // 不保留回话
                connectTimeout: 4000, // 超时时间
                // 认证信息
                clientId: 'emqx_h5_client',
                username: 'user',
                password: '123456',
            }
            // 连接字符串, 通过协议指定使用的连接方式
            // ws 未加密 WebSocket 连接 8083端口
            // wss 加密 WebSocket 连接 8084端口
            // mqtt 未加密 TCP 连接
            // mqtts 加密 TCP 连接
            // wxs 微信小程序连接
            // alis 支付宝小程序连接
            const connectUrl = "ws://emqx所在服务器ip:8083/mqtt";
            const client = mqtt.connect(connectUrl,options);

            /**
             * mqtt.Client相关事件
             */
            //当重新连接启动触发回调
            client.on('reconnect', () => {
                $("#div1").text("正在重连.....");
            });
            //连接断开后触发的回调
            client.on("close",function () {
                $("#div1").text("客户端已断开连接.....");
            });
            //从broker接收到断开连接的数据包后发出。MQTT 5.0特性
            client.on("disconnect",function (packet) {
                $("#div1").text("从broker接收到断开连接的数据包....." packet);
            });
            //客户端脱机下线触发回调
            client.on("offline",function () {
                $("#div1").text("客户端脱机下线.....");
            });
            //当客户端无法连接或出现错误时触发回调
            client.on("error",(error) =>{
                $("#div1").text("客户端出现错误....." error);
            });

            //以下两个事件监听粒度细
            //当客户端发送任何数据包时发出。这包括published()包以及MQTT用于管理订阅和连接的包
            client.on("packetsend",(packet)=>{
                $("#div1").text("客户端已发出数据包....." packet);
            });
            //当客户端接收到任何数据包时发出。这包括来自订阅主题的信息包以及MQTT用于管理订阅和连接的信息包
            client.on("packetreceive",(packet)=>{
                $("#div1").text("客户端已收到数据包....." packet);
            });


            //注册监听connect事件
            client.on("connect",function (connack) {
                //成功连接上服务端之后
                $("#div1").text("成功连接上服务器" new Date());
                //订阅 testtopic/#
                client.subscribe("testtopic/#",{qos:2});


                //每隔2秒钟发布一次
                setInterval(publish,2000);

            });

            function publish() {
                //发布数据
                /*** client.publish(topic,message,[options], [callback])
                 * message: Buffer or String
                 * options:{
                 * qos:0, //默认0
                 * retain:false, //默认false
                 * dup:false, //默认false
                 * properties:{}
                 * }
                 * callback:function (err){}
                 */
                const message = "h5 message " Math.random() new Date();
                client.publish("testtopic/123",message,{qos:2});
                $("#div2").text("客户端发布了数据:" message);
            }

            //注册消息到达的事件
            client.on("message",(topic, message, packet)=>{
                $("#div3").text("客户端收到订阅消息,topic=" topic ";消息数据:" message ";数据包:" packet);
            });


            //页面离开自动断开连接
            $(window).bind("beforeunload",()=>{
                $("#div1").text("客户端窗口关闭,断开连接");
                client.end();
            })

        });

    </script>
</head>
<body>


<div id="div1"></div>
<div id="div2"></div>
<div id="div3"></div>
</body>
</html>

2. 日志与追踪

日志级别

  • EMQ X 的日志分 8 个等级, 由低到高分别为:debug < info < notice < warning < error < critical < alert < emergency
  • EMQ X 的默认日志级别为 warning,可在 /etc/emqx.conf中修改:log.level = warning 此配置将所有 log handler 的配置设置为 warning。
代码语言:javascript复制
 [Primary Level] -- global log level and filters
		  /  
 [Handler 1] [Handler 2] -- log levels and filters at each handler

EMQ X 使用了分层的日志系统,在日志级别上,包括全局日志级别 (primary log level)、以及各 log handler 是负责日志处理和输出的工作进程,它由 log handler id 唯一标识,并负有如下任务:

  • 接收什么级别的日志
  • 如何过滤日志消息
  • 将日志输出到什么地方的日志级别。

查看 emqx 默认安装的 log handlers:

  • file: 负责输出到日志文件的 log handler。 没有设置特殊过滤条件,所有日志消息只要级别满足要求就输出。输出目的地为日志文件。
  • default: 负责输出到控制台的 log handler。 没有设置特殊过滤条件,所有日志消息只要级别满足要求就输出。输出目的地为控制台。
  • ssl_handler: ssl 的 log handler。 它的过滤条件设置为当日志是来自 ssl 模块时输出。输出目的地为控制台。

总结:

  • 在日志级别小节中提到的 log.level 是修改了全局的日志级别。这包括 primary log level 和各个 handlers的日志级别,都设置为了同一个值。
  • Primary Log Level 相当于一个自来水管道系统的总开关,一旦关闭则各个分支管道都不再有水流通过。这个机制保证了日志系统的高性能运作。

运行时修改日志级别

可以使用 EMQ X 的命令行工具 emqx_ctl 在运行时修改 emqx 的日志级别:

日志追踪

日志文件和日志滚动

  • EMQ X 的默认日志文件目录在软件根目录下的 log目录下。 可在软件根目录下 /etc/emqx.conf 中配置:log.dir = log
  • 在文件日志启用的情况下 (log.to = file 或 both),日志目录下会有如下几种文件: emqx.log.N: : 包含了 EMQ X 的所有日志消息。比如emqx.log.1 , emqx.log.2 … emqx.log.siz 和 emqx.log.idx: 记录日志滚动信息的系统文件。 run_erl.log: 以 emqx start 方式后台启动 EMQ X 时,用于记录启动信息的系统文件。 erlang.log.N: 以 emqx start 方式后台启动 EMQ X 时,控制台日志的副本文件。比如 erlang.log.1 , erlang.log.2 …

详情见官方文档-日志参数配置


3. ACL 发布订阅权限控制

  • 发布订阅 ACL 指对 发布 (PUBLISH)/订阅 (SUBSCRIBE) 操作的 权限控制。例如拒绝用户名为 Anna 向 open/elsa/door 发布消息。
  • EMQ X 支持通过客户端发布订阅 ACL 进行客户端权限的管理,官网关于ACL详细介绍

HTTP 认证使用外部自建 HTTP 应用认证授权数据源,根据 HTTP API 返回的数据判定授权结果,能够实现复杂的 ACL 校验逻辑。

功能搭建

搭建实现过程

注意:

  • 配置文件配置好后, 需要重启emqx
  • auth.http.super_reqauth.http.acl_req配置的是客户端应用程序superuser 请求和ACL 授权查询请求接口地址, 因此我们不仅要关注接口的地址正确与否, 还要编写这两个接口的代码

测试结果

通过MQTTX测试发布订阅权限是否成功 MQTTX安装教程在第三章第2节认证操作案例之username认证第5步

  1. 测试超级用户 连接参数, 账号admin,密码admin

订阅主题

向指定主题发送消息并通过订阅消息接收 因为是超级用户, 所以订阅和接收都没有限制

客户端应用程序输出信息

  1. 测试emq-client2(账号: emq-client2, 密码123456) 因为上面应用程序代表逻辑中规定, 只有用户名为 emq-client2 的客户端能够去订阅 testtopic/# 的权限其他客户端都不可以. 因此我们先测试emq-client2然后测试emq-client3能否订阅该主题

主题订阅

客户端应用程序输出信息

  1. 测试emq-client3 (账号: emq-client3, 密码: 123456) 因为客户端代码逻辑规定, 只有用户名为 emq-client3 的客户端能够向 testtopic/123 发布消息的权限其他都没有. 因此可以先去测试emq-client3能否订阅testtopic/# 后, 在测试其能否向testtopic/123 发布消息 连接参数

测试能否订阅 testtopic/# 主题

测试能否向 testtopic/123 主题发送消息(可以)

同时可以看到emq-client2接收到了emq-client3发送的消息

测试过程客户端应用程序输出

由测试结果123可以看出:

  • 超级用户订阅和发布不会受到限制
  • 通过客户端代码可以控制具体哪个用户能订阅到那个主题, 或者能够向哪个主题发布消息

4. WebHook插件

插件可以将所有 EMQ X 的事件及消息都发送到指定的 HTTP 服务器 只需要在etc/plugins/emqx_web_hook.conf 中配置用于通知emqx 客户端服务端消息事件的事件类型以及被通知的Web Server地址

插件原理

WebHook 的内部实现是基于钩子,但它更靠近顶层一些。它通过在钩子上的挂载回调函数,获取到 EMQ X中的各种事件,并转发至 emqx_web_hook 中配置的 Web 服务器。

以客户端成功接入(client.connected) 事件为例,其事件的传递流程如下:

模拟实现

5. 管理监控API的使用

EMQ X 提供了 HTTP API 以实现与外部系统的集成,例如查询客户端信息、发布消息和创建规则等。 EMQ X 的 HTTP API 服务默认监听 8081 端口,可通过 etc/plugins/emqx_management.conf 配置文件修改监听端口,或启用 HTTPS 监听。EMQ X 4.0.0 以后的所有 API 调用均以 api/v4 开头。

接口安全及响应码

EMQ X 的 HTTP API 使用 Basic 认证方式, id 和 password 须分别填写 AppID 和 AppSecret。 默认的AppID 和 AppSecret 是: amdin/public 。你可以在 Dashboard 的左侧菜单栏里,选择 “MANAGEMENT” ->“Applications” 来修改和添加 AppID/AppSecret。

  1. 响应码 EMQ X 接口在调用成功时总是通过HTTP status code返回响应码
  1. 返回码 EMQ X 接口的响应消息体为 JSON 格式,其中总是包含返回码 code 。

接口请求工具

在第三章第2节 认证操作案例->client id认证第4步中, 我们已经使用了vscode的rest client 插件发送http请求. 下面我们将继续使用该插件来请求其他api

代码语言:javascript复制
@hostname = emqx所在服务器ip
@port=8081
@contentType=application/json
@userName=admin 
@password=public

#############获取所有支持的API接口######## 
GET  http://{{hostname}}:{{port}}/api/v4 HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取所有Broker基本信息######## 
GET  http://{{hostname}}:{{port}}/api/v4/brokers HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取Broker基本信息######## 
GET  http://{{hostname}}:{{port}}/api/v4/brokers/{node} HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取客户端列表信息######## 
GET  http://{{hostname}}:{{port}}/api/v4/clients HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取集群下所有订阅信息######## 
GET  http://{{hostname}}:{{port}}/api/v4/subscriptions HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取所有状态数据######## 
GET  http://{{hostname}}:{{port}}/api/v4/stats HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取集群下当前告警信息######## 
GET  http://{{hostname}}:{{port}}/api/v4/alarms/present HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

#############获取黑名单信息######## 
GET  http://{{hostname}}:{{port}}/api/v4/banned HTTP/1.1 
Content-Type: {{contentType}} 
Authorization: Basic {{userName}}:{{password}}

6. 保留消息

当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下非常重要的。

EMQ X 默认开启保留消息的功能,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 以禁用保留消息功能。如果 EMQ X 在保留消息功能被禁用的情况下依然收到了保留消息,那么将返回原因码为0x9A(不支持保留消息)的 DISCONNECT 报文。

应用场景

某车联网项目,车辆出租公司会实时监控所有车辆的GPS地理位置信息,这些信息是通过每个车辆每10分钟定时上报的GPS信息,这些信息需要展示在某调度系统的大屏上,该调度系统因为其他模块升级需要重新部署,升级后也需要去订阅获取所有车辆的GPS信息,上线完成后刚好错过了车辆最近一次上报的GPS信息,如果这些消息不是保留消息,该调度系统大屏上是空白的,必须等10分钟后才能调度这些车辆,10分钟内无法做出任何操作,用户体验非常差,但是如果这些信息是保留消息,该系统上线后立即就会收到最近所有车辆的位置信息,立即就可以展示然后进行调度。

保留消息配置

EMQ X 的保留消息功能是由 emqx_retainer 插件实现,该插件默认开启,通过修改 emqx_retainer 插件的配置,可以调整 EMQ X 储存保留消息的位置,限制接收保留消息数量和 Payload 最大长度,以及调整保留消息的过期时间。 emqx_retainer 插件默认开启,插件的配置路径为 etc/plugins/emqx_retainer.conf 。

代码语言:javascript复制
## retained 消息存储方式
##  - ram: 仅内存
##  - disc: 内存和磁盘
##  - disc_only: 仅磁盘
retainer.storage_type = ram

## 最大存储数 (0表示未限制)
retainer.max_retained_messages = 0

## 单条最大可存储消息大小
retainer.max_payload_size = 1MB

## 过期时间, 0 表示永不过期
## 单位: h 小时; m 分钟; s 秒。如 60m 表示 60 分钟
retainer.expiry_interval = 0

测试保留消息

可以通过DashBoard -> 工具 -> websocket 来模拟保留消息的实现以及效果 1.首先去新建一个保留消息(图1), 然后订阅. 查看接收到的消息(图2) 2.取消步骤1订阅(图3), 然后创建一个非保留消息(图4), 然后再去订阅并查看收到的消息(图5)

图1

图2

图3

图4

图5

7. 共享订阅

共享订阅是在多个订阅者之间实现负载均衡的订阅方式 共享订阅的主题格式是针对订阅端来指定的,例如: $share/g/t/a ;而消息的发布方是向主题: t/a发布消息。这样在订阅方才能达到负载均衡的效果

带群组的共享订阅

$share/<group-name> 为前缀的共享订阅是带群组的共享订阅 group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQ X 会向不同群组广播消息。

例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。那么当 EMQ X 向这个主题发布消息 msg1 的时候:EMQ X 会向两个群组

  • g1 和 g2 同时发送 msg1s1,s2,s3 中只有一个会收到 msg1
  • s4,s5 中只有一个会收到 msg1

实现过程

不带群组的共享订阅消息

queue/ 为前缀的共享订阅是不带群组的共享订阅。它是 share 订阅的一种特例,相当与所有订阅者都在一个订阅组里面:

实现过程

  1. 基于上面带群组的共享订阅进行测试
  2. 使用MQTTX 使用 emq-client2, emq-client3用户建立连接, 且都订阅 $queue/g1/t1/a 主题.
  3. 登录emqx的Dashboard的websocket插件, 创建连接, 使用上面emqx-demo项目中创建的超级用户 admin/admin , 向t1/a主题发布5条消息不带群组的共享群组订阅消息.
  4. 可以看到client2和client3随机收到了消息

均衡策略与派发 Ack 配置

常用的均衡策略有随机, 轮询, 哈希等, 具体如下图

配置均衡策略 在emqx根目录下. 进入 /etc/emqx.conf , 通过修改broker.shared_dispatch_ack_enabled 属性来修改负载均衡策略

代码语言:javascript复制
# 均衡策略 
## Dispatch strategy for shared subscription 
#### Value: Enum 
## - random 
## - round_robin 
## - sticky 
## - hash broker.shared_subscription_strategy = random 
# 共享分发时是否需要 ACK,适用于 QoS1 QoS2 消息,启用时,当通过shared_subscription_strategy选中的 一个订阅者离线时,应该允许将消息发送到组中的另一个订阅者 
broker.shared_dispatch_ack_enabled = false
  1. 延时发布

EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。当客户端使用特殊主题前缀$delayed/{DelayInteval} 发布消息到 EMQ X 时,将触发延迟发布功能。延迟发布的功能是针对消息发布者而言的,订阅方只需要按照正常的主题订阅即可。

应用场景

某智能售货机平台在双十一当天要对设备中所有商品做5折销售,双十一过去之后要立马恢复原价,为了满足这样的场景,我们可以在双十一0点给所有设备发送两条消息,一条消息是通过正常的主题发送,消息内容打5折;第二条消息延迟消息,延迟24小时,消息内容是恢复原价。这样在一个实现中可以完成两个业务场景

消息发布格式

代码语言:javascript复制
# DelayInterval延时时间, 单位秒, TopicName: 主题名
$delayed/{DelayInterval}/{TopicName}

功能实现

  1. 登录Dashboard http://emqx所在ip:18083/#/plugins, 开启延时发布插件emqx_delayed_publish
  2. 在websocket http://emqx所在ip:18083/#/websocket中, 在创建连接后, 首先订阅一个主题 t1/1, 然后根据这个主题按照延时发布的格式 $delayed/10/t1/1发布消息{ "msg": "测试延时发布" }, 即: 消息发送后消息订阅者延时10s才能接收到消息

10. 代理订阅

静态/内置代理订阅

EMQ X 的代理订阅功能使得客户端在连接建立时,不需要发送额外的 SUBSCRIBE 报文,便能自动建立用户预设的订阅关系。 静态代理订阅的核心就是通过配置文件来配置订阅的主题, 在我们建立连接后就会自动为我们创建创建订阅

功能实现

基于Webhook和API实现动态代理订阅

开启了 emqx_web_hook 组件后,EMQ X的事件都会勾起对我们配置的webhook接口进行回调,在该webhook接口中我们能够获取客户端的相关信息比如 clientId,username 等,然后我们可以在该接口方法中针对该客户端自动订阅某一主题,订阅的实现我们基于EMQ X给我们提供的监控管理的相关HTTP API,意味着我们调用相关的HTTP API可完成客户端订阅的功能,相关的HTTP API可在Dashboard中查看,也可以在官方的产品文档中查找: 功能概括就是: 让客户端连接时自动订阅某一主题, 反之客户端下线时我们可以自动取消订阅

功能实现

  1. 登录Dashboard http://emqx所在ip:18083/#/plugins, 开启webhook插件emqx_web_hook

11. 主题重写

EMQ X 的主题重写功能支持根据用户配置的规则在客户端订阅主题、发布消息、取消订阅的时候将 A 主题重写为 B 主题。 EMQ X 的保留消息和延迟发布可以与主题重写配合使用,例如,当用户想使用延迟发布功能,但不方便修改客户端发布的主题时,可以使用主题重写将相关主题重写为延迟发布的主题格式。

应用场景:

某共享单车平台A运营着大量的共享单车,每个单车上都装有一个物联网终端芯片,芯片上的程序是将一些数据通过mqtt协议上报到EMQ服务器;该公司某一天收购了另一家共享单车平台B,B平台下原有的单车也是通过mqtt上报消息数据,但是消息主题跟A平台的不一样,如果A平台想接入B平台的车上报的数据,我们就需要把B平台下所有车上芯片程序更改一下,这样虽然可行但是会耗费大量的人力物力成本,这时我们通过主题重写就可以实现B平台下所有单车数据的接收,几乎不需要编码,成本非常低。

每条主题重写规则的格式:

代码语言:javascript复制
module.rewrite.rule.<number> = 主题过滤器 正则表达式 目标表达式

注意事项

  • EMQ X 的主题重写规则需要用户自行配置,用户可以自行添加多条主题重写规则,规则的数量没有限制,但由于任何携带主题的 MQTT 报文都需要匹配一遍重写规则,因此此功能在高吞吐场景下带来的性能损耗与规则数量是成正比的,用户需要谨慎地使用此功能
  • 在主题重写功能开启的前提下,EMQ X 在收到诸如 PUBLISH 报文等带有主题的 MQTT 报文时,将使用报文中的主题去依次匹配配置文件中规则的主题过滤器部分,一旦成功匹配,则使用正则表达式提取主题中的信息,然后替换至目标表达式以构成新的主题。
  • EMQ X 使用倒序读取配置文件中的重写规则,当一条主题可以同时匹配多条主题重写规则的主题过滤器时,EMQ X 仅会使用它匹配到的第一条规则进行重写,如果该条规则中的正则表达式与 MQTT 报文主题不匹配,则重写失败,不会再尝试使用其他的规则进行重写。因此用户在使用时需要谨慎的设计 MQTT 报文主题以及主题重写规则。
  • 目标表达式中可以使用 N 这种格式的变量匹配正则表达中提取出来的元素, N 的值为正则表达式中提取出来的第 N 个元素,比如

正则表达式解析:

  • ^ 匹配输入字符串的开始位置,除非在方括号表达式中使用,当该符号在方括号表达式中使用时,表示不接受该方括号表达式中的字符集合
  • $ 匹配输入字符串的结尾位置
  • ( ) 表示一个标记一个子表达式的开始和结束位置,
  • [ 标记一个中括号表达式的开始
  • . 匹配除换行符 n 之外的任何单字符,
  • 匹配前面的子表达式一次或多次
  • * 匹配前面的子表达式零次或多次
  • ? 匹配前面的子表达式零次或一次
  • | 指明两项之间的一个选择
  • {n} n 是一个非负整数。匹配确定的 n 次
  • {n,} n 是一个非负整数。至少匹配n 次
  • {n,m} m 和 n 均为非负整数,其中n <= m。最少匹配 n 次且最多匹配 m 次 d 匹配一个数字字符。等价于 [0-9]

主题重写配置实例

代码语言:javascript复制
module.rewrite.rule.1 = y/ /z/# ^y/(. )/z/(. )$ y/z/$2 
module.rewrite.rule.2 = x/# ^x/y/(. )$ z/y/x/$1 
module.rewrite.rule.3 = x/y/  ^x/y/(d )$ z/y/$1

配置解析

功能实现

主题重写功能默认关闭,开启此功能需要修改 etc/emqx.conf 文件中的 module.rewrite 配置项, 修改后重启 emqx

代码语言:javascript复制
## Rewrite Module

## Enable Rewrite Module.
##
## Value: on | off
module.rewrite = on
##x/y/1 -> z/y/1 ,这里通过下面规则会将原来的主题x/y/1 重写从 z/y/1
module.rewrite.rule.1 = x/# ^x/y/(. )$ z/y/$1

功能测试 登录MQTTX, 订阅重写后的主题, 然后向重写前的主题发送消息, 测试重写配置是否正确, 如下图

12. 黑名单配置

EMQ X 为用户提供了黑名单功能,用户可以通过相关的 HTTP API 将指定客户端加入黑名单以拒绝该客户端访问,除了客户端标识符以外,还支持直接封禁用户名甚至 IP 地址。 黑名单只适用于少量客户端封禁需求,如果有大量客户端需要认证管理,我们需要使用认证功能来实现。

自动封禁

在黑名单功能的基础上,EMQ X 支持自动封禁那些被检测到短时间内频繁登录的客户端,并且在一段时间内拒绝这些客户端的登录,以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。 需要注意的是,自动封禁功能只封禁客户端标识符,并不封禁用户名和 IP 地址,即该机器只要更换客户端标识符就能够继续登录。

自动封禁功能配置

手动配置(通过管理端api实现)

13. 速率限制

EMQ X 提供对接入速度、消息速度的限制:当客户端连接请求速度超过指定限制的时候,暂停新连接的建立;当消息接收速度超过指定限制的时候,暂停接收消息

速率限制原理

EMQ X 使用令牌桶Token Bucket算法来对所有的 Rate Limit 来做控制。令牌桶算法 的逻辑如下图:

  • 存在一个可容纳令牌(Token) 的最大值 burst 的桶(Bucket),最大值 burst 简记为 b 。
  • 存在一个 rate 为每秒向桶添加令牌的速率,简记为 r 。当桶满时则不不再向桶中加入令牌。
  • 每当有 1 个(或 N 个)请求抵达时,则从桶中拿出 1 个 (或 N 个) 令牌。如果令牌不不够则阻塞,等待令牌的生成。

由此可知该算法中:

  • 长期来看,所限制的请求速率的平均值等于 rate 的值。
  • 记实际请求达到速度为 M,且 M > r,那么,实际运行中能达到的最大(峰值)速率为 M = b r. 证明: 最大速率 M 为:能在1个单位时间内消耗完满状态令牌桶的速度。而桶中令牌的消耗速度为M - r,故可知:b / (M - r) = 1,得 M = b r

功能实现

  1. 速率限制是一种 backpressure 方案,从入口处避免了系统过载,保证了系统的稳定和可预测的吞吐。速率限制可在 etc/emqx.conf 中配置:
  • max_conn_rate 是单个 emqx 节点上连接建立的速度限制。 1000 代表每秒最多允许 1000 个客户端接入。
  • publish_limit 是单个连接上接收 PUBLISH 报文的速率限制。 100,10s 代表每个连接上允许收到的最大PUBLISH 消息速率是每 10 秒 100 个。
  • rate_limit 是单个连接上接收 TCP数据包的速率限制。 100KB,10s 代表每个连接上允许收到的最大 TCP报文速率是每 10 秒 100KB。 publish_limit 和 rate_limit 提供的都是针对单个连接的限制,EMQ X 目前没有提供全局的消息速率限制。

14. 消息重传

消息重传 (Message Retransmission) 是属于 MQTT 协议标准规范的一部分。

协议中规定了作为通信的双方 服务端 和 客户端 对于自己发送到对端的 PUBLISH 消息都应满足其 服务质量(Quality of Service levels) 的要求。如:

  • QoS 1:表示 消息至少送达一次 (At least once delivery);即发送端会一直重发该消息,除非收到了对端对该消息的确认。在 MQTT 协议的上层(即业务的应用层)相同的 QoS 1 消息可能会收到多次。
  • QoS 2:表示 消息只送达一次 (Exactly once delivery);即该消息在上层仅会接收到一次。

虽然,QoS 1 和 QoS 2 的 PUBLISH 报文在 MQTT 协议栈这一层都会发生重传,但注意:

  • QoS 1 消息发生重传后,在 MQTT 协议栈上层,也会收到这些重发的 PUBLISH 消息。
  • QoS 2 消息无论如何重传,最终在 MQTT 协议栈上层,都只会收到一条 PUBLISH 消息

功能实现

有两种场景会导致消息重发:

  1. PUBLISH 报文发送给对端后,规定时间内未收到应答。则重发这个报文。
  2. 在保持会话的情况下,客户端重连后;EMQ X 会自动重发 未应答的消息,以确保 QoS 流程的正确。

在 etc/emqx.conf 中可配置:

15. 系统调优

EMQ X 消息服务器 4.x 版本 MQTT 连接压力测试到 130 万,在一台 8 核心、32G 内存的 CentOS 服务器上。100 万连接测试所需的 Linux 内核参数,网络协议栈参数,Erlang 虚拟机参数.EMQ X 消息服务器参数以及测试客户端设置可见官网 系统调优 介绍, 非常全面


五. 规则引擎

介绍及原理

EMQ X Rule Engine (以下简称规则引擎) 用于配置 EMQ X 消息流与设备事件的处理、响应规则。 规则引擎用于配置一套规则,该规则是针对EMQ X的消息流和设备事件如何处理的一套细则

原理

EMQ X 在 消息发布事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息。

  1. 消息发布

规则引擎借助响应动作可将特定主题的消息处理结果存储到关系型数据库(mysql,PostgreSQL),NoSql(Redis,MongoDB),发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。

  1. 事件触发

规则引擎使用 $events/ 开头的虚拟主题(事件主题)处理 EMQ X 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS 1 QoS 2 的消息抵达记录、设备上下线记录等业务中。

应用场景

  • 动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则引擎配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力;
  • 数据筛选:车辆网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于40 km/h 时的数据,此场景下可以使用规则引擎对消息进行条件过滤,向业务消息队列写入满足条件的数据;
  • 消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则引擎将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置;
  • 消息编解码:其他公共协议 / 私有 TCP 协议接入、工控行业等应用场景下,可以通过规则引擎的本地处理函数(可在 EMQ X 上定制开发)做二进制 / 特殊格式消息体的编解码工作;亦可通过规则引擎的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力。

规则引擎组成

与 EMQ X 规则引擎相关的概念包括: 规则(rule)、动作(action)、资源(resource) 和 资源类型(resourcetype)。 规则、动作、资源的关系:

  • 规则(Rule): 规则由 SQL 语句和动作列表组成。动作列表包含一个或多个动作及其参数。
  • SQL 语句用于筛选或转换消息中的数据。
  • 动作(Action) 是 SQL 语句匹配通过之后,所执行的任务。动作定义了一个针对数据的操作。 动作可以绑定资源,也可以不绑定。例如,“inspect” 动作不需要绑定资源,它只是简单打印数据内容和动作参数。而“data_to_webserver” 动作需要绑定一个 web_hook 类型的资源,此资源中配置了 URL。
  • 资源(Resource): 资源是通过资源类型为模板实例化出来的对象,保存了与资源相关的配置(比如数据库连接地址和端口、用户名和密码等) 和系统资源(如文件句柄,连接套接字等)。
  • 资源类型 (Resource Type): 资源类型是资源的静态定义,描述了此类型资源需要的配置项。

SQL语句

基本语法

基本语法举例

CASE-WHEN 语法示例

规则引擎接收案例

现需要通过规则引擎提取出从 username=emq-client2 的客户端发送过来原始数据中的msg,user,orderNo 等数据,需要过滤 password 字段,同时还需要提取消息发布的qos信息,然后将最终过滤出来的消息通知到我们的web服务上。

功能实现


现将该教程所使用的emq x,mqttx, xhell xftp 通过百度云分享如下: 觉得不错麻烦动动手点个赞吧~~~

链接:https://pan.baidu.com/s/1dSgs79rw5bnSkQyH-FbR4A 提取码:6mte 复制这段内容后打开百度网盘手机App,操作更方便哦–来自百度网盘超级会员V4的分享

0 人点赞