用 Apache NiFi、Kafka和 Flink SQL 做股票智能分析

2021-11-28 11:37:55 浏览数 (2)

原文翻译自 DZone,根据原文意译。

作者使用了 Cloudera 私有云构建,架构图如下:

股票智能分析股票智能分析

本文是关于如何在实时分析中使用云原生应用程序对股票数据进行连续 SQL 操作的教程。

我想使用 Apache NiFi 读取 REST API 来频繁地跟踪一些公司的股票。之后我得到一些数据流分析要使用 Apache Flink SQL 执行,最后使用 Apache Impala 查询 Apache Kudu 中的存储的数据。

我将在下面向您展示如何在几秒钟内在云原生应用程序中构建它。

源代码:https :   //github.com/tspannhw/SmartStocks

用脚本加载schema、表、警报,请参阅 scripts/setup.sh:

源代码:https :   //github.com/tspannhw/ApacheConAtHome2020

  • Kafka Topic
  • Kafka schema
  • Kudu 表
  • Flink 准备
  • Flink SQL 客户端运行
  • Flink SQL 客户端配置

一旦我们的自动化管理员构建了我们的云环境并用我们的应用程序的优点填充它,我们就可以开始我们的持续执行的 SQL。如果你知道你的数据,建立一个 Schema,与注册中心共享.

我们添加的一项独特n内容是Avro Schema中的默认值,并将其设为时间戳毫秒的逻辑类型。这对 Flink SQL 时间戳相关查询很有帮助。

{ "name" : "dt", "type" : "long", "default": 1, "logicalType": "timestamp-millis"}

您可以在 此处查看整个 Schema。

我们还需要一个关于股票警报的 Topic,稍后我们将使用 Flink SQL 创建该主题,因此让我们也为此定义一个模式。

对于今天的数据,我们将使用带有 AVRO Schema 的 AVRO 格式数据,以便在 Kafka Topic 中使用,无论谁将使用它。

如何通过 10 个简单步骤构建智能股票数据流

  1. 使用调度从源中检索数据(例如:InvokeHTTP针对 SSL REST Feed - 比如 TwelveData)。
  2. 设置 Schema 名称 ( UpdateAttribute )。
  3. ForkRecord:我们使用它从使用RecordPath语法的标头 (/values) 中分离出记录。
  4. QueryRecord:使用 SQL 转换类型和操作数据。我们在这个中没有做任何事情,但这是一个更改字段、添加字段等的选项。
  5. UpdateRecord: 在第一个中,我从属性设置记录中的一些字段并添加当前时间戳。我还按时间戳重新格式化以进行转换。
  6. UpdateRecord:我正在让 DT 制作数字化的 UNIX 时间戳。
  7. UpdateRecord:我将DateTime 设为我的格式化字符串日期时间。
  8. (LookupRecord):我还没有这一步,因为我的实时数据集市中没有这家公司的内部记录。我可能会添加此步骤来扩充或检查我的数据。
  9. ( ValidateRecord ):对于不太可靠的数据源,我可能想根据我们的模式验证我的数据,否则,我们将收到警告或错误。
  10. PublishKafkaRecord_2_0: 从 JSON 转换为 AVRO,发送到我们的 Kafka 主题,其中包含对正确模式股票的引用及其版本1.0。

现在我们正在将数据流式传输到 Kafka 主题,我们可以在 Flink SQL 连续 SQL 应用程序、NiFi 应用程序、Spark 3 应用程序等中使用它。所以在这种情况下,CFM NiFi 是我们的生产者,我们将拥有 CFM NiFi 和 CSA Flink SQL 作为 Kafka 消费者。

我们可以看到我们的数据在新的清理格式和我们需要的所有字段中的样子。

在 Kafka 中查看、监控、检查和警报我们的流数据

Cloudera Streams Messaging Manager 通过一个易于使用的预集成 UI 解决了所有这些难题。它预先连接到我的 Kafka Datahubs 并使用 SDX 进行保护。

我可以看到我的 AVRO 数据与相关的股票 schema 在 Topic 中,并且可以被消费。然后,我可以监控谁在消费、消费了多少,以及是否存在滞后或延迟。

如何将我们的流数据存储到云中的实时数据集市

消费AVRO 数据股票的schema,然后写入我们在Cloudera的数据平台由Apache Impala和Apache Kudu支持的实时数据集市。如果出现故障或无法连接,让我们重试 3 次。

我们使用 3 个 Kafka broker 。我们还可以有 Topic 名称和 consumer 名称的参数。我们从使用由 NiFi 自动准备好的 Kafka 标头中引用的股票 Schema 的股票表中读取。当我们向 Kafka 发送消息时,Nifi 通过NiFi 中的schema.name属性传递我们的 Schema 名称。正如我们所看到的,它是附加 Avro 的Schema,所以我们使用该 Reader 并使用该模式转换为简单的 JSON。

写入我们的云原生实时数据集市再简单不过了,我们引用了我们创建的股票表,并有权限使用 JSON Reader。我喜欢UPSERT,因为它能够处理 INSERT 和 UPDATE。

首先,我们需要在 Apache Hue 中从 CDP 或从脚本编写的命令行创建我们的 Kudu 表。   

示例:impala-shell -i edge2ai-1.dim.local -d default -f /opt/demo/sql/kudu.sql 

