Flink SQL 状态越来越多?Idle State Retention Time 特性概览

2021-09-29 20:48:42 浏览数 (1)

在上一篇文章中,介绍了 Flink State TTL 机制,这项机制对于应对通用的状态暴增特别有效。然而,这个特性也有其缺陷,例如不能保证一定可以及时清理掉失效的状态,以及目前仅支持 Processing Time 时间模式等等,另外对于旧版本的 Flink(1.6 之前),State TTL 功能也无法使用。

针对 Table API 和 SQL 模块的持续查询/聚合语句,Flink 还提供了另一项失效状态清理机制,这就是本文要提到的 Idle State Retention Time 选项,Flink 很早就提供了这个选项,该特性是借助 Query Configuration 配置项来定义的,但很多人并未启用,也不理解其中隐藏的暗坑。本文将对这一特性做说明,并给出一些使用建议。

问题引入

同样以官网文档的案例为起点,这是一个持续查询的 GROUP BY 语句,它没有时间窗口的定义,理论上会无限地计算下去:

代码语言:sql复制
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

这就带来了一个问题:随着时间的不断推进,内存中积累的状态会越来越多,因为数据流是无穷无尽、持续流入的,Flink 并不知道如何丢弃旧的数据。在这种情况下,如果放任不管,那么迟早有一天作业的状态数达到了存储系统的容量极限,从而造成作业的崩溃。

针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理。

通过调用 StreamQueryConfig 的 withIdleStateRetentionTime 方法,可以为这个 QueryConfig 对象设置最小和最大的清理周期。这样,Flink 可以保证最早和最晚的状态清理时间。

需要注意的是,旧版本 Flink 允许只指定一个参数,表示最早和最晚清理周期相同,但是这样可能会导致同一时间段有很多状态都到期,从而造成瞬间的处理压力。新版本的 Flink 要求两个参数之间的差距至少要达到 5 分钟,从而避免大量状态瞬间到期,对系统造成的冲击。

代码语言:java复制
StreamQueryConfig qConfig = ...

// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

这里需要注意一点,默认情况下 StreamQueryConfig 的设置并不是全局的。因此当设置了清理周期以后,需要在 StreamTableEnvironment 类调用 toAppendStream 或 toRetractStream 将 Table 转为 DataStream 时,显式传入这个 QueryConfig 对象作为参数,才可以令该功能生效。

新版本的 Flink 提供了一个 QueryConfigProvider 类(它实现了 PlannerConfig 接口,允许嵌入一个 StreamQueryConfig 对象),可以通过对 TableConfig 设置 PlannerConfig 的方式(调用 addPlannerConfig 方法),来传入设置好 StreamQueryConfig 对象的 QueryConfigProvider. 这样,当 StreamPlanner 将定义的 Table 翻译为 Plan 时,可以自动使用之前定义的 StreamQueryConfig,从而实现全局的 StreamQueryConfig 设定。对于旧的 Flink 版本,只能通过修改源码的方式来设置,较为繁琐。

实现方式

Idle State Retention Time 的代码完全位于 flink-table 相关模块下,因此只有 Table API / SQL 的编程方式才可以用到这个特性。

具体来说,在 org.apache.flink.table.plan.nodes.datastream 包下,有三个类:DataStreamGroupAggregateBase(对应无时间窗口限定的 GROUP BY 语句)、DataStreamGroupWindowAggregateBase(对应有时间窗口限定的 GROUP BY 语句)、DataStreamOverAggregate(对应 OVER 语句)。当调用这三个类的 translateToPlan 方法时,如果没有指定 Idle State Retention Time,则会打印一行 WARNING 级别的日志,表明状态会无限增长。

而在 org.apache.flink.table.runtime.aggregate 包下,Flink 定义了名为 CleanupState 的 Scala Trait, 代码如下:

代码语言:javascript复制
trait CleanupState {

  def registerProcessingCleanupTimer(
      cleanupTimeState: ValueState[JLong],  // 上次注册的 Timer 时间戳
      currentTime: Long,                    // 当前时间戳
      minRetentionTime: Long,               // 空闲状态最短保留时间
      maxRetentionTime: Long,               // 空闲状态最长保留时间
      timerService: TimerService): Unit = {

    // 获取本状态上次注册的 Timer 时间戳
    val curCleanupTime = cleanupTimeState.value()

    // 检查是否注册过清理的 Timer, 如果注册过则检查是否还未到期
    if (curCleanupTime == null || (currentTime   minRetentionTime) > curCleanupTime) {
      // 如果没有注册过 Timer, 或者注册过但是还没到期, 那就更新一个新的 Timer
      val cleanupTime = currentTime   maxRetentionTime
      timerService.registerProcessingTimeTimer(cleanupTime)
      // 删除旧的 Timer
      if (curCleanupTime != null) {
        timerService.deleteProcessingTimeTimer(curCleanupTime)
      }
      cleanupTimeState.update(cleanupTime)
    }
  }
}

可以看到,在新版本的 Flink 内部实现中,Timer 的时间戳也是作为一种 ValueState 来保存的,这样可以和其他的 Keyed 状态一起,统一管理。同时也能得到 Flink 刷新时间戳的逻辑。

从 Flink 的实现原理上我们知道,对于 KeyedProcessFunction,都有一个

代码语言:javascript复制
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

方法,这个方法会被 Flink 的 InternalTimerService 所间接调用,从而当向 timerService 注册的 Timer 到期后,会进入处理逻辑。

