Edge2AI之从边缘摄取数据

2022-04-27 16:55:10 浏览数 (1)

在本次实操中,您将使用 MiNiFi 从边缘捕获数据并将其转发到 NiFi。

实验总结

  • 实验 1 - 在 Apache NiFi 上运行模拟器,将 IoT 传感器数据发送到 MQTT broker。
  • 实验 2 - 创建一个流以使用 Cloudera Edge Flow Manager 从 MQTT broker收集数据并将其发布到 MiNiFi 代理。
  • 实验 3 - 使用Cloudera Edge Flow Manager更新现有边缘流程并在边缘执行额外处理

实验 1 - Apache NiFi:设置机器传感器模拟器

在本实验中,您将运行一个简单的 Python 脚本来模拟来自一些假设的机器的 IoT 传感器数据,并将数据发送到 MQTT 代理 ( mosquitto )。MQTT 代理扮演网关的角色,通过“mqtt”协议连接到许多不同类型的传感器。您的集群附带模拟脚本发布到的嵌入式 MQTT 代理。为方便起见,我们将使用 NiFi 来运行脚本而不是 Shell 命令。

  1. 转到 Apache NiFi 并将处理器 (ExecuteProcess) 添加到画布。
  1. 右键单击处理器,选择配置(或者,只需双击处理器)。在属性选项卡上,设置如下所示的属性以运行我们的 Python 模拟脚本。
代码语言:javascript复制
Command:           python3
Command Arguments: /opt/demo/simulate.py
  1. SCHEDULING选项卡中,设置为Run Schedule: 1 sec

或者,您可以将其设置为其他时间间隔:1 秒、30 秒、1 分钟等......

  1. Settings选项卡中
  2. AUTOMATICALLY TERMINATED RELATIONSHIP部分中的选中“success”关系
  3. 将处理器名称设置为“Generate Test Data
  4. 单击Apply
  1. 然后,您可以右键单击以start来启动此模拟器运行程序。
  1. 几秒钟后右键单击并选择Stop并查看数据出处。你会看到它已经运行了很多次并产生了结果。

实验 2 - 配置边缘流管理

Cloudera Edge Flow Management (EFM) 为您提供环境中所有 MiNiFi 代理的可视化总览,并允许您更新每个代理的流配置,并通过NiFi Registry集成进行版本控制。在本实验中,您将创建 MiNiFi 流并将其发布以供 MiNiFi 代理获取。

  1. 在http://<public_dns>:10088/efm/ui/打开 EFM Web UI并选择Monitor选项卡 (

)

  1. 单击EVENTS标题并验证您的 EFM 服务器是否正在接收来自 MiNiFi 代理的心跳。单击心跳记录上的信息图标以查看心跳的详细信息。
  1. 选择流设计器选项卡 (

)。要构建数据流,请从表中选择所需的类 ( iot-1),然后单击OPEN。或者,您可以双击所需的类。

  1. 通过将处理器图标拖到画布上,选择ConsumeMQTT处理器类型并单击“Add”按钮,将ConsumeMQTT处理器添加到画布。处理器位于画布上后,双击它并使用以下设置对其进行配置:
代码语言:javascript复制
Broker URI:     tcp://<CLUSTER_HOSTNAME>:1883
Client ID:      minifi-iot
Topic Filter:   iot/#
Max Queue Size: 60

并确保在属性页面上向下滚动以设置Topic Filter和Max Queue Size:

  1. 远程处理组(Remote Process Group:RPG) 添加到画布并进行如下配置:
代码语言:javascript复制
URL:http://<CLUSTER_HOSTNAME>:8080/nifi
Transport Protocol: HTTP
  1. 此时您需要将 ConsumerMQTT 处理器连接到 RPG。为此,您首先需要向远程 NiFi 服务器添加一个输入端口。
  2. 在以下位置打开 NiFi Web UI http://<public_dns>:8080/nifi/
  3. Input Port拖到画布上。
  4. 当提示输入其名称时,将其命名为“from Gateway”,然后单击ADD
  1. 要终止NiFI的Input Port的数据,现在让我们在画布上添加一个Funnel......
  1. …并建立从输入端口到它的连接。要建立连接,请将鼠标悬停在输入端口上,直到箭头符号显示在中心。单击箭头,将其拖放到漏斗上以连接两个元素。
  1. 右键单击输入端口并启动它。或者,单击输入端口将其选中,然后按操作面板上的开始(“play”)按钮:
  1. 您将需要Input Port的 ID来完成ConsumeMQTT处理器与 RPG (NiFi) 的连接。双击Input Port并复制其 ID。
  1. 回到 Flow Designer,将 ConsumeMQTT 处理器连接到 RPG。连接需要一个 ID,您可以在此处粘贴您从输入端口复制的 ID。确保没有空格
  1. 双击连接并更新以下配置:
