【Flink】第十一篇:join 之 interval join

2022-03-31 11:06:44 浏览数 (1)

往期看点:

【Flink】第五篇:checkpoint【1】

【Flink】第五篇:checkpoint【2】

【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查

【Flink】第八篇:Flink 内存管理

【Flink】第九篇:Flink SQL 性能优化实战

【Flink】第十篇:join 之 regular join

上一篇 【Flink】第十篇:join 之 regular join 验证了Flink SQL中的regular join的一些设计逻辑。

Flink Regular Join是最为基础的、没有缓存剔除策略的Join,两个表的输入和更新都会对全局可见,会影响之后所有的Join 结果。

因为历史数据不会被清理,所以Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而因为资源问题Regular Join 通常是不可持续的,一般只用做有界数据流的Join。

interval join

  • 支持 INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN
  • 左右流都会触发结果更新
  • State 自动清理,根据时间区间保留数据
  • 输出流保留时间属性

interval join 其实就是一种开窗的 regular join,他由flink自身维护状态缓存Row无限增大的问题。

例如,

上面这个join为Orders 表设置了o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界。

为Shipmenets 表设置了s.shiptime >= o.ordertime 的时间下界

所以,这个时间界限之前的状态是可以被释放的,数据行在时间轴上不断推进水位线向前。

interval join 的限制


Interval join需要至少一个 equi-join 谓词和一个限制了双方时间的 join 条件。例如使用两个适当的范围谓词(<, <=, >=, >),一个 BETWEEN 谓词或一个比较两个输入表中相同类型的时间属性(即处理时间和事件时间)的相等谓词

比如,以下谓词是合法的 interval join 条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime INTERVAL '5' SECOND
代码语言:javascript复制
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

以上示例中,所有在收到后四小时内发货的 order 会与他们相关的 shipment 进行 join。

还有一个很重要的点是:与regular join相比较,interval join只支持带有时间属性的append-only流。由于时间属性是单调递增的,Flink可以在不影响结果正确性的情况下从其状态中删除旧值。

验证

left-json:

代码语言:javascript复制
create table left_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,ts   as  op_ts
    watermark for op_ts as op_ts
) with (
'connecotr' = 'kafka'
    ,'topic' = 'left-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'left-json-group'
)

right_json:

代码语言:javascript复制
create table right_json(
    appl_seq  string
    ,amount   decimal(16,2)
    ,op_ts    timestamp(3)
    ,ts   as  op_ts
    watermark for op_ts as op_ts    
) with (
'connecotr' = 'kafka'
    ,'topic' = 'right-json'
    ,'value.format' = 'json'
    ,'scan.startup.mode' = 'latest-offset'
    ,'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092'
    ,'properties.group.id' = 'right-json-group'
)

查询SQL:

代码语言:javascript复制
select l.appl_seq,l.ts,l.amount,r.appl_seq.r.ts,r.amount
from left_upsert l, right_upsert r
where l.appl_seq = r.appl_seq
and r.op_ts >= l.op_ts
and r.op_ts <= l.op_ts   interval '10' minute

总结

  • 支持 INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN
  • 左右流都会触发结果更新
  • State 自动清理,根据时间区间保留数据
  • 输出流保留时间属性
  • 时间界限之前的状态是可以被释放的,数据行Row在时间轴上不断推进水位线向前。
  • Interval join需要至少一个 equi-join 谓词和一个限制了双方时间的 join 条件。
  • 与regular join相比较,interval join只支持带有时间属性的append-only流。

0 人点赞