相关推荐:
【Flink】第十篇:join 之 regular join
【Flink】第十一篇:join 之 interval join
继以上 Flink Join 两篇文章之后探讨最后一类Flink的Join:temporal join。
传统 join 方式
传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
1. Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。
2. Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行分别排序,然后再对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,但是如果两个输入是有序的数据集,则可以作为一种优化方案)。
3. Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。
- 第一阶段和第一个数据集分别称为 build 阶段和 build table;
- 第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。
Hash Join 效率较高但是对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案 (并不是不允许溢写磁盘)。
注意:Sort-Merge Join 和 Hash Join 只适用于 Equi-Join ( Join 条件均使用等于作为比较算子)。
Flink SQL 流批一体的核心是:流表二象性。围绕这一核心有若干概念,例如,动态表(Dynamic Table)/时态表(Temporal Table)、版本(Version)、版本表(Version Table)、普通表、连续查询、物化视图/虚拟视图、CDC(Change Data Capture)、Changelog Stream。
- 将流转换为动态表。
- 在动态表上计算一个连续查询,生成一个新的动态表。
- 生成的动态表被转换回流。
理解:流和表只是数据在特定场景下的两种形态(联想到光的波粒二象性?笔者已经傻傻分不清)
temporal join
Flink Join 主要包含:
- Event Time Temporal Join
- Processing Time Temporal Join
语法(SQL 2011 标准):
代码语言:javascript复制SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
其中,
- 左表:任意表(探针侧,probe site)
- 右表:版本表(versioned table)/普通表(构建侧,build side)
本文主要探索Event Time temporal join的一些设计特性,即右侧是版本表的join。
Event Time Temporal Join
一个典型的场景是订单和汇率,下方示例展示了一个append-only 订单表Orders
与一个不断改变的汇率表 RatesHistory
的 Join 操作:
SELECT * FROM Orders;
rowtime amount currency
======= ====== =========
10:15 2 Euro
10:30 1 US Dollar
10:32 50 Yen
10:52 3 Euro
11:04 5 US Dollar
RatesHistory
表示不断变化的汇率信息。汇率以日元为基准(即 Yen
永远为 1)。
SELECT * FROM RatesHistory;
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
基于上述信息,欲计算 Orders 表中所有交易量并全部转换成日元。
例如,09:00
到 10:45
间欧元对日元的汇率是 114
,10:45
到 11:15
间为 116,10:45
以后是119。如果要将10:52的这笔订单进行汇率转换,最终选择 10:45这个版本,
由于temporal join设计很多特定的影响因素,以以下测试用例探索join规则:
左流(主表、探针侧):
代码语言:javascript复制create table left_upsert (
id string,
op_ts timestamp(3),
primary key(id) not enforced,
watermark for op_ts as op_ts - intervcal '0' second
) with (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '...',
'topic' = '...'
'key.format' = 'json',
'value.format' = 'json',
'properties.group.id' = '...'
)
右流(维表、构建侧):
代码语言:javascript复制create table right_upsert (
id string,
op_ts timestamp(3),
primary key(id) not enforced,
watermark for op_ts as op_ts - intervcal '0' second
) with (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '...',
'topic' = '...'
'key.format' = 'json',
'value.format' = 'json',
'properties.group.id' = '...'
)
1. 支持inner join, left join
2. 右流版本表既要定义为事件时间(水位线)也要定义主键;左流需要定义为事件时间(水位线)。
其实,版本表的特点是可以追溯历史版本,所以,时间和主键是必须要同时具备的。
3. 关联等式条件必须有维表的主键,但是可以加入其它辅助条件,例如,
代码语言:javascript复制on left_upsert.id = right_upsert.id and left_upsert.id <> '2'
4. 水位线起到一个触发写出的作用,在写出之前,左右流的元素在缓存中join。
例如,测试数据及 join sql, 程序 join 结果如下,
代码语言:javascript复制Left:
key value produce seq
{"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 1 --- watermark
{"id":"2"} {"id":"2","op_ts":"1970-01-01 01:00:00"} 3
{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 6 --- watermark
Right:
key value produce seq
{"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 2 --- watermark
{"id":"2"} {"id":"2","op_ts":"1970-01-01 00:00:00"} 4
{"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"} 5
{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 7 --- watermark
join sql:
select * from left_upsert as l
left join right_upsert for system_time as of l.op_ts as r
on l.id = r.id
结果:
---- ------ ------------------- ------- -------------------
| op | id | op_ts | id0 | op_ts0 |
---- ------ ------------------- ------- -------------------
| I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 |
| I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
| I | 3 | 1970-01-04T00:00 | 3 | 1970-01-04T00:00 |
1和2消息将水位线提升到先生产1970-01-03 00:00:00,会触发join写出
代码语言:javascript复制| I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 |
紧接着按照测试数据的produce seq顺序发出测试数据,当在6和7测试数据发出后,又触发一次写出:
代码语言:javascript复制| I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
| I | 3 | 1970-01-04T00:00 | 3 | 1970-01-04T00:00 |
此时,会将内存中缓存的以下join结果也写出,
代码语言:javascript复制| I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
并且可以看到join的时间版本也符合之前的规则。
5. 左流元素才会触发join的作用,join的结果只会看到从左流探针侧触发的join。
例如,测试数据及 join sql, 程序 join 结果如下,
代码语言:javascript复制Left:
key value produce seq
{"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 1 --- watermark
{"id":"2"} {"id":"2","op_ts":"1970-01-01 01:00:00"} 3
{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 7 --- watermark
Right:
key value produce seq
{"id":"1"} {"id":"1","op_ts":"1970-01-03 00:00:00"} 2 --- watermark
{"id":"2"} {"id":"2","op_ts":"1970-01-01 00:00:00"} 4
{"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"} 5
{"id":"2"} null 6
{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"} 8 --- watermark
join sql:
select * from left_upsert as l
left join right_upsert for system_time as of l.op_ts as r
on l.id = r.id
结果:
---- ------ ------------------- ------ -------------------
| op | id | op_ts | id0 | op_ts0 |
---- ------ ------------------- ------ -------------------
| I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 |
| I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
| I | 3 | 1970-01-04T00:00 | 3 | 1970-01-04T00:00 |
在produce seq为6的数据发出之前,内存中左流的id=2的元素与右流的id=2的1970-01-01 00:00:00版本join,当右流{"id":"2"} null发出后,语义上理解{"id":"2"} {"id":"2","op_ts":"1970-01-01 02:00:00"}这条数据应该被撤回,但是从join结果看,并不是这样的:
代码语言:javascript复制| I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
说明,{"id":"2"} null并没有触发join结果的更新,这也说明了右流是不会触发join结果的更新的。
如果将上述左流测试数据{"id":"3"} {"id":"3","op_ts":"1970-01-04 00:00:00"}改为,
代码语言:javascript复制{"id":"2"} {"id":"2","op_ts":"1970-01-04 00:00:00"}
测试结果中,id=2的join结果变为,
代码语言:javascript复制 ---- ----- ------------------ -------- -------------------
| op | id | op_ts | id0 | op_ts0 |
---- ----- ------------------ -------- -------------------
| I | 1 | 1970-01-03T00:00 | 1 | 1970-01-03T00:00 |
| I | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
| -U | 2 | 1970-01-01T01:00 | 2 | 1970-01-01T00:00 |
| U | 2 | 1970-01-04T00:00 | (NULL) | (NULL) |
从以上这个输出也可看出,
6. 在缓存中的join结果没有merge,而是将每次触发join的结果依次输出。
7. 当触发写出后,在缓存中只保留元素最新版本,过期版本将删除。
总结
- 支持inner join, left join。
- 右流版本表既要定义为事件时间(水位线)也要定义主键;左流需要定义为事件时间(水位线)。
- 关联等式条件必须有维表的主键,但是可以加入其它辅助条件。
- 水位线起到一个触发写出的作用,在写出之前,左右流的元素在缓存中join。
- 左流元素才会触发join的作用,join的结果只会看到从左流探针侧触发的join。
- 在缓存中的join结果没有merge,而是将每次触发join的结果依次输出。
- 当触发写出后,在缓存中只保留元素最新版本,过期版本将删除。
以上,可以看出Event Time Temporal Join的适用场景比较特殊,因为构建侧的维表的数据流必须是【缓慢变化维】,否则无法确join的合适的时间版本,并且水位线无法推进。
Processing Time Temporal Join
Processing Time Temporal Join用于和以处理时间作为时间属性的构建侧流表进行Join,这种维表通常我们用HBase、MySQL此类具有Lookup能力的表进行Join。
语法同Event Time Temporal Join,在此不做赘述。