Edge2AI之NiFi 和流处理

2022-04-27 16:56:29 浏览数 (1)

在本次实验中,您将实施一个数据管道来处理之前从边缘捕获的数据。您将使用 NiFi 将这些数据摄取到 Kafka,然后使用来自 Kafka 的数据并将其写入 Kudu 表。

准备

该实验以Edge Workshop中开发的内容为基础。

实验总结

  • 实验 1 - 在 Schema Registry 上,注册描述 IoT 传感器生成的数据的Schema。
  • 实验 2 - 在 NiFi 集群上,准备数据并将其发送到Kafka集群。
  • 实验 3 - 在Streams Messaging Manager (SMM) Web UI 上,监控 Kafka 集群并确认数据被正确摄取。
  • 实验 4 - 使用 NiFi 处理每条记录,调用Model 端点并将结果保存到Kudu
  • 实验 5 - 检查 Kudu 上的数据。

实验 1 - 在Schema Registry中注册Schema

温度传感器产生的数据由文件中的模式描述sensor.avsc。在本实验中,我们将在 Schema Registry 中注册此Schema,以便我们在 NiFi 中的流可以使用统一服务引用Schema。这也将允许我们在未来Schema发送变化,如果需要的话,将旧版本保持在版本控制之下,以便现有的流和流文件将继续工作。

  1. 转到以下 URL,其中包含我们将用于本实验的架构定义。选择页面的所有内容并复制它。
  2. https://raw.githubusercontent.com/cloudera-labs/edge2ai-workshop/master/sensor.avsc
  3. 在 Schema Registry Web UI 中,单击 符号以注册新Schema。
  4. 单击Schema Text字段中的空白区域并粘贴您复制的内容。
  5. 通过填写以下属性完成Schema创建并保存Schema。
代码语言:javascript复制
Name:          SensorReading
Description:   Schema for the data generated by the IoT sensors
Type:          Avro schema provider
Schema Group:  Kafka
Compatibility: Backward
Evolve:        checked

实验 2 - 配置 NiFi 流并将数据推送到 Kafka

在本实验中,您将创建一个 NiFi 流来接收来自网关所有的数据并将其推送到Kafka

创建处理组

在开始构建流程之前,让我们创建一个处理组来帮助组织 NiFi 画布中的流程并启用流程版本控制。

  • 打开 NiFi Web UI,创建一个新的 Process Group 并将其命名为Process Sensor Data
  • 我们希望能够对我们将添加到处理组的流程进行版本控制。为此,我们首先需要将 NiFi 连接到NiFi Registry。在 NiFi 全局菜单上,单击“Control Settings”,导航到“Registry Clients”选项卡并添加具有以下 URL 的注册表客户端:
代码语言:javascript复制
Name: NiFi Registry
URL:  http://<CLUSTER_HOSTNAME>:18080
  • NiFi Registry Web UI 上,添加另一个存储桶来存储我们即将构建的 Sensor 流。称它为SensorFlows:

返回NiFi Web UI,为处理组启用版本控制,右键单击它并选择Version > Start version control并输入下面的详细信息。完成后,处理组上将出现一个 ,表示现在已为其启用版本控制。

代码语言:javascript复制
Registry:  NiFi Registry
Bucket:    SensorFlows
Flow Name: SensorProcessGroup
  • 我们还让这个处理组中的处理器能够使用存储在Schema Registry中的Schema。右键单击处理组,选择配置并导航到Controller Services选项卡。单击该 图标并添加HortonworksSchemaRegistry服务。添加服务后,单击服务的配置图标 ( ),转到Properties选项卡并使用以下Schema Registry URL对其进行配置,然后单击Apply
代码语言:javascript复制
URL:http://<CLUSTER_HOSTNAME>:7788/api/v1
  • 单击闪电图标 ( ) 以启用HortonworksSchemaRegistry控制器服务。
  • 仍然在Controller Services屏幕上,让我们添加两个额外的服务来处理 JSON 记录的读取和写入。单击按钮并添加以下两个服务:
    1. JsonTreeReader,具有以下属性:
代码语言:javascript复制
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry:        HortonworksSchemaRegistry
Schema Name:            ${schema.name} -> already set by default!
    1. JsonRecordSetWriter,具有以下属性:
代码语言:javascript复制
Schema Write Strategy:  HWX Schema Reference Attributes
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry:        HortonworksSchemaRegistry
  • 通过单击它们各自的闪电图标 ( ) ,启用您刚刚创建的JsonTreeReaderJsonRecordSetWriter控制器服务。

创建流程

  1. 双击新创建的处理组将其展开。

在处理组内,添加一个新的Input Port并将其命名为“Sensor Data”。在Receive From字段中,选择Local connections

  1. 我们需要告诉 NiFi 应该使用哪个模式来读取和写入 Sensor Data。为此,我们将使用UpdateAttribute处理器向 FlowFile 添加一个属性,指示模式名称。

通过将处理器图标拖到画布上来添加UpdateAttribute处理器:

  1. 双击UpdateAttribute处理器并进行如下配置:
  • Settings选项卡中:
代码语言:javascript复制
Name:Set Schema Name
  • 在“Properties”选项卡中,单击按钮并添加以下属性:
代码语言:javascript复制
Property Name:  schema.name
Property Value: SensorReading
  • 单击Apply
  1. Sensor Data输入端口连接到Set Schema Name处理器。
  1. 添加一个PublishKafkaRecord_2.6处理器并配置如下:
  • Settings选项卡:
代码语言:javascript复制
Name: Publish to Kafka topic: iot
  • Properties选项卡:
代码语言:javascript复制
Kafka Brokers:                         <CLUSTER_HOSTNAME>:9092
Topic Name:                            iot
Record Reader:                         JsonTreeReader
Record Writer:                         JsonRecordSetWriter
Use Transactions:                      false
Attributes to Send as Headers (Regex): schema.*

笔记

确保您使用的是 PublishKafkaRecord_2.6 处理器而不是PublishKafka_2.6 处理器

  1. 在PublishKafkaRecord_2.6处理器的PROPERTIES选项卡中,单击按钮并添加以下属性:
代码语言:javascript复制
Property Name:  client.id
Property Value: nifi-sensor-data

稍后,这将帮助我们清楚地识别谁在将数据生成到 Kafka 主题中。

  1. 将Set Schema Name处理器连接到Publish to Kafka topic: iot处理器。
  2. 在画布上添加一个新的漏斗并将 PublishKafkaRecord 处理器连接到它。当“创建连接”对话框出现时,选择“Failure”并单击Add
  1. 双击Publish to Kafka topic: iot processor,进入SETTINGS选项卡,勾选AUTOMATICALLY TERMINATED RELATIONSHIPS部分中的“ success ”关系。单击应用
  1. 回到NiFi Flow页面,将From Gataway的输入端口的数据连接到漏洞的连接去掉,使得数据可以进入到Process Sensor Data组。(先stop,然后Empty Queue,再删除connection,最后再将From Gateway连接到Process Sensor Data组)
  1. 启动输入端口和两个处理器。在NiFi Flow画布全选,然后点击Play按钮,将所有的处理器和输入端口启动。您的画布现在应该如下所示:
  1. 刷新屏幕(Ctrl R在 Linux/Windows 上;Cmd R在 Mac 上),您应该会看到在“from Gateway”输入端口上排队的记录消失了。它们流入Process Sensor Data流。如果您展开 Process Group,您应该会看到这些记录已由PublishKafkaRecord处理器处理,并且“Failure”输出队列中应该没有排队的记录。

此时,消息已经在 Kafka 主题中。您可以根据需要添加更多处理器来处理、拆分、复制或重新路由您的 FlowFile 到所有其他目的地和处理器。

  1. 为了完成这个实验,让我们提交和版本化我们刚刚完成的工作。返回 NiFi 根画布,单击“Nifi Flow”面包屑。右键单击Process Sensor Data Process Group 并选择Version > Commit local changes。输入描述性注释并保存。

实验 3 - 使用 SMM 确认数据正确流动

