Flink join终结者:SQL Join

2022-04-18 12:08:37 浏览数 (1)

SQL是开发人员与数据分析师必备的技能,Flink也提供了Sql方式编写任务,能够很大程度降低开发运维成本,这篇是flink join的终极篇SQL Join, 首先介绍sql join使用方式、然后介绍global join带来的状态存储成本及解决方式、最后从源码角度分析sql join实现。

一、SQL JOIN使用方式

对于sql join可以分为两类:Global Join、Time-windowed Join

  • Global Join Global Join表示全局join, 也可以称为无限流join, 由于没有时间限制,任何时候流入的数据都可以被关联上,支持inner join、left join、right join、full join 连接语法。使用语法遵循standard ANSI SQL。使用方式:
代码语言:javascript复制
   SELECT *

   FROM Orders INNER/LEFT/RIGHT/FULL JOIN Product ON Orders.productId = Product.id
  • Time-windowed Join 基于时间窗口的join, 流表的数据关联必须在一定的时间范围内,同样支持inner join、left join、right join、full join,但是不同的是条件中带有时间属性条件,有以下几种使用方式:
代码语言:javascript复制
 ltime = rtime

 ltime >= rtime AND ltime < rtime   INTERVAL '10' MINUTE

 ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime   INTERVAL '5' SECOND

ltime、rtime表示流表的时间属性字段。 其实现与interval join 使用了相同的实现方式,不同的是: a. Time-windowed Join 即可支持Event-Time,也可支持Processing-Time b. interval join 只支持inner join,Time-windowed Join支持多种类型join 以Flink intervalJoin 使用与原理分析 中订单流与地址流为例,sql实现:

代码语言:javascript复制
   select o.userId,a.addrId from orders o left join address a on o.addrId=a.addrId

        and o.rtt BETWEEN a.rt - INTERVAL '5' SECOND AND a.rt - INTERVAL '1' SECOND

二、Idle State Retention Time 使用

global join 能够join 上任何时刻的数据,是由于状态中保存了两个流表的所有数据,这些数据都保存在状态中,默认情况下是不会被过期,但是两个流表又是持续输入的,待数日或者数月之后,状态数据会无限增大,但是很多时候我们数据关联具有时效性,例如只要求当天数据关联即可,那么这种方式会内存或者磁盘造成不必要浪费。那我们的目标就是能够设置状态ttl,在到达过期时间能够被自动清除,在DataStream API 可以通过StateTtlConfig 来设置状态的ttl, 但是sql方式就无法通过这种方式设置,好在flink 提供了Idle State Retention Time 空闲状态的保留时间,通过配置StreamQueryConfig来设置ttl时间,并且只能按照Processing-time来清理数据,从数据流入系统到当数据未被读写时间达到ttl 就会被自动清除。先看下其使用方式:

代码语言:javascript复制
 val config=tabEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))

tabEnv.sqlUpdate('"',config)

tabEnv.sqlQuery("",config)

tab.writeToSink(sink,config)

withIdleStateRetentionTime(minTime: Time, maxTime: Time), minTime/maxTime 分别表示空闲保留最小/最大时间,但是必须满足maxTime-minTime>=5min,接下来看下数据的ttl设置: 初始默认的数据ttl = curProcessTime(数据流入当前系统时间) maxRetentionTime(maxTime),之后每有相同的数据流入,只要满足curProcessTime minRetentionTime > oldExpiredTime(上一次设置ttl的时间),就将其ttl设置为curProcessTime maxRetentionTime。

另外还有两点需注意:

  • Idle State Retention Time 不是全局有效,需要在每一个使用sqlUpdate/sqlQuery中单独设置
  • 数据定时清理同样是依赖flink 定时机制,会将定时数据存储在内存状态中,会对内存造成比较大的压力,可以选择rocksDB 来代替内存作为stateBackend

三、源码分析

Flink SQL 中使用了apache calcite来完成sql解析、验证、逻辑计划/物理计划生成以及优化工作,物理计划都需要实现DataStreamRel接口,其中DataStreamWindowJoin与DataStreamJoin 分别对应Time-window join 与 global window的物理执行计划,由于Time-window join 与 interval-join的实现步骤大体相似,最终还是会调用到IntervalJoinOperator,这里不做分析。主要分析一下,Global window 的执行过程,从DataStreamJoin入手。

  • DataStreamJoin中translateToPlan方法。 该方法获取左右两个流表对应的DataStream, 根据不同join 类型选择不同的ProcessFunction,例如inner join 选择NonWindowInnerJoin,将leftDataStream 与 rightDataStream 进行connect 得到ConnectedStreams 然后执行对应的ProcessFunction
  • 以 inner join为例分析NonWindowInnerJoin, 继承了NonWindowJoin,而NonWindowJoin又继承了CoProcessFunction,与ProcessFunction针对一个流相反,CoProcessFunction是针对两个流的low level api, 可以访问状态、注册定时器。join 逻辑在其processElement方法中
代码语言:javascript复制
override def processElement(

      value: CRow,

      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,

      out: Collector[CRow],

      timerState: ValueState[Long],

      currentSideState: MapState[Row, JTuple2[Long, Long]],

      otherSideState: MapState[Row, JTuple2[Long, Long]],

      isLeft: Boolean): Unit = {



    val inputRow = value.row

    updateCurrentSide(value, ctx, timerState, currentSideState)



    cRowWrapper.setCollector(out)

    cRowWrapper.setChange(value.change)

    val otherSideIterator = otherSideState.iterator()

    // join other side data

    while (otherSideIterator.hasNext) {

      val otherSideEntry = otherSideIterator.next()

      val otherSideRow = otherSideEntry.getKey

      val otherSideCntAndExpiredTime = otherSideEntry.getValue

      // join

      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)

      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)

      // clear expired data. Note: clear after join to keep closer to the original semantics

      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {

        otherSideIterator.remove()

      }

    }

  }
 

两个MapState对应两个流的缓存数据,key表示具体的数据ROW,Value表示数据ROW的数量与过期时间,由于数据流入过程中可能会存在多条相同的记录,以数据ROW作为key这种方式可以减少内存使用. ValueState 用于存储数据的过期时间,以便任务失败恢复能够继续对数据执行过期操作。 processElement 执行流程: a. updateCurrentSide 保存数据与更新数据的count与ttl, 同时会注册数据的过期时间,数据的过期时间是根据Idle State Retention Time来设置的,从StreamQueryConfig可以获取到 b. 循环遍历另外一个状态,调用callJoinFunction输出数据,在callJoinFunction里面使用的joinFunction是通过FunctionCodeGenerator动态生成的在,在DataStreamJoin的translateToPlan方法中被调用到,有兴趣可以debug 方式copy下来研读一下。

  • 过期数据的清理定时是在updateCurrentSide注册的,其清理工作是在NonWindowJoin的onTimer方法完成,onTimer方法是从CoProcessFunction中继承过来的。在onTimer主要做过期时间判断并且清理。

0 人点赞