介绍
流处理是通过在数据运动时对数据应用逻辑来创造商业价值。很多时候,这涉及组合数据源以丰富数据流。Flink SQL 执行此操作并将您应用于数据的任何函数的结果定向到接收器中。业务用例,例如欺诈检测、广告印象跟踪、医疗保健数据丰富、增加财务支出信息、GPS 设备数据丰富或个性化客户通信,都是使用Hive表来丰富数据流的很好的例子。 因此,Hive 表与 Flink SQL 有两种常见的用例:
- Lookup(查找)表用于丰富数据流
- 用于写入 Flink 结果的接收器
对于这些用例中的任何一个,还有两种方法可以使用 Hive 表。您可以使用 Hive catalog,也可以使用 Flink DDL 中使用的 Flink JDBC 连接器。让我们讨论一下它们是如何工作的,以及它们的优点和缺点是什么。
在 SQL Stream Builder 中注册 Hive Catalog
SQL Stream Builder (SSB) 旨在为分析师提供无代码界面中 Flink 的强大功能。SSB 有一种注册Hive Catalog的简单方法:
- 单击侧边栏上的“Data Provider”菜单
- 单击下方框中的“Register Catalog”
- 选择“Hive”作为Catalog类型
- 给它起个名字
- 声明你的默认数据库
- 点击“验证”
- 验证成功后,点击“创建”
完成上述步骤后,您的 Hive 表将在您选择它作为活动Catalog后显示在表列表中。目前,通过Catalog概念,当直接从 HDFS 访问以进行读取或写入时,Flink 仅支持非事务性 Hive 表。
将 Flink DDL 与 JDBC 连接器结合使用
使用 Flink JDBC 连接器,可以直接从控制台屏幕为任何 Hive 表创建 Flink 表,其中可以提供表的 Flink DDL 创建脚本。这将为 Hive DB 和表名指定一个 URL。无论其类型如何,都可以通过这种方式访问所有 Hive 表。JDBC DDL 语句甚至可以通过“模板”生成。点击“Templates”->“jdbc”,控制台会将代码粘贴到编辑器中。
代码语言:javascript复制CREATE TABLE `ItemCategory_transactional_jdbc_2` (
`id` VARCHAR(2147483647),
`category` VARCHAR(2147483647)
) WITH (
‘connector’ = ‘jdbc’,
‘lookup.cache.ttl’ = ‘10s’,
‘lookup.cache.max.rows’ = ‘10000’,
‘tablename’ = ‘item_category_transactional’,
‘url’ = ‘jdbc:hive2://<host>:<port>/default’
)
Using a Hive table as a lookup table
Hive 表通常用作查找表以丰富 Flink 流。Flink 能够缓存在 Hive 表中找到的数据以提高性能。需要设置 FOR SYSTEM_TIME AS OF 子句来告诉 Flink 与时态表连接。有关详细信息,请查看相关的 Flink 文档。
代码语言:javascript复制SELECT t.itemId, i.category
FROM TransactionsTable t
LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId
Hive Catalog tables
对于 Hive Catalog 表,可以使用Hive 表的属性“lookup.join.cache.ttl”(此值的默认值为一小时)配置缓存查找表的 TTL(生存时间),就像 Beeline 中的这样或Hue:
优点: 不需要定义 DDL,一个简单的 Hive Catalog就可以了。
缺点:仅适用于非事务性表
使用 JDBC 连接器的 Flink DDL 表
使用带有 JDBC 连接器的 Hive 表时,默认情况下没有缓存,这意味着Flink 会为每个需要丰富的条目连接 Hive!我们可以通过在 DDL 命令中指定两个属性lookup.cache.max-rows和lookup.cache.ttl来更改它。
Flink 会先查找缓存,只有在缓存缺失时才向外部数据库发送请求,并用返回的行更新缓存。当缓存达到最大缓存行lookup.cache.max-rows或当行超过lookup.cache.ttl的最长时间时,缓存中最旧的行将过期。缓存的行可能不是最新的。一些用户可能希望通过调整 lookup.cache.ttl 来更频繁地刷新数据,但这可能会增加发送到数据库的请求数。用户将不得不平衡缓存数据的吞吐量和新鲜度。
代码语言:javascript复制CREATE TABLE `ItemCategory_transactional_jdbc_2` (
`id` VARCHAR(2147483647),
`category` VARCHAR(2147483647)
) WITH (
‘connector’ = `jdbc’,
‘lookup.cache.ttl’ = ‘10s’,
‘lookup.cache.max-rows’ = ‘10000’,
‘table-name’ = ‘item_category_transactional’,
‘url’ = ‘jdbc:hive2://<host>:<port>/default’
)
Pros: All Hive tables can be accessed this way, and the caching
请注意缓存参数——这是我们确保良好的JOIN性能与来自 Hive 的新数据平衡的方式,根据需要进行调整。
使用 Hive 表作为接收器
将 Flink 作业的输出保存到 Hive 表中,可以让我们存储处理过的数据以满足各种需求。为此,可以使用INSERT INTO语句并将查询结果写入指定的 Hive 表。请注意,您可能必须使用 Hive ACID 表调整 JDBC 接收器作业的检查点超时持续时间。
代码语言:javascript复制INSERT INTO ItemCategory_transactional_jdbc_2
SELECT t.itemId, i.category
FROM TransactionsTable t
LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId
Hive Catalog tables
No DDL needs to be written. Only non-transactional tables are supported, thus it only works with append-only streams.
Flink DDL tables with JDBC connector
With this option upsert type data can be written into transactional tables. In order to be able to do that a primary key should be defined.
CREATE TABLE `ItemCategory_transactional_jdbc_sink` (
`id` STRING,
`category` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
‘connector’ = ‘jdbc’,
‘table-name’ = ‘item_category_transactional_sink’,
‘url’ = ‘jdbc:hive2://<host>:<port>/default’
)
当这个作业执行时,Flink 将覆盖所有具有相同主键值的记录,如果它已经存在于表中。这也适用于更新插入流以及事务性 Hive 表。
结论
我们已经介绍了如何使用 SSB 通过 Hive 表丰富 Flink 中的数据流,以及如何使用 Hive 表作为 Flink 结果的接收器。这在涉及使用查找数据丰富数据流的许多业务用例中非常有用。我们深入探讨了使用 Hive 表的不同方法。我们还讨论了不同方法的优缺点以及各种与缓存相关的选项以提高性能。有了这些信息,您就可以决定哪种方法最适合您。
如果您想亲身体验 SQL Stream Builder,请务必立即下载社区版!
原文作者:Jimit Patel, and Ferenc Csaky
原文链接:https://blog.cloudera.com/enriching-streams-with-hive-tables-via-flink-sql/