为了支持 CleanupState 功能,Flink 还提供了一个名为 ProcessFunctionWithCleanupState 的抽象基类,它实现了 CleanupState 特性(接口),并继承了 KeyedProcessFunction 类,用来处理 GROUP BY 和 OVER 等语句。在这个类中,提供了若干公共的状态定时器的注册和清理方法:

代码语言:javascript复制
abstract class ProcessFunctionWithCleanupState[KEY, IN,OUT](queryConfig: StreamQueryConfig)
  extends KeyedProcessFunction[KEY, IN, OUT]
  with CleanupState {

  protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
  protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
  protected val stateCleaningEnabled: Boolean = minRetentionTime > 1

  // 保存最近注册过的清理 Timer 时间戳
  protected var cleanupTimeState: ValueState[JLong] = _

  // 初始化指定 stateName 的状态清理 Timer 状态, 例如在 GroupAggProcessFunction 的 open() 方法里被调用
  protected def initCleanupTimeState(stateName: String) {
    if (stateCleaningEnabled) {
      val cleanupTimeDescriptor: ValueStateDescriptor[JLong] = new ValueStateDescriptor[JLong](stateName, Types.LONG)
      cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
    }
  }
  
  
  // 注册或更新状态清理 Timer, 例如在 GroupAggProcessFunction 的 processElement() 方法里被调用
  protected def processCleanupTimer(
    ctx: KeyedProcessFunction[KEY, IN, OUT]#Context,
    currentTime: Long): Unit = {
    if (stateCleaningEnabled) {
      registerProcessingCleanupTimer(
        cleanupTimeState,
        currentTime,
        minRetentionTime,
        maxRetentionTime,
        ctx.timerService()
      )
    }
  }

  // 判断当前是 Processing Time 还是 Event Time 时间模式, 两者触发方式不同
  // Processing Time 由定时任务触发, 而 Event Time 由 Watermark 步进触发, 而状态清理仅在 Processing Time 触发时启动
  protected def isProcessingTimeTimer(ctx: OnTimerContext): Boolean = {
    ctx.timeDomain() == TimeDomain.PROCESSING_TIME
  }

  // 状态列表清理
  protected def cleanupState(states: State*): Unit = {
    // clear all state
    states.foreach(_.clear())
    this.cleanupTimeState.clear()
  }
}

可以很清晰地看到,Flink 每收到上游传来的一条记录(element),就会更新这个算子所对应的状态。因此如果一个状态持续被读取,那么并不会被标记为空闲,也就不会被清理掉。这点和 State TTL 的 OnReadAndWrite 更新类型很一致(写入和读取时都更新时间戳),而区别于 OnCreateAndWrite 更新类型(只在写入时更新时间戳,而读取时不更新时间戳)。

另外还有一个 CoProcessFunctionWithCleanupState 类,这个类的作用和上述类似,只是为 JOIN 相关的处理逻辑服务,这里不再详细展开。

下面我们以 ProcTimeBoundedRangeOver 类(该类继承了 ProcessFunctionWithCleanupState)的 onTimer 方法为例,讲解 Timer 到期后是如何清理的:

代码语言:javascript复制
override def onTimer(
    timestamp: Long,
    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
    out: Collector[CRow]): Unit = {

    if (stateCleaningEnabled) {
        val cleanupTime = cleanupTimeState.value()
        if (null != cleanupTime && timestamp == cleanupTime) {
            // 清理状态, 然后即可返回(无需执行后续的逻辑)
            cleanupState(rowMapState, accumulatorState)
            function.cleanup()
            return
        }
    }

    // ... 其他处理逻辑 ...
}

可以看到,当 Timer 到期后,onTimer 方法会被 Flink 的 InternalTimerService 调用,随后判断是否启用了状态清理逻辑,如果启用的话,获取要清理的时间戳。如果时间戳吻合,那么调用父类的 cleanupState 方法,执行具体清理逻辑。

对于 Window 而言,Flink 还提供了一个 StateCleaningCountTrigger,它可以对 Tumbling(滚动)窗口的元素做统计并清理过期的行。感兴趣的同学可以自行阅读其实现逻辑,与上述介绍也很类似。

实现优化

Flink 的空闲状态清理 Timer 也有其不足之处,例如状态清理 Timer 本身就是 ValueState 对象,当 Timer 数目过多时,会对内存造成很大的压力,甚至导致作业的提前崩溃。而且针对 Timer 的快照只能是全量、同步的,和其他 Keyed 状态的实现方式不统一,增加了改进的难度。针对这些问题,社区提出了将 Timer 保存到 RocksDB State Backend 的思路并进行了实现。

另外在旧的实现逻辑里,HeapInternalTimerService 的 Timer 的清理时间复杂度是 O(n),当状态数目超多时同样会造成性能影响。通过引入优先级队列(Priority Queue)和 HashSet,可以做到更高效的 Timer 删除操作。

通过我们的使用经验来看,目前 Idle State Retention Time 的实现还不够成熟,有些特殊情况下反而会加重问题。例如它读取时也会更新时间戳,导致如果一个状态持续被读取,而很久未写入,那么仍然不会被清理掉,即使它已经逻辑上过期了,但 Flink 并不知道。但是瑕不掩瑜,通过与 State TTL 功能配合使用,可以对大状态下任务的崩溃起到很好的预防效果。

参考文章

[FLINK-9485] Improving Flink’s timer management for large state

[PROPOSAL] Improving Flink’s timer management for large state.

0 人点赞