Edge2AI之使用 SQL 查询流

2022-04-27 16:59:23 浏览数 (1)

在本次实验中,您将在 Cloudera SQL Stream Builder使用 SQL 语言查询和操作数据流。SQL Stream Builder 是一项功能强大的服务,使您无需编写 Java/Scala 代码即可创建 Flink 作业。

您将从包含温度传感器数据点流的先前实验中创建和填充的iot_enriched主题中获取数据。

准备

本次实验以Edge和Nifi实验中开发的内容为基础。

实验总结

  • 实验 1 - 创建数据提供者
  • 实验 2 - 为包含 JSON 消息的主题创建表
  • 实验 3 - 将 SQL Stream Builder 与Schema Registry集成
  • 实验 4 - 计算和存储聚合结果
  • 实验 5 - 物化视图

实验 1 - 创建数据提供者

让我们从一个简单的目标开始:使用 SQL 查询iot_enriched主题的内容,以检查正在流式传输的数据。尽管很简单,但此任务将展示 SQL Stream Builder (SSB) 的易用性和强大功能。

在开始从 Kafka 主题查询数据之前,您需要将 Kafka 集群注册为SSB 中的数据源。

  1. 在 Cloudera Manager 控制台上,单击左上角的 Cloudera 徽标以确保您在主页上,然后单击SQL Stream Builder服务。
  2. 单击SQLStreamBuilder 控制台链接以打开 SSB UI。
  1. 在登录屏幕上,使用用户admin和密码进行身份验证supersecret1。
  1. 您会注意到 SSB 已经有一个注册为 Kafka Providers的Data Providers. 当 SSB 安装在也有 Kafka 服务的集群上时,会自动为 SSB 创建此提供程序:
  1. 您可以使用此屏幕将其他外部 Kafka 集群作为数据提供者添加到 SSB。在本实验中,您将使用不同的主机名添加第二个数据提供者,以展示它的简单性。
  2. 单击Register Kafka Provider并在Add Kafka Provider窗口中,输入新数据源的详细信息,然后单击Save changes
代码语言:javascript复制
Name:                           edge2ai-kafka
Brokers:                        <CLUSTER_HOSTNAME>:9092
Connection protocol:            PLAINTEXT

实验 2 - 为包含 JSON 消息的Topic创建表

现在您可以将iot_enriched Topic映射到 SQL Stream Builder 中的表。SSB 中的表是一种将 Kafka Topic与Schema相关联的方法,以便您可以在 SQL 查询中使用它。

  1. 要创建您的第一个表,请单击Console(左侧栏)> Table > Add Table > Apache Kafka
  1. Kafka窗口中,输入以下信息:
代码语言:javascript复制
Table name: iot_enriched
Kafka Cluster:      edge2ai-kafka
Topic Name:         iot_enriched
Data Format:        JSON
  1. 确保选择了Schema选项卡。滚动到选项卡底部,然后单击Detect Schema。SSB 将抽取流经主题的数据样本,并推断用于解析内容的Schema。或者,您也可以在此选项卡中指定Schema。
  1. 如果您需要操作源数据来修复、清理或转换某些值,您可以为表定义转换。转换是在 Javascript 代码中定义的。

从 Kafka 读取的序列化记录提供给record变量中的 Javascript 代码。转换代码的最后一个命令必须返回修改记录的序列化内容。

iot_enriched主题中的数据具有以微秒表示的时间戳。您需要将此字段转换为毫秒。让我们编写一个转换来为我们执行该转换。

单击Transformations选项卡并在代码框中输入以下代码:

代码语言:javascript复制
// parse the JSON record
var parsedVal = JSON.parse(record.value);
// Convert sensor_ts from micro to milliseconds
parsedVal['sensor_ts'] = Math.round(parsedVal['sensor_ts']/1000);
// serialize output as JSON
JSON.stringify(parsedVal);
  1. 现在您已将该sensor_ts字段转换为毫秒,您可以告诉 SSB 将其用作事件时间的来源,该时间将用于为您的查询定义聚合窗口。

为此,请单击事件时间选项卡并配置以下属性:

代码语言:javascript复制
Use Kafka Timestamps:   Uncheck it
Input Timestamp Column: sensor_ts
Event Time Column:      event_time
Watermark Seconds:      3

这会将event_time列添加到表中。此列具有TIMESTAMP ROWTIME数据类型,并且派生自sensor_ts列的值。

  1. 单击Properties选项卡,为Consumer Group属性输入以下值,然后单击Save changes
代码语言:javascript复制
Consumer Group: ssb-iot-1

笔记

为虚拟表设置消费者组属性将确保如果您停止查询并稍后重新启动它,第二个查询执行将继续从第一个查询停止的点读取数据,而不会跳过数据。但是,如果多个查询使用同一个虚拟表,设置此属性将有效地将数据分布在查询中,以便每个记录仅由单个查询读取。如果要与多个不同查询共享虚拟表,请确保未设置 Consumer Group 属性。

  1. 让我们查询新创建的表以确保一切正常。单击>_Console > Compose > SQL并键入以下查询:
代码语言:javascript复制
SELECT
event_time,
sensor_id,
sensor_ts,
is_healthy,
sensor_0,
sensor_1
FROM
iot_enriched
  1. 点击Execute。几秒钟后,您应该会在结果面板上看到来自主题的数据:
  1. 单击Stop以停止作业并释放查询使用的所有集群资源。您可以通过单击SQL 作业选项卡来仔细检查所有查询/作业是否已停止。如果任何作业仍在运行,您可以从该页面停止它们。

实验 3 - 将 SQL Stream Builder 与Schema Registry集成

SQL Stream Builder 与Schema Registry的集成自动将存储在注册表中的Schema公开为 SSB 中的表。Schema Registry 中的 schema 名称必须与Kafka中相应的主题名称匹配。

在本实验中,您将在 SSB 中将 Schema Registry 注册为Catalog,以便您可以自动读取iot_enriched_avro以 AVRO 格式存储的主题内容。

  1. 转到以下 URL,其中包含iot_enriched_avro主题中数据的Schema定义。选择并复制页面内容。

https://raw.githubusercontent.com/cloudera-labs/edge2ai-workshop/master/sensor.avsc

  1. 在 Schema Registry Web UI 中,单击 符号以注册新的Schema。
  2. 单击Schema Text字段中的空白区域并粘贴您复制的内容。
  3. 通过填写以下属性完成Schema创建并保存Schema。
代码语言:javascript复制
Name:          iot_enriched_avro
Description:   Schema for the data in the iot_enriched_avro topic
Type:          Avro schema provider
Schema Group:  Kafka
Compatibility: Backward
Evolve:        checked
  1. 返回 SQL Stream Builder 页面,单击Data Providers(在左侧栏)> ( ) Register Catalog
  1. 添加Catalog屏幕中,输入以下详细信息:
代码语言:javascript复制
Name:                sr
Catalog Type:        Schema Registry
Kafka Cluster:       edge2ai-kafka
Schema Registry URL: http://<CLUSTER_HOSTNAME>:7788/api/v1
Enable TLS:          No
  1. 单击添加过滤器按钮并为过滤器输入以下配置:
代码语言:javascript复制
Database Filter: .*
Table Filter:    iot.*
  1. 点击Validate。如果配置正确,您应该会看到消息“Validation was successful, 1 table found.”
  1. 单击Add Tables
  2. Tables屏幕上,您现在应该看到从 Schemar Registry 导入的表的列表。
  1. 查询导入的表以确保其正常工作。

单击>_Console > Compose > SQL并键入以下查询:

代码语言:javascript复制
SELECT *
FROM `sr`.`default_database`.`iot_enriched_avro`
  1. 点击Execute。几秒钟后,您应该会在“Result”面板上看到来自该主题的数据。
  1. 单击停止以停止作业并释放查询使用的所有集群资源。您可以通过单击SQL 作业选项卡来仔细检查所有查询/作业是否已停止。如果任何作业仍在运行,您可以从该页面停止它们。

实验 4 - 计算和存储聚合结果

现在您已经运行了一些基本查询并确认您的表工作正常,您希望开始计算传入数据流的聚合并将结果提供给下游应用程序。