现在我们的 NiFi 流程正在将数据推送到 Kafka,最好确认一切都按预期运行。在本实验中,您将使用 Streams Messaging Manager (SMM) 检查和监控 Kafka。

  1. 再次启动NiFi ExecuteProcess模拟器并确认您可以看到 NiFi 中排队的消息。让它运行。
  2. 转到 Stream Messaging Manager (SMM) Web UI 并熟悉那里的选项。注意屏幕顶部的过滤器(蓝色框)。
  1. 单击Producers过滤器并仅选择nifi-sensor-data生产者。这将隐藏所有不相关的主题,只显示生产者正在写的主题。
  1. 如果您改为按Topics过滤并选择iot主题,您将能够分别看到正在写入和读取的所有生产者消费者。由于我们还没有实现任何消费者,消费者列表应该是空的。
  1. 单击该主题以探索其详细信息。您可以查看更多详细信息、指标和每个分区的细分。单击其中一个分区,您将看到其他信息以及哪些生产者和消费者与该分区进行交互。
  1. 单击EXPLORE链接以可视化特定分区中的数据。确认 Kafka 主题中有数据,并且看起来像传感器模拟器生成的 JSON。
  1. 再次停止NiFi ExecuteProcess模拟器。

实验 4 - 使用 NiFi 调用 CDSW 模型端点并保存到 Kudu

在本实验中,您将使用 NiFi 消费包含我们在上一个实验中摄取的 IoT 数据的 Kafka 消息,调用 CDSW 模型 API 端点来预测读取数据的机器是否可能发生故障。

为了准备实验,我们在集群上运行的 Cloudera Data Science Workbench (CDSW) 上训练并部署了一个机器学习模型。模型 API 可以获取传感器提供的 12 个温度读数的特征向量,并根据该向量预测机器是否可能发生故障。

CDSW 模型 API 和访问密钥

在您将在本实验中构建的流程中,您将使用一些处理器/控制器服务将引用的变量:

  • Predict 机器运行状况处理器的request.body属性引用了一个名为cdsw.access.key
  • RestLookupService控制器服务的Authorization属性引用了一个名为cdsw.model.api.key

这些变量指定访问在 CDSW 中运行的机器学习模型所必需的键。按照以下步骤从 CDSW 检索密钥并在 NiFi 中设置变量及其值。

  1. 要获取Access key,请转到 CDSW Web UI 并单击Model > IoT Prediction Model > Settings。复制访问密钥。
  1. 返回 NiFi Web UI,右键单击Process Sensor Data画布的空白区域,然后单击Variables
  1. 单击加号按钮 ( ) 并添加以下变量:
代码语言:javascript复制
Variable Name:  cdsw.access.key
Variable Value: <access Key copied from CDSW>
  1. 要获取模型 API Key,请单击 Cloudera Data Science Workbench 图标(左上角)转到主页,然后单击Home > User Settings > API Keys→Create API Key。复制模型 API Key。
  1. 返回 NiFi Web UI,右键单击Process Sensor Data画布的空白区域,然后单击Variables
  2. 单击加号按钮 ( ) 并添加以下变量:
代码语言:javascript复制
Variable Name:  cdsw.model.api.key
Variable Value: <key copied from CDSW>
  1. 单击Apply

添加新的控制器服务

当传感器数据使用PublishKafkaRecord处理器发送到 Kafka 时,我们选择在 Kafka 消息的标头中附加模式信息。现在,我们可以利用元数据为每条消息动态加载正确的模式,而不是硬编码我们应该使用哪个模式来读取消息。

但是,要做到这一点,我们需要配置一个不同的JsonTreeReader,它将使用标头中的模式属性,而不是${schema.name}像以前那样使用属性。

我们还将添加一个新的RestLookupService控制器服务来执行对 CDSW 模型 API 端点的调用。

  1. 如果您不在Process Sensor Data处理组中,请双击它以展开它。在操作面板(左侧)上,单击齿轮图标 ( ) 以访问Process Sensor Data处理组的配置页面。
  1. 点击加号按钮(),添加一个新的JsonTreeReader,配置如下,完成后点击Apply

