在本次实操中,您将使用 MiNiFi 从边缘捕获数据并将其转发到 NiFi。
实验总结
- 实验 1 - 在 Apache NiFi 上运行模拟器,将 IoT 传感器数据发送到 MQTT broker。
- 实验 2 - 创建一个流以使用 Cloudera Edge Flow Manager 从 MQTT broker收集数据并将其发布到 MiNiFi 代理。
- 实验 3 - 使用Cloudera Edge Flow Manager更新现有边缘流程并在边缘执行额外处理
实验 1 - Apache NiFi:设置机器传感器模拟器
在本实验中,您将运行一个简单的 Python 脚本来模拟来自一些假设的机器的 IoT 传感器数据,并将数据发送到 MQTT 代理 ( mosquitto )。MQTT 代理扮演网关的角色,通过“mqtt”协议连接到许多不同类型的传感器。您的集群附带模拟脚本发布到的嵌入式 MQTT 代理。为方便起见,我们将使用 NiFi 来运行脚本而不是 Shell 命令。
- 转到 Apache NiFi 并将处理器 (ExecuteProcess) 添加到画布。
- 右键单击处理器,选择配置(或者,只需双击处理器)。在属性选项卡上,设置如下所示的属性以运行我们的 Python 模拟脚本。
Command: python3
Command Arguments: /opt/demo/simulate.py
- 在SCHEDULING选项卡中,设置为Run Schedule: 1 sec
或者,您可以将其设置为其他时间间隔:1 秒、30 秒、1 分钟等......
- 在Settings选项卡中
- 在 AUTOMATICALLY TERMINATED RELATIONSHIP部分中的选中“success”关系
- 将处理器名称设置为“Generate Test Data”
- 单击Apply。
- 然后,您可以右键单击以start来启动此模拟器运行程序。
- 几秒钟后右键单击并选择Stop并查看数据出处。你会看到它已经运行了很多次并产生了结果。
实验 2 - 配置边缘流管理
Cloudera Edge Flow Management (EFM) 为您提供环境中所有 MiNiFi 代理的可视化总览,并允许您更新每个代理的流配置,并通过NiFi Registry集成进行版本控制。在本实验中,您将创建 MiNiFi 流并将其发布以供 MiNiFi 代理获取。
- 在http://<public_dns>:10088/efm/ui/打开 EFM Web UI并选择Monitor选项卡 (
)
- 单击EVENTS标题并验证您的 EFM 服务器是否正在接收来自 MiNiFi 代理的心跳。单击心跳记录上的信息图标以查看心跳的详细信息。
- 选择流设计器选项卡 (
)。要构建数据流,请从表中选择所需的类 ( iot-1),然后单击OPEN。或者,您可以双击所需的类。
- 通过将处理器图标拖到画布上,选择ConsumeMQTT处理器类型并单击“Add”按钮,将ConsumeMQTT处理器添加到画布。处理器位于画布上后,双击它并使用以下设置对其进行配置:
Broker URI: tcp://<CLUSTER_HOSTNAME>:1883
Client ID: minifi-iot
Topic Filter: iot/#
Max Queue Size: 60
并确保在属性页面上向下滚动以设置Topic Filter和Max Queue Size:
- 将远程处理组(Remote Process Group:RPG) 添加到画布并进行如下配置:
URL:http://<CLUSTER_HOSTNAME>:8080/nifi
Transport Protocol: HTTP
- 此时您需要将 ConsumerMQTT 处理器连接到 RPG。为此,您首先需要向远程 NiFi 服务器添加一个输入端口。
- 在以下位置打开 NiFi Web UI http://<public_dns>:8080/nifi/
- 将Input Port拖到画布上。
- 当提示输入其名称时,将其命名为“from Gateway”,然后单击ADD。
- 要终止NiFI的Input Port的数据,现在让我们在画布上添加一个Funnel......
- …并建立从输入端口到它的连接。要建立连接,请将鼠标悬停在输入端口上,直到箭头符号显示在中心。单击箭头,将其拖放到漏斗上以连接两个元素。
- 右键单击输入端口并启动它。或者,单击输入端口将其选中,然后按操作面板上的开始(“play”)按钮:
- 您将需要Input Port的 ID来完成ConsumeMQTT处理器与 RPG (NiFi) 的连接。双击Input Port并复制其 ID。
- 回到 Flow Designer,将 ConsumeMQTT 处理器连接到 RPG。连接需要一个 ID,您可以在此处粘贴您从输入端口复制的 ID。确保没有空格!
- 双击连接并更新以下配置:
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Sensor data
- 流程现已完成,但在发布之前,请在NiFi Registry中创建存储桶,以便存储流程的所有版本以供审核和审核。打开 NiFi Registry:http://<public_dns>:18080/nifi-registry,单击右上角的扳手/扳手图标 ( ) 并创建一个名为IoT(注意: 存储桶名称是大小写敏感的) 的存储桶。
- 您现在可以发布流以供 MiNiFi 代理自动获取。单击publish,为您的更改添加描述性注释,然后单击Apply。
- 返回NiFi Registry Web UI 并单击Cloudera 徽标旁边的NiFi Registry名称。如果流发布成功,您应该在 NiFi 注册表中看到流的版本详细信息。
- 此时,您可以测试边缘流直到 NiFi。再次启动 NiFi 模拟器(ExecuteProcess 处理器)并确认您可以看到 NiFi 中排队的消息。
- 右键单击队列并选择List Queue以查看收到消息的详细信息。
- 尝试单击其中一条消息的Info、Eye和Provenance图标,以分别查看消息属性、内容和出处详细信息。
- 例如,每条消息中的传感器读数都包含温度值,这些值都应该在 0 到 100 摄氏度之间。如果您对一些消息的内容进行采样,您应该能够注意到一些读数sensor_0并sensor_1报告了一些虚假值,如下所示。我们将在下一节中解决这个问题。
- 您现在可以停止该模拟器(停止 NiFi 处理器)。
实验 3 - 更新流程以在边缘执行额外处理
在之前的实验中,我们注意到一些传感器间歇性地发送错误的测量值。如果我们让这些测量由我们的下游应用程序处理,我们可能会遇到这些应用程序的输出质量问题。
我们可以过滤掉 NiFi 中的错误读数。但是,如果有问题的数据量很大,我们首先会浪费网络带宽将该数据发送到 NiFi。相反,我们要做的是将额外的逻辑推送到边缘,以识别和过滤这些问题,并避免将它们发送到 NiFi 的开销。
我们注意到,问题总是发生在测量中的温度sensor_0和sensor_1,只有。如果这两个温度中的任何一个大于 500,我们必须丢弃整个传感器读数。如果这两个温度都在正常范围内(< 500),我们可以保证报告的所有温度都是正确的,并且可以发送到 NiFi。
- 转至 CEM Web UI 并将新处理器添加到画布。在出现的对话框的过滤器框中,键入“JsonPath”。选择EvaluateJSONPath处理器并单击Add
- 双击新处理器并使用以下属性对其进行配置:
Processor Name: Extract sensor_0 and sensor1 values
Destination: flowfile-attribute
- 单击Add Property按钮并输入以下属性:
Property Name | Property Value |
---|---|
sensor_0 | $.sensor_0 |
sensor_1 | $.sensor_1 |
- 单击Apply以保存处理器配置。
- 将一个新的处理器拖到画布上。在出现的对话框的过滤器框中,键入“Routeon”。选择RouteOnAttribute处理器并单击Add。
- 双击新处理器并使用以下属性对其进行配置:
Processor Name: Filter Errors
Route Strategy: Route to Property name
- 单击Add Property按钮并输入以下属性:
Property Name | Property Value |
---|---|
error | ${sensor_0:ge(500):or(${sensor_1:ge(500)})} |
- 单击Apply以保存处理器配置。
- 将ConsumeMQTT处理器重新连接到Extract sensor_0 and sensor1 values处理器:
- 单击ConsumeMQTT和RPG之间的现有连接以将其选中。
- 将连接的目标端拖动到Extract sensor_0 and sensor1 values处理器。
- 将Extract sensor_0 and sensor1 values连接到Filter Errors处理器。当Create Connection对话框出现时,选择“ matched ”并点击Create。
- 双击连接,更新以下配置并Apply更改:
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Extracted attributes
- 双击Extract sensor_0 and sensor1 values并复选AUTOMATICALLY TERMINATED RELATIONSHIPS部分中的以下值,然后单击Apply:
failure
unmatched
- 在创建最后一个连接之前,您将需要(再次) NiFi Input Port的 ID 。转到 NiFi Web UI,双击“from Gateway”输入端口并复制其 ID。
- 返回 CEM Web UI,将Filter Errors处理器连接到 RPG:
- 在Create Connection对话框中,选中“ unmatched ”复选框并输入复制的输入端口 ID,然后单击Add:
- 双击连接,更新以下配置并Apply更改:
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Valid data
- 要忽略错误,请双击Filter Errors处理器,选中自动终止关系部分下的error复选框,然后单击应用:
- 最后,在 CEM 画布上单击ACTIONS > Publish... ,输入描述性注释,例如“添加了错误读数的过滤”,然后单击Publish。
- 再次启动模拟器。
- 转到 NiFi Web UI 并确认数据正在流向 NiFi。检查消息的内容,就像我们之前所做的那样,确认有问题的读数已经消失。
验证数据后停止模拟器。