本文是我在查找Temporal Tables的资料时看到的,详细情况你可以参考这里:https://blog.csdn.net/Lcumin/article/details/113065901
前言
Flink 1.12正式发布后,带来了很多新的特性,本文重点学习和总结一下Flink 1.11和 Flink1.12中时态表的使用和自己的一个小总结,文章如有问题,请大家留言交流讨论,我会及时改正。
本文主要将在Flink1.12中新的时态表的一些新的概念和注意事项,如何在Join中使用会在之后另一个篇文章中具体讨论。
Flink中的时态表的设计初衷
首先,大家需要明确一个概念,就是传统SQL中表一般表示的都是有界的数据,而直接套用于流计算这样源源不断的数据上是存在问题的,所以在Flink SQL中,提出了一种叫做动态表的概念,这一点官网已经有了明确的说明。
不过这里还是补充一些说明:
- 与静态表相比,动态表随时间而变化。将SQL查询作用与动态表,查询会持续执行而不会终止,是一个连续的查询。
- 因为数据会持续产生没有尽头,所以连续查询不会给出一个最终而不变的结果,流上的SQL实际上给出的总是中间结果。
- 连续的查询不会终止且会根据其输入表(动态表)上的数据变化,持续计算并将变化反应到其结果表中。
在明确了上面的3个概念后,我们来看看时态表的设计初衷。
在业务中,我们会遇到维度表是在时刻更新的,正常来说,我们只能获取到最近一个时间的维度表数据,但是在业务中,我们往往最关心的是当某时间发生时,该事件的事件时间对应的维度应该是怎样的,结合官网的一个例子,解释说明一下:
代码语言:javascript复制rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
我们有一张汇率表,从上数据来看,其是根据时间不断的变化的,例如,当一个订单在9:00 到来是时,对应的维度结果应该是:
代码语言:javascript复制rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
而当一个订单在12:00 到来是时,对应的维度结果应该是:
代码语言:javascript复制rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Yen 1
11:15 Euro 119 (Euro 更新)
11:49 Pounds 108 (Pounds新增)
在Flink SQL1.11的时候,SQL的DDL上只支持处理时间语义的时态表join,如果我们想达到事件时间语义的效果,只能使用时态表函数来实现,例如:
代码语言:javascript复制log.info("注册订单表完场");
tEnv.createTemporaryView("RatesHistory", ratesHistory);
log.info("注册汇率表完成");
// 创建和注册时态表函数
// 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
$("rowtime"), // <==== (1)指定时态表函数的时间属性
$("currency")); // <==== (2) 指定时态表函数的主键
log.info("创建时态表函数完成");
tEnv.createTemporarySystemFunction("Rates", rates);
log.info("注册失态表函数完成");
String dml = "SELECT * FROM Orders AS o , LATERAL TABLE (Rates(o.time)) AS r WHERE r.currency = o.currency";
这里要注意的是:如果要传入TemporalTableFunction事件时间属性,那么定义TemporalTableFunction时,也需要定义成事件时间,否则会报错:Non processing timeAttribute [TIME ATTRIBUTE(ROWTIME)] passed as the argument to TemporalTableFunction。
而在Flink1.12中,完善了1.11中的不足,在DDL直接支持事件时间和处理时间两种语义,也引出了版本表(1.12),版本视图(1.12),普通表(1.12),时态表函数(1.11)等概念。
Flink1.12中时态表的类型
时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为版本表和 普通表。
- 版本表
版本是不同时间段上反应表数据的一种形态。比如我们上面举到的汇率的例子,在9:00 和12:00 就是汇率表的两个版本。
版本表则是表在不同时间段版本的一个集合,我们可以追踪和并访问它的历史版本。
而在Flink1.12中,对于任何其基础源或格式直接定义变更日志的表,都将隐式定义版本化表。包括upsert Kafka源以及数据库changelog日志格式,例如debezium和canal。如上所述,唯一的附加要求是CREATE表语句必须包含PRIMARY KEY和事件时间属性。定义了主键约束和事件时间属性的表就是版本表。
- 如何定义版本表
使用主键约束和事件时间来定义一张版本表,因为主键和时间事件可以唯一确定一条维度数据。
代码语言:javascript复制-- 定义一张版本表
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product_id) NOT ENFORCED, -- (1) 定义主键约束
WATERMARK FOR update_time AS update_time -- (2) 通过 watermark 定义事件时间
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
以上的DML在 (1) 为表 product_changelog 定义了主键, (2) 把 update_time 定义为表 product_changelog 的事件时间,因此 product_changelog 是一张版本表。METADATA FROM 'value.source.timestamp' VIRTUAL 语法的意思是从每条 changelog 中抽取 changelog 对应的数据库表中操作的执行时间。
使用版本表Join时需要注意的事项:
- 如果是基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。这个好理解,主键和事件时间唯一确定一条数据。
- watermark的设置:基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。事件时间的一个重要概念就是watermark,这个没必要解释了。
- 官网强烈推荐使用数据库表中操作的执行时间作为事件时间,否则通过时间抽取的版本可能和数据库中的版本不匹配。
- 版本视图
什么是视图,视图表?视图就是是已经编译好的SQL语句,视图表就是通过已经编译好的SQL语句产生的虚拟表。
为什么要有视图表?在流上,我们往往得到的是一个append-only流,这意味着我们无法定义PRIMARY KEY,但是,我们很清楚该表具有定义版本表的所有必要信息,所以我们可以通过Flink SQL提供的DISTINCT做去重处理,去重查询可以产出一个有序的 changelog 流。
如何定义视图表:去重查询能够推断主键并保留原始数据流的事件时间属性,如下:
代码语言:javascript复制SELECT * FROM RatesHistory;
currency_time currency rate
============= ========= ====
09:00:00 US Dollar 102
09:00:00 Euro 114
09:00:00 Yen 1
10:45:00 Euro 116
11:15:00 Euro 119
11:49:00 Pounds 108
-- 视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件时间
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键
ORDER BY currency_time DESC) AS rowNum
FROM RatesHistory )
WHERE rowNum = 1;
-- 视图 `versioned_rates` 将会产出如下的 changelog:
(changelog kind) currency_time currency rate
================ ============= ========= ====
(INSERT) 09:00:00 US Dollar 102
(INSERT) 09:00:00 Euro 114
(INSERT) 09:00:00 Yen 1
(UPDATE_AFTER) 10:45:00 Euro 116
(UPDATE_AFTER) 11:15:00 Euro 119
(INSERT) 11:49:00 Pounds 108
使用是视图表需要注意的事项:视图表首先需要一个append_only流,这样我们可以使用DISTINCT操作,通过事件的变化产出一张changlog流。而Flink SQL1.12会自动推断主键并保留原始数据流的事件时间。
- 普通表
什么是普通表?版本表保留了表在各个时间段的版本,而普通表则只保留该表最新的一份数据。
如何定义普通表:普通表的特性就和他名称一样,就是Flink中的一个普通表,其声明和 Flink 建表 DDL一致,如下:
代码语言:javascript复制-- 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
-- 'currency' 列是 HBase 表中的 rowKey
CREATE TABLE LatestRates (
currency STRING,
fam1 ROW<rate DOUBLE>
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'rates',
'zookeeper.quorum' = 'localhost:2181'
);
使用普通表需要注意的事项:
理论上讲任意都能用作时态表并在基于处理时间的时态表 Join 中使用,但当前支持作为时态表的普通表必须实现接口 LookupableTableSource。接口 LookupableTableSource 的实例只能作为时态表用于基于处理时间的时态 Join 。
通过 LookupableTableSource 定义的表意味着该表具备了在运行时通过一个或多个 key 去查询外部存储系统的能力,当前支持在 基于处理时间的时态表 join 中使用的表包括 JDBC, HBase 和 Hive。
在基于处理时间的时态表 Join 中支持任意表作为时态表会在不远的将来支持。
时态表函数
时态表函数在本文的第二部分已经有了说明,需要注意的就是
在join时左表(左输入/探针侧)去关联一个时态表(右输入/构建侧),两边的时间语义必须相同,否则会抛出类似的异常:Non processing timeAttribute [TIME ATTRIBUTE(ROWTIME)] passed as the argument to TemporalTableFunction。
基于处理时间的时态 Join 中, 如果右侧表不是可以直接查询外部系统的表而是普通的数据流,时态表函数 Join 和 时态表 Join 的语义都有问题,时态表函数 Join 仍然允许使用,但是时态表 Join 禁用了该功能。语义问题的原因是 Join 算子没办法知道右侧时态表(构建侧)的完整快照是否到齐,这可能导致左侧的流在启动时关联不到用户期待的数据, 在生产环境中可能误导用户。
在处理时间语义下的时态表函数的只会保留最新的一份数据,时间事件语义下则会保留每个水位对应的动态表。
总结
本文总结了Flink1.11时态关联的不足和Flink1.12中时态表设计的一些新的概念和一些基本的定义表的方法和注意事项。后续会写一个Join篇章来进行时态表,时态函数的使用补充。