代码语言:javascript复制
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Sensor data
  1. 流程现已完成,但在发布之前,请在NiFi Registry中创建存储桶,以便存储流程的所有版本以供审核和审核。打开 NiFi Registry:http://<public_dns>:18080/nifi-registry,单击右上角的扳手/扳手图标 ( ) 并创建一个名为IoT(注意: 存储桶名称是大小写敏感的) 的存储桶。
  1. 您现在可以发布流以供 MiNiFi 代理自动获取。单击publish,为您的更改添加描述性注释,然后单击Apply。
  1. 返回NiFi Registry Web UI 并单击Cloudera 徽标旁边的NiFi Registry名称。如果流发布成功,您应该在 NiFi 注册表中看到流的版本详细信息。
  1. 此时,您可以测试边缘流直到 NiFi。再次启动 NiFi 模拟器(ExecuteProcess 处理器)并确认您可以看到 NiFi 中排队的消息。
  1. 右键单击队列并选择List Queue以查看收到消息的详细信息。
  1. 尝试单击其中一条消息的Info、Eye和Provenance图标,以分别查看消息属性、内容和出处详细信息。
  1. 例如,每条消息中的传感器读数都包含温度值,这些值都应该在 0 到 100 摄氏度之间。如果您对一些消息的内容进行采样,您应该能够注意到一些读数sensor_0并sensor_1报告了一些虚假值,如下所示。我们将在下一节中解决这个问题。
  1. 您现在可以停止该模拟器(停止 NiFi 处理器)。

实验 3 - 更新流程以在边缘执行额外处理

在之前的实验中,我们注意到一些传感器间歇性地发送错误的测量值。如果我们让这些测量由我们的下游应用程序处理,我们可能会遇到这些应用程序的输出质量问题。

我们可以过滤掉 NiFi 中的错误读数。但是,如果有问题的数据量很大,我们首先会浪费网络带宽将该数据发送到 NiFi。相反,我们要做的是将额外的逻辑推送到边缘,以识别和过滤这些问题,并避免将它们发送到 NiFi 的开销。

我们注意到,问题总是发生在测量中的温度sensor_0和sensor_1,只有。如果这两个温度中的任何一个大于 500,我们必须丢弃整个传感器读数。如果这两个温度都在正常范围内(< 500),我们可以保证报告的所有温度都是正确的,并且可以发送到 NiFi。

  1. 转至 CEM Web UI 并将新处理器添加到画布。在出现的对话框的过滤器框中,键入“JsonPath”。选择EvaluateJSONPath处理器并单击Add
  1. 双击新处理器并使用以下属性对其进行配置:
代码语言:javascript复制
Processor Name: Extract sensor_0 and sensor1 values
Destination:    flowfile-attribute
  1. 单击Add Property按钮并输入以下属性:

Property Name

Property Value

sensor_0

$.sensor_0

sensor_1

$.sensor_1

  1. 单击Apply以保存处理器配置。
  2. 将一个新的处理器拖到画布上。在出现的对话框的过滤器框中,键入“Routeon”。选择RouteOnAttribute处理器并单击Add
  1. 双击新处理器并使用以下属性对其进行配置:
代码语言:javascript复制
Processor Name: Filter Errors
Route Strategy: Route to Property name
  1. 单击Add Property按钮并输入以下属性:

Property Name

Property Value

error

${sensor_0:ge(500):or(${sensor_1:ge(500)})}

  1. 单击Apply以保存处理器配置。
  2. ConsumeMQTT处理器重新连接到Extract sensor_0 and sensor1 values处理器:
  3. 单击ConsumeMQTT和RPG之间的现有连接以将其选中。
  4. 将连接的目标端拖动到Extract sensor_0 and sensor1 values处理器。
  1. Extract sensor_0 and sensor1 values连接到Filter Errors处理器。当Create Connection对话框出现时,选择“ matched ”并点击Create
  1. 双击连接,更新以下配置并Apply更改:
代码语言:javascript复制
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Extracted attributes
  1. 双击Extract sensor_0 and sensor1 values并复选AUTOMATICALLY TERMINATED RELATIONSHIPS部分中的以下值,然后单击Apply
代码语言:javascript复制
failure
unmatched
  1. 在创建最后一个连接之前,您将需要(再次) NiFi Input Port的 ID 。转到 NiFi Web UI,双击“from Gateway”输入端口并复制其 ID。
  1. 返回 CEM Web UI,将Filter Errors处理器连接到 RPG:
  1. Create Connection对话框中,选中“ unmatched ”复选框并输入复制的输入端口 ID,然后单击Add:
  1. 双击连接,更新以下配置并Apply更改:
代码语言:javascript复制
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Valid data
  1. 要忽略错误,请双击Filter Errors处理器,选中自动终止关系部分下的error复选框,然后单击应用
  1. 最后,在 CEM 画布上单击ACTIONS > Publish... ,输入描述性注释,例如“添加了错误读数的过滤”,然后单击Publish
  1. 再次启动模拟器。
  2. 转到 NiFi Web UI 并确认数据正在流向 NiFi。检查消息的内容,就像我们之前所做的那样,确认有问题的读数已经消失。

验证数据后停止模拟器。

0 人点赞