往期看点:
【Flink】第五篇:checkpoint【1】
【Flink】第五篇:checkpoint【2】
【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查
【Flink】第八篇:Flink 内存管理
【Flink】第九篇:Flink SQL 性能优化实战
【Flink】第十篇:join 之 regular join
上一篇 【Flink】第十篇:join 之 regular join 验证了Flink SQL中的regular join的一些设计逻辑。
Flink Regular Join是最为基础的、没有缓存剔除策略的Join,两个表的输入和更新都会对全局可见,会影响之后所有的Join 结果。
因为历史数据不会被清理,所以Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题Regular Join 通常是不可持续的,一般只用做有界数据流的Join。
interval join
- 支持 INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN
- 左右流都会触发结果更新
- State 自动清理,根据时间区间保留数据
- 输出流保留时间属性
interval join 其实就是一种开窗的 regular join,他由flink自身维护状态缓存Row无限增大的问题。
例如,
上面这个join为Orders 表设置了o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界。
为Shipmenets 表设置了s.shiptime >= o.ordertime 的时间下界
所以,这个时间界限之前的状态是可以被释放的,数据行在时间轴上不断推进水位线向前。
interval join 的限制
Interval join需要至少一个 equi-join 谓词和一个限制了双方时间的 join 条件。例如使用两个适当的范围谓词(<, <=, >=, >
),一个 BETWEEN
谓词或一个比较两个输入表中相同类型的时间属性(即处理时间和事件时间)的相等谓词
比如,以下谓词是合法的 interval join 条件:
ltime = rtime
ltime >= rtime AND ltime < rtime INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime INTERVAL '5' SECOND
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
以上示例中,所有在收到后四小时内发货的 order 会与他们相关的 shipment 进行 join。
还有一个很重要的点是:与regular join相比较,interval join只支持带有时间属性的append-only流。由于时间属性是单调递增的,Flink可以在不影响结果正确性的情况下从其状态中删除旧值。
验证
left-json:
代码语言:javascript复制create table left_json(
appl_seq string
,amount decimal(16,2)
,op_ts timestamp(3)
,ts as op_ts
watermark for op_ts as op_ts
) with (
'connecotr' = 'kafka'
,'topic' = 'left-json'
,'value.format' = 'json'
,'scan.startup.mode' = 'latest-offset'
,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
,'properties.group.id' = 'left-json-group'
)
right_json:
代码语言:javascript复制create table right_json(
appl_seq string
,amount decimal(16,2)
,op_ts timestamp(3)
,ts as op_ts
watermark for op_ts as op_ts
) with (
'connecotr' = 'kafka'
,'topic' = 'right-json'
,'value.format' = 'json'
,'scan.startup.mode' = 'latest-offset'
,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
,'properties.group.id' = 'right-json-group'
)
查询SQL:
代码语言:javascript复制select l.appl_seq,l.ts,l.amount,r.appl_seq.r.ts,r.amount
from left_upsert l, right_upsert r
where l.appl_seq = r.appl_seq
and r.op_ts >= l.op_ts
and r.op_ts <= l.op_ts interval '10' minute
总结
- 支持 INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN
- 左右流都会触发结果更新
- State 自动清理,根据时间区间保留数据
- 输出流保留时间属性
- 时间界限之前的状态是可以被释放的,数据行Row在时间轴上不断推进水位线向前。
- Interval join需要至少一个 equi-join 谓词和一个限制了双方时间的 join 条件。
- 与regular join相比较,interval join只支持带有时间属性的append-only流。