Settings选项卡上:

代码语言:javascript复制
Name: JsonTreeReader - With schema identifier

Properties选项卡上:

代码语言:javascript复制
Schema Access Strategy: HWX Schema Reference Attributes
Schema Registry:        HortonworksSchemaRegistry
  1. 单击闪电图标 ( ) 以启用JsonTreeReader - With schema identifier controller 服务。
  2. 再次点击加号按钮(),添加一个RestLookupService控制器服务,如下图配置,完成后点击应用

Properties选项卡上:

代码语言:javascript复制
URL:http://modelservice.cdsw.<YOUR_CLUSTER_PUBLIC_IP>.nip.io/model
Record Reader: JsonTreeReader
Record Path:   /response

笔记

<YOUR_CLUSTER_PUBLIC_IP>以上必须替换为您的集群的公共 IP,而不是DNS 名称。最终 URL 应如下所示:http://modelservice.cdsw.12.34.56.78.nip.io/model

  1. 通过单击加号按钮 ( ) 添加另一个用户定义的属性:
代码语言:javascript复制
Authorization: Bearer ${cdsw.model.api.key}
  1. 单击闪电图标 ( ) 以启用RestLookupService控制器服务。
  1. 关闭Process Sensor Data Configuration页面。

创建流

我们现在将创建流程以从 Kafka 读取传感器数据,为每个传感器执行模型预测并将结果写入 Kudu。在本节结束时,您的流程应如下所示:

ConsumeKafkaRecord_2_6处理器
  1. 我们将向我们之前使用的同一画布添加一个新流(在Process Sensor Data Process Group 内)。单击画布的空白区域并将其拖到一边,以便为您提供更多空间来添加新处理器。
  2. 在画布上添加一个ConsumeKafkaRecord_2_6处理器并进行如下配置:

Settings选项卡:

代码语言:javascript复制
Name: Consume Kafka iot messages

Properties选项卡:

代码语言:javascript复制
Kafka Brokers:                        <CLUSTER_HOSTNAME>:9092
Topic Name(s):                        iot
Topic Name Format:                    names
Record Reader:                        JsonTreeReader - With schema identifier
Record Writer:                        JsonRecordSetWriter
Honor Transactions:                   false
Group ID:                             iot-sensor-consumer
Offset Reset:                         latest
Headers to Add as Attributes (Regex): schema.*
  1. 在画布上添加一个新的Funnel并将Consume Kafka iot messages连接到它。出现提示时,复选此连接的parse.failure关系:
LookupRecord 处理器
  1. LookupRecord处理器添加到画布并进行如下配置:

Settings选项卡:

代码语言:javascript复制
Name: Predict machine health

Properties选项卡:

代码语言:javascript复制
Record Reader:          JsonTreeReader - With schema identifier
Record Writer:          JsonRecordSetWriter
Lookup Service:         RestLookupService
Result RecordPath:      /response
Routing Strategy:       Route to 'success'
Record Result Contents: Insert Entire Record
  1. 通过单击每个属性的加号按钮 ( ) 添加另外 3 个用户定义属性:
代码语言:javascript复制
mime.type:      toString('application/json', 'UTF-8')
request.body:   concat('{"accessKey":"', '${cdsw.access.key}', '","request":{"feature":"', /sensor_0, ', ', /sensor_1, ', ', /sensor_2, ', ', /sensor_3, ', ', /sensor_4, ', ', /sensor_5, ', ', /sensor_6, ', ', /sensor_7, ', ', /sensor_8, ', ', /sensor_9, ', ', /sensor_10, ', ', /sensor_11, '"}}')
request.method: toString('post', 'UTF-8')

单击Apply以将更改保存到Predict machine health处理器。

  1. Consume Kafka iot messages处理器连接到Predict machine health。出现提示时,复选此连接的success关系。
  2. Predict machine health连接到您在上面创建的同一漏斗。出现提示时,复选此连接的failure关系。
UpdateRecord处理器
  1. UpdateRecord处理器添加到画布并进行如下配置:

Settings选项卡:

代码语言:javascript复制
Name: Update health flag

Properties选项卡:

代码语言:javascript复制
Record Reader:              JsonTreeReader - With schema identifier
Record Writer:              JsonRecordSetWriter
Replacement Value Strategy: Record Path Value
  1. 通过单击加号按钮 ( ) 添加另一个用户定义的属性:

/is_healthy: /response/result

  1. 将Predict machine health处理器连接到Update health flag之一。出现提示时,复选此连接的success关系。
  2. Update health flag连接到您在上面创建的同一funnel。出现提示时,复选此连接的failure关系。
创建 Kudu 表

在下一部分中,您将在 NiFi 中配置PutKudu处理器以将数据写入 Kudu 表。在配置该处理器之前,让我们创建 Kudu 表。

笔记

如果您已经在之前的实验中创建了此表,则可以跳过以下创建步骤。

  1. 转到 Hue Web UI 并登录。第一个登录到 Hue 安装的用户会自动创建并在 Hue 中授予管理员权限。
  2. 默认情况下,Hue UI 应该使用 Impala 查询编辑器打开。如果没有,您始终可以通过单击Query 按钮 > Editor → Impala找到它:
  1. 创建 Kudu 表。

登录到 Hue,然后在Impala 查询编辑器中,运行以下语句:

代码语言:javascript复制
CREATE TABLE sensors
(
sensor_id INT,
sensor_ts BIGINT,
sensor_0 DOUBLE,
sensor_1 DOUBLE,
sensor_2 DOUBLE,
sensor_3 DOUBLE,
sensor_4 DOUBLE,
sensor_5 DOUBLE,
sensor_6 DOUBLE,
sensor_7 DOUBLE,
sensor_8 DOUBLE,
sensor_9 DOUBLE,
sensor_10 DOUBLE,
sensor_11 DOUBLE,
is_healthy INT,
PRIMARY KEY (sensor_id, sensor_ts)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
  1. 当您sensors在 Impala 中创建表时,Impala 创建的后端 Kudu 表的名称可以非常取决于您的 CDP 集群的确切版本。您需要知道在下一节中配置PutKudu处理器时要使用的表的确切名称。

您可以在 Hue 的表格浏览器中找到 Kudu 表的名称。

单击左侧的表浏览器default图标并导航到数据库。单击sensors表并打开其详细信息选项卡。

记下 Kudu的表名。

PutKudu 处理器
  1. 在画布上添加一个PutKudu处理器并进行如下配置:

设置选项卡:

代码语言:javascript复制
Name: Write to Kudu

属性选项卡:

代码语言:javascript复制
Kudu Masters:  <CLUSTER_HOSTNAME>:7051
Table Name:    <KUDU_TABLE_NAME (see previous section)>
Record Reader: JsonTreeReader - With schema identifier
  1. 更新运行状况标志处理器连接到写入 Kudu处理器。出现提示时,复选此连接的success关系。
  2. 将写入 Kudu连接到您在上面创建的同一漏斗。出现提示时,复选此连接的Failure关系。
  3. 双击写入 Kudu处理器,转到Settings 选项卡,复选自动终止关系部分中的“success”关系。单击应用
运行流程

我们现在已经准备好运行和测试我们的流程了。请按照以下步骤操作:

  1. 启动流程中的所有处理器。
  2. 刷新您的 NiFi 页面,您应该会看到消息通过您的流程。失败队列应该没有排队的记录。

实验 5 - 检查 Kudu 上的数据

在本实验中,您将使用 Impala 引擎运行一些 SQL 查询,并验证 Kudu 表是否按预期更新。

  1. 登录到 Hue 并在 Impala 查询编辑器中运行以下查询:
代码语言:javascript复制
SELECT count(*)
FROM sensors;
SELECT *
FROM sensors
ORDER by sensor_ts DESC
LIMIT 100;
  1. 运行查询几次并验证传感器读数的数量随着数据被摄取到 Kudu 表中而增加。这使您可以构建实时报告以便快速采取行动。

select is_healthy,count(*) from sensors group by is_healthy

0 人点赞