SQL Stream Builder 的表使我们能够将流数据发布/存储到几种不同的服务(Kafka、AWS S3、Google GCS、Kudu、HBase 等......)。

在本实验中,您将使用另一个 Kafka 表将聚合结果发布到另一个 Kafka 主题。

  1. 让我们首先创建一个主题 ( sensor6_stats) 来发布您的聚合结果:
  • 导航到 SMM UI(Cloudera Manager > SMM service > Streams Messaging Manager Web UI)。
  1. 在 SMM UI 上,单击主题选项卡 ( )。
  2. 单击Add New按钮。
  3. 输入主题的以下详细信息,并在准备好后单击保存:
代码语言:javascript复制
Topic name: sensor6_stats
Partitions: 10
Availability: Low
Cleanup Policy: delete
  1. 在 SSB UI 上,单击Console(在左侧栏上)> Compose > SQL
  2. 输入SQL 作业名称Sensor6Stats字段。
  3. SQL框中键入如下所示的查询。

此查询将计算每秒向前滑动的 30 秒窗口内的聚合。对于记录 ( sensor_6) 中的特定传感器值,它为每个窗口计算以下聚合:

    1. 收到的事件数
    2. sensor_6所有事件的值的总和
    3. sensor_6所有事件的平均值
    4. sensor_6字段的最小值和最大值
    5. sensor_6值超过的事件数70
代码语言:javascript复制
INSERT INTO sensor6stats
SELECT
sensor_id as device_id,
HOP_END(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND) as windowEnd,
count(*) as sensorCount,
sum(sensor_6) as sensorSum,
avg(cast(sensor_6 as float)) as sensorAverage,
min(sensor_6) as sensorMin,
max(sensor_6) as sensorMax,
sum(case when sensor_6 > 70 then 1 else 0 end) as sensorGreaterThan60
FROM iot_enriched
GROUP BY
sensor_id,
HOP(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND)
  1. 但是,在执行此查询之前,sensor6stats必须在 SSB 中创建表,并将其映射到sensor6_statsKafka 主题。

由于我们希望主题格式为 JSON,因此单击Templates > local-kafka > JSON

这将CREATE TABLE在 SQL 脚本前添加一个 DDL,以创建与查询结构匹配的表!

  1. 大多数表格属性已经为您填写好了。但是在执行语句之前必须编辑一个:topic属性和scan.statup.mode属性。

编辑 DDL 语句并将属性topic的…值替换为实际的主题名称:sensor6_stats。将scan.statup.mode属性的值设置为latest-offset

  1. 单击执行
  2. 滚动到页面底部,您将看到查询执行生成的日志消息。
  1. 几秒钟后,SQL 控制台将开始显示聚合查询的结果。

请注意,屏幕上显示的数据只是查询返回的数据的样本,而不是完整的数据。

  1. 通过单击控制台(左侧栏)> SQL 作业选项卡检查作业执行详细信息和日志。探索此屏幕上的选项:
  2. 点击Sensor6Stats作业。
  3. 单击“详细信息”选项卡以查看作业详细信息。
  4. 单击“日志”选项卡以查看作业执行生成的日志消息。
  1. 单击Flink Dashboard链接以在 Dashboard 上打开作业页面。导航仪表板页面以探索作业执行的详细信息和指标。
  1. 让我们查询该sensor6_stats主题以检查正在写入其中的数据。您已经在上一步中使用CREATE TABLE DDL创建了sensor6stats表并将其映射到该主题。

现在您只需要查询同一张表。

返回 SSB UI,单击New job以清除 SQL Compose 字段。

笔记

该Sensor6Stats作业将继续在后台运行。您可以通过SQL Jobs页面对其进行监控和管理。

  1. 在 SQL 字段中输入以下查询并执行它:
代码语言:javascript复制
SELECT *
FROM sensor6stats
  1. 几秒钟后,您应该会看到sensor6_stats屏幕上显示的主题内容:
  1. 您需要让Sensor6Stats作业继续运行才能在下一个实验中使用它。确保停止所有其他作业以释放集群资源。

实验 5 - 物化视图

SQL Stream Builder 还可以获取数据流的键控快照,并通过 REST 接口以实体化视图的形式提供这些快照。在本实验中,您将创建和查询物化视图 (MV)。

您将在上一个实验中创建的查询之上定义 MV。在执行以下步骤之前确保查询正在运行。

  1. Console_ > SQL Jobs选项卡上,验证Sensor6Stats作业是否正在运行。选择作业并单击编辑选定作业按钮。
  1. 为了将物化视图添加到查询中,需要停止作业。在作业页面上,单击停止按钮以暂停作业。
  1. 由于您已经sensor6stats在第一次执行此作业时创建了表,因此您将不再需要该CREATE TABLE语句。事实上,如果您尝试使用其中的CREATE TABLE语句再次运行该作业,它将失败,因为该表已经存在。

编辑 SQL Compose 字段以删除整个CREATE TABLE语句。该字段中唯一剩下的应该是实际的INSERT … SELECT语句,如下所示:

  1. 单击Materialize View选项卡并设置以下属性:
代码语言:javascript复制
Materialized View:     Enabled
Primary Key:           device_id
Retention:             300
Ignore NULLs:          False
  1. 要创建 MV,您需要有一个 API 密钥。API 密钥是提供给客户端的信息,以便他们可以访问 MV。如果您有多个 MV 并希望它们被不同的客户端访问,您可以拥有多个 API 密钥来控制对不同 MV 的访问。

如果您已经在 SSB 中创建了 API Key,您可以从下拉列表中选择它。否则,通过单击上面显示的“添加 API Key”按钮在现场创建一个。用作ssb-lab键名。

  1. 单击添加查询以创建新的 MV。您将创建一个视图,显示sensor6在最后记录的 30 秒窗口中至少有 1 次读数高于 60 的所有设备。为此,在 MV 查询配置页面中输入以下参数:
代码语言:javascript复制
URL Pattern:   above60
Query Builder: <click "Select All" to add all columns>
Filters:       sensorGreatThan60  greater  0
  1. 单击保存更改
  1. 返回SQL选项卡并单击执行以开始作业。
  2. Materialized Views选项卡上,复制屏幕上显示的新 MV URL 并在新的浏览器选项卡中打开它(或直接单击 URL 链接)。您将看到 MV 当前快照的内容。

如果您刷新页面几次,您会注意到 MV 快照随着新数据点通过流而更新。

SSB 为定义的主键的每个值保留数据的最后状态。

带参数的物化视图

您在上面创建的 MV 没有参数;当您调用 REST 端点时,它总是返回 MV 的完整内容。可以为 MV 指定参数,以便在查询时过滤内容。

在本节中,您将创建一个允许通过指定sensorAverage列的范围进行过滤的新 MV。

  1. 首先,再次停止作业,以便您可以添加另一个 MV。
  2. Materialized Views选项卡上,单击Add Query按钮以创建一个新的 MV,输入以下参数并单击Save Changes
代码语言:javascript复制
URL Pattern:   above60withRange/{lowerTemp}/{upperTemp}
Query Builder: <click "Select All" to add all columns>
Filters:       sensorGreatThan60  greater           0
AND
sensorAverage      greater or equal  {lowerTemp}
AND
sensorAverage      less or equal     {upperTemp}
  1. 你会注意到这个 MV 的新 URL 有{lowerTemp}和{upperTemp}参数的占位符:
  1. 返回SQL选项卡并再次执行作业。
  2. 单击Materialise Views选项卡,然后单击您刚刚创建的 MV 的链接然后在另一个tab打开,修改其中的参数值。
  1. 验证sensorAverageMV 中字段的值是否都必须在您指定的范围内。
  2. 尝试更改值范围以验证过滤器是否按预期工作。
  3. 完成实验后,单击SQL Jobs选项卡并停止所有作业以释放集群资源。

结论

您现在已经从一个主题中获取数据,计算了汇总结果并将其写入另一个主题。为了验证这是否成功,您使用独立的选择查询选择了结果。最后,您为其中一项作业创建了物化视图,并通过它们的 REST 端点查询了这些视图。

0 人点赞