上一篇“【Flink】第二篇:维表Join之版本表”写的有些仓促,最后的结论部分总结的不够精炼,本篇对其进行进一步总结,并给出Demo的输出示例,目的在于探索Flink SQL 版本表join的一些设计规则。
上一篇主要内容是:Flink join中的时态表join中的版本表关联,Demo是一个 kafka json temporal join upsert-kafka:
- 左表(左流、主表、事实表):
CREATE TABLE left_kafka (
order_id STRING,
prod_id STRING,
op_ts TIMESTAMP(3),
WATERMARK FOR op_ts AS op_ts
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '10.164.200.103:9092,10.164.200.104:9092,10.164.200.105:9092',
'topic' = 'left_kafka-join-01',
'value.format' = 'json',
'properties.group.id' = 'test-01',
'scan.startup.mode' = 'latest-offset'
)
- 右表(右流、维度表):
CREATE TABLE right_upsert (
prod_id STRING,
prod_name STRING,
price DECIMAL(16,2),
op_ts TIMESTAMP(3),
PRIMARY KEY(prod_id) NOT ENFORCED,
WATERMARK FOR op_ts AS op_ts
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '10.164.200.103:9092,10.164.200.104:9092,10.164.200.105:9092',
'topic' = 'left_kafka-join-10',
'key.format' = 'json',
'value.format' = 'json',
'properties.group.id' = 'test-01'
)
- 关联SQL:
SELECT t1.order_id, t1.prod_id,cast(t1.op_ts as string), t2.prod_name, t2.price, cast(t2.op_ts as string)
FROM left_kafka AS t1 LEFT JOIN right_upsert FOR SYSTEM_TIME AS OF t1.op_ts AS t2
ON t1.prod_id = t2.prod_id
以下用若干组测试数据探索Flink SQL join的一些设计规则:
一、时间版本规则:
左流选择join右流的时间版本早于左流时间版本并且“确保”是最新的时间版本。
- 测试数据:
左流:
代码语言:javascript复制{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:00.000000"}
{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:01.000000"}
{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:01.500000"}
右流:
代码语言:javascript复制{"prod_id":"01"} {"prod_id":"01","prod_name":"产品1","price":1.12,"op_ts":"1970-01-01 00:00:00.000000"}
{"prod_id":"01"} {"prod_id":"01","prod_name":"产品1","price":2.12,"op_ts":"1970-01-01 00:00:01.000000"}
{"prod_id":"01"} {"prod_id":"01","prod_name":"产品1","price":3.12,"op_ts":"1970-01-01 00:00:02.000000"}
输出:
代码语言:javascript复制 ---- ----------- --------- ------------------------- ----------- ------- -------------------------
| op | order_id | prod_id | EXPR$2 | prod_name | price | EXPR$5 |
---- ----------- --------- ------------------------- ----------- ------- -------------------------
| I | 1 | 01 | 1970-01-01 00:00:00.000 | 产品1 | 1.12 | 1970-01-01 00:00:00.000 |
| I | 1 | 01 | 1970-01-01 00:00:01.000 | 产品1 | 2.12 | 1970-01-01 00:00:01.000 |
| I | 1 | 01 | 1970-01-01 00:00:01.500 | 产品1 | 2.12 | 1970-01-01 00:00:01.000 |
- 分析:
(1) 左表1970-01-01 00:00:01.500,那么,右表1970-01-01 00:00:01.000来后不能立刻”确认“是否触发join,因为不能”确认“还会不会有比这条1970-01-01 00:00:01.000维表数据更加新的时间版本(并且满足不晚于主表时间版本1970-01-01 00:00:01.500)的维表数据了,例如,后续可能又来了一条1970-01-01 00:00:01.400,那之前如果join 1970-01-01 00:00:01.000维表数据显然是不合理的。
注意:这时候左表1970-01-01 00:00:01.500会缓存在状态里,一直等待可以“确认”最新时间版本的那条维表消息到来!
(2) 1970-01-01 00:00:00.000左表,那么右表1970-01-01 00:00:00.000一来就马上可以“确认”触发join,因为维表这条数据已经是满足join时间版本的极限条件了,可以”确认“不会有比这条维表数据更加新的时间版本并且不晚于主表时间版本的维表数据了。
二、维表历史版本缓存和主表乱序数据缓存:
- 测试数据:
左流:
代码语言:javascript复制{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:01.500000"}
{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:02.500000"}
{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:01.000000"}
{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:02.000000"}
{"order_id":"1","prod_id":"01","op_ts":"1970-01-01 00:00:03.000000"}
右流:
代码语言:javascript复制{"prod_id":"01"} {"prod_id":"01","prod_name":"产品1","price":1.12,"op_ts":"1970-01-01 00:00:01.000000"}
{"prod_id":"01"} {"prod_id":"01","prod_name":"产品1","price":2.12,"op_ts":"1970-01-01 00:00:02.500000"}
{"prod_id":"01"} {"prod_id":"01","prod_name":"产品1","price":3.12,"op_ts":"1970-01-01 00:00:03.000000"}
输出:
代码语言:javascript复制 ---- ----------- ---------- -------------------------- ------------ --------- --------------------------
| op | order_id | prod_id | EXPR$2 | prod_name | price | EXPR$5 |
---- ----------- ---------- -------------------------- ------------ --------- --------------------------
| I | 1 | 01 | 1970-01-01 00:00:01.500 | 产品1 | 1.12 | 1970-01-01 00:00:01.000 |
| I | 1 | 01 | 1970-01-01 00:00:02.500 | 产品1 | 2.12 | 1970-01-01 00:00:02.500 |
| I | 1 | 01 | 1970-01-01 00:00:01.000 | (NULL) | (NULL) | (NULL) |
| I | 1 | 01 | 1970-01-01 00:00:02.000 | (NULL) | (NULL) | (NULL) |
| I | 1 | 01 | 1970-01-01 00:00:03.000 | 产品1 | 3.12 | 1970-01-01 00:00:03.000 |
- 分析:
(1) 维表历史版本会删除已经触发过join写出的过期版本,没有触发join写出的会在触发写出时在状态中正常join。例如,左表1970-01-01 00:00:01.500,右表1970-01-01 00:00:01.000,左表又来了一条1970-01-01 00:00:02.500,右表来了1970-01-01 00:00:02.500,此时左表1970-01-01 00:00:01.500会被正确join右表1970-01-01 00:00:01.000,并被左表1970-01-01 00:00:02.500 join 右表1970-01-01 00:00:02.500触发一起写出,但是之后维表只会保留1970-01-01 00:00:02.500。
(2) 主表乱序数据缓存:由于是LEFT JOIN,所以,主表不存在过期的数据,但是当乱序晚到的主表数据应该被join的维表时间版本过期删除后,会join到NULL,扩展字段用NULL填充。还有一点要注意的是,晚到的数据不会触发join写出,而是被最新的触发写出的join数据连带着一起触发写出。
三、水位线延迟在kafka-json temporal join upsert-kafka中的作用:
延迟只是延迟了触发join写出的时机,例如,延迟两秒,左右流分别依次来:1970-01-01 00:00:00.000、1970-01-01 00:00:01.000、1970-01-01 00:00:02.000,延迟两秒,所以,当最后一条1970-01-01 00:00:02.000来时,会触发两边的1970-01-01 00:00:00.000被写出,但是在缓存中时会触发正确的join顺序。
代码语言:javascript复制--在定义水位线的地方加入延迟:
WATERMARK FOR op_ts AS op_ts - INTERVAL '2' SECOND
四、关联等式条件必须有维表的主键:
否则报错:Temporal table's primary key [appl_seq0] must be included in the equivalence condition of temporal join, but current temporal join condition is [state=state].
但是可以加入其它辅助条件,例如:
代码语言:javascript复制on leftT.prod_id=rightT.key and leftT.order_id <> '2'