SQL是开发人员与数据分析师必备的技能,Flink也提供了Sql方式编写任务,能够很大程度降低开发运维成本,这篇是flink join的终极篇SQL Join, 首先介绍sql join使用方式、然后介绍global join带来的状态存储成本及解决方式、最后从源码角度分析sql join实现。
一、SQL JOIN使用方式
对于sql join可以分为两类:Global Join、Time-windowed Join
- Global Join Global Join表示全局join, 也可以称为无限流join, 由于没有时间限制,任何时候流入的数据都可以被关联上,支持inner join、left join、right join、full join 连接语法。使用语法遵循standard ANSI SQL。使用方式:
SELECT *
FROM Orders INNER/LEFT/RIGHT/FULL JOIN Product ON Orders.productId = Product.id
- Time-windowed Join 基于时间窗口的join, 流表的数据关联必须在一定的时间范围内,同样支持inner join、left join、right join、full join,但是不同的是条件中带有时间属性条件,有以下几种使用方式:
ltime = rtime
ltime >= rtime AND ltime < rtime INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime INTERVAL '5' SECOND
ltime、rtime表示流表的时间属性字段。 其实现与interval join 使用了相同的实现方式,不同的是: a. Time-windowed Join 即可支持Event-Time,也可支持Processing-Time b. interval join 只支持inner join,Time-windowed Join支持多种类型join 以Flink intervalJoin 使用与原理分析 中订单流与地址流为例,sql实现:
代码语言:javascript复制 select o.userId,a.addrId from orders o left join address a on o.addrId=a.addrId
and o.rtt BETWEEN a.rt - INTERVAL '5' SECOND AND a.rt - INTERVAL '1' SECOND
二、Idle State Retention Time 使用
global join 能够join 上任何时刻的数据,是由于状态中保存了两个流表的所有数据,这些数据都保存在状态中,默认情况下是不会被过期,但是两个流表又是持续输入的,待数日或者数月之后,状态数据会无限增大,但是很多时候我们数据关联具有时效性,例如只要求当天数据关联即可,那么这种方式会内存或者磁盘造成不必要浪费。那我们的目标就是能够设置状态ttl,在到达过期时间能够被自动清除,在DataStream API 可以通过StateTtlConfig 来设置状态的ttl, 但是sql方式就无法通过这种方式设置,好在flink 提供了Idle State Retention Time 空闲状态的保留时间,通过配置StreamQueryConfig来设置ttl时间,并且只能按照Processing-time来清理数据,从数据流入系统到当数据未被读写时间达到ttl 就会被自动清除。先看下其使用方式:
代码语言:javascript复制 val config=tabEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))
tabEnv.sqlUpdate('"',config)
tabEnv.sqlQuery("",config)
tab.writeToSink(sink,config)
withIdleStateRetentionTime(minTime: Time, maxTime: Time), minTime/maxTime 分别表示空闲保留最小/最大时间,但是必须满足maxTime-minTime>=5min,接下来看下数据的ttl设置: 初始默认的数据ttl = curProcessTime(数据流入当前系统时间) maxRetentionTime(maxTime),之后每有相同的数据流入,只要满足curProcessTime minRetentionTime > oldExpiredTime(上一次设置ttl的时间),就将其ttl设置为curProcessTime maxRetentionTime。
另外还有两点需注意:
- Idle State Retention Time 不是全局有效,需要在每一个使用sqlUpdate/sqlQuery中单独设置
- 数据定时清理同样是依赖flink 定时机制,会将定时数据存储在内存状态中,会对内存造成比较大的压力,可以选择rocksDB 来代替内存作为stateBackend
三、源码分析
Flink SQL 中使用了apache calcite来完成sql解析、验证、逻辑计划/物理计划生成以及优化工作,物理计划都需要实现DataStreamRel接口,其中DataStreamWindowJoin与DataStreamJoin 分别对应Time-window join 与 global window的物理执行计划,由于Time-window join 与 interval-join的实现步骤大体相似,最终还是会调用到IntervalJoinOperator,这里不做分析。主要分析一下,Global window 的执行过程,从DataStreamJoin入手。
- DataStreamJoin中translateToPlan方法。 该方法获取左右两个流表对应的DataStream, 根据不同join 类型选择不同的ProcessFunction,例如inner join 选择NonWindowInnerJoin,将leftDataStream 与 rightDataStream 进行connect 得到ConnectedStreams 然后执行对应的ProcessFunction
- 以 inner join为例分析NonWindowInnerJoin, 继承了NonWindowJoin,而NonWindowJoin又继承了CoProcessFunction,与ProcessFunction针对一个流相反,CoProcessFunction是针对两个流的low level api, 可以访问状态、注册定时器。join 逻辑在其processElement方法中
override def processElement(
value: CRow,
ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
out: Collector[CRow],
timerState: ValueState[Long],
currentSideState: MapState[Row, JTuple2[Long, Long]],
otherSideState: MapState[Row, JTuple2[Long, Long]],
isLeft: Boolean): Unit = {
val inputRow = value.row
updateCurrentSide(value, ctx, timerState, currentSideState)
cRowWrapper.setCollector(out)
cRowWrapper.setChange(value.change)
val otherSideIterator = otherSideState.iterator()
// join other side data
while (otherSideIterator.hasNext) {
val otherSideEntry = otherSideIterator.next()
val otherSideRow = otherSideEntry.getKey
val otherSideCntAndExpiredTime = otherSideEntry.getValue
// join
cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)
callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)
// clear expired data. Note: clear after join to keep closer to the original semantics
if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {
otherSideIterator.remove()
}
}
}
两个MapState对应两个流的缓存数据,key表示具体的数据ROW,Value表示数据ROW的数量与过期时间,由于数据流入过程中可能会存在多条相同的记录,以数据ROW作为key这种方式可以减少内存使用. ValueState 用于存储数据的过期时间,以便任务失败恢复能够继续对数据执行过期操作。 processElement 执行流程: a. updateCurrentSide 保存数据与更新数据的count与ttl, 同时会注册数据的过期时间,数据的过期时间是根据Idle State Retention Time来设置的,从StreamQueryConfig可以获取到 b. 循环遍历另外一个状态,调用callJoinFunction输出数据,在callJoinFunction里面使用的joinFunction是通过FunctionCodeGenerator动态生成的在,在DataStreamJoin的translateToPlan方法中被调用到,有兴趣可以debug 方式copy下来研读一下。
- 过期数据的清理定时是在updateCurrentSide注册的,其清理工作是在NonWindowJoin的onTimer方法完成,onTimer方法是从CoProcessFunction中继承过来的。在onTimer主要做过期时间判断并且清理。