CREATE TABLEstocks(  uuid STRING,  datetime STRING,  symbol STRING,   open STRING,   close STRING,  high STRING,  volume STRING,  ts TIMESTAMP,  dt     TIMESTAMP,  low STRING,PRIMARY KEY (uuid,datetime) ) PARTITION BY HASH PARTITIONS 4 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

使用集成到 CDP 中的 Apache Hue,我可以检查我的实时数据集市表,然后查询表。

我的数据现在已准备好用于报告、仪表板、应用、笔记本、Web 应用程序、移动应用程序和机器学习。

我现在可以在几秒钟内在这张桌子上启动一个 Cloudera 可视化应用程序。

现在我们可以在 Flink 中构建我们的流分析应用程序。

如何通过 10 个简单步骤构建智能股票流分析

我可以从命令行 Flink SQL Client 连接到 Flink SQL 开始探索我的 Kafka 和 Kudu 数据,创建临时表,并启动一些应用程序(插入语句)。该环境让我可以看到所有不同的可用目录,包括注册表(Cloudera Cloud Schema Registry)、hive(云原生数据库表)和 kudu(Cloudera 实时云数据集市)表。

1. 运行 Flink SQL 客户端

  • 这是一个两步过程,首先设置一个纱线会话。您可能需要添加Kerberos凭据。

flink-yarn-session -tm 2048 -s 2 -d

  • 然后,启动命令行 SQL 客户端。

flink-sql-client 嵌入式 -e sql-env.yaml 

在此处和此处查看更多信息。

2. 运行 Flink SQL

3. 跨目录查询股票的 Kafka Topic

代码语言:txt复制
Select * from registry.default_database.stocks;

4. 对股票 Kudu/Impala 表的跨目录查询

代码语言:txt复制
Select * from kudu.default_database.impala::default.stocks;

5. 默认 Catalog

代码语言:sql复制
Use catalog default_catalog;

CREATE TABLE stockEvents ( symbol STRING, uuid STRING, ts BIGINT, dt BIGINT, datetime STRING, open STRING, close STRING, high STRING, volume STRING, low STRING, 
event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)), 
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) 
WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 
'connector.topic' = 'stocks', 
'connector.startup-mode' = 'earliest-offset', 
'connector.properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092', 
'format.type' = 'registry', 
'format.registry.properties.schema.registry.url' = 'http://edge2ai-1.dim.local:7788/api/v1' );

show tables;;
代码语言:sql复制
Flink SQL> describe stockEvents; 
root |-- symbol: STRING |-- uuid: STRING |-- ts: BIGINT |-- dt: BIGINT |-- datetime: STRING |-- 
open: STRING |-- close: STRING |-- high: STRING |-- volume: STRING |-- low: STRING |-- 
event_time: TIMESTAMP(3) AS CAST(FROM_UNIXTIME(FLOOR(ts / 1000)) AS TIMESTAMP(3)) |--  
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

我们添加了从时间戳中提取的 watermark 和事件时间。

6. 简单的全选查询

代码语言:sql复制
Select * from default_catalog.default_database.stockEvents;

我们可以对我们创建的这个表进行一些有趣的查询。

7. 翻滚窗口

代码语言:sql复制
SELECT symbol

8. TUMBLE_START

代码语言:sql复制
(event_time, INTERVAL '1' MINUTE) as tumbleStart, 
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as tumbleEnd, 
AVG(CAST(high as DOUBLE)) as avgHigh 
FROM stockEvents 
WHERE symbol is not null 
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), symbol;

9. Top 3

代码语言:sql复制
SELECT * 
FROM 
( SELECT * , ROW_NUMBER() OVER 
( PARTITION BY window_start ORDER BY num_stocks desc ) AS rownum 
FROM ( 
SELECT TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start, 
symbol, 
COUNT(*) AS num_stocks 
FROM stockEvents 
GROUP BY symbol, 
TUMBLE(event_time, INTERVAL '10' MINUTE) ) ) 
WHERE rownum <=3;

10. 股票警报

代码语言:sql复制
INSERT INTO stockalerts 
/*  OPTIONS('sink.partitioner'='round-robin') */ 
SELECT CAST(symbol as STRING) symbol, 
CAST(uuid as STRING) uuid, ts, dt, open, close, high, volume, low, 
datetime, 'new-high' message, 'nh' alertcode, CAST(CURRENT_TIMESTAMP AS BIGINT) alerttime FROM stocks st 
WHERE symbol is not null 
AND symbol <> 'null' 
AND trim(symbol) <> '' 
AND CAST(close as DOUBLE) > 11;

监控 Flink 作业

使用 CSA Flink Global Dashboard,我可以看到我所有的 Flink 作业正在运行,包括 SQL 客户端作业、断开连接的 Flink SQL 插入和部署的 Flink 应用程序。

我们还可以看到在股票警报 Topic 中热门的数据。我们可以针对这些数据运行 Flink SQL、Spark 3、NiFi 或其他应用程序来处理警报。那可能是下一个应用程序,我可能会将这些警报发送到 iPhone 消息、Slack 消息、数据库表和 WebSockets 应用程序。

数据血缘和治理

我们都知道 NiFi 拥有深厚的数据血缘,可以通过 REST、报告任务或 CLI 推送或拉取,以用于审计、指标和跟踪。如果我想要整个流媒体管道的所有治理后的数据,我将使用 Apache Atlas,它在我的云数据平台中作为 SDX 一部分预先连接的数据。

0 人点赞