一、背景
在电商商品购买过程中有这样一些场景:用户点击下单,此时订单处于待支付状态,如果在2小时之后还处于待支付状态那么就将这笔订单取消,置为取消状态;用户收货之后可以对商品进行评价,如果在24小时内仍然没有评价,那么自动将用户对商品的评分设置为5星….等等,这样的场景都可以称之为延时处理场景,当数据发送出去了,不立刻进行处理,而是等待一段时间之后在处理,目前对于延时处理的方案也有很多,例如:
- java中DelayQueue 内部使用优先级队列方式存储消息体,存放的消息体实现Dealy接口,然后使用一个线程不断消费队列数据。
- redis中SortedSet 借助Redis的SortedSet数据结构,使用时间作为排序的方式,外部使用一个线程不断轮询该SortedSet。
- 定时扫描数据库 将延时触发的任务信息存储在数据库中,然后使用线程去轮序查询符合要求触发的定时任务。 ……
在流处理中也经常会有一些定时触发的场景,例如定时监控报警等,并且时间窗口的触发也是通过延时调用触发,接下来了解flink中是如何实现延时处理。
二、Flink中延时调用
在flink实时处理中,涉及到延时处理可使用KeyedProcessFunction来完成,KeyedProcessFunction是flink提供面向用户的low level api,可以访问状态、当前的watermark或者当前的processingtime, 更重要的是提供了注册定时器的功能,分为:
- 注册处理时间定时器,直到系统的processingTime超过了注册的时间就会触发定时任务
- 注册事件时间定时器,直到watermark值超过了注册的时间就会触发定时任务 另外也可以删除已经注册的定时器。
看一个实际案例:服务器下线监控报警,服务器上下线都会发送一条消息,如果发送的是下线消息,在之后的5min内没有收到上线消息则循环发出警告,直到上线取消告警。 实现思路: 1.由于根据服务器不在线时间来告警,应该使用ProcessingTime语义 2.首先将服务器信息按照serverId分组,然后使用一个继承KeyedProcessFunction的类的Function接受处理,定义两个ValueState分别存储触发时间与服务器信息,
- open方法,初始化状态信息
- processElement方法,处理每条流入的数据,如果收到的是offline状态,则注册一个ProcessingTime的定时器,并且将服务器信息与定时时间存储状态中;如果收到的是online状态并且状态中定时时间不为-1,则删除定时器并将状态时间置为-1
- onTimer方法,定时回调的方法,触发报警并且注册下一个定时告警 代码实现如下:
case class ServerMsg(serverId: String, isOnline: Boolean, timestamp: Long)
class MonitorKeyedProcessFunction extends KeyedProcessFunction[String, ServerMsg, String] {
private var timeState: ValueState[Long] = _
private var serverState: ValueState[String] = _
override def open(parameters: Configuration): Unit = {
timeState = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-state", TypeInformation.of(classOf[Long])))
serverState = getRuntimeContext.getState(new ValueStateDescriptor[String]("server-state", TypeInformation.of(classOf[String])))
}
override def processElement(value: ServerMsg, ctx: KeyedProcessFunction[String, ServerMsg, String]#Context, out: Collector[String]): Unit = {
if (!value.isOnline) {
val monitorTime = ctx.timerService().currentProcessingTime() 300000
timeState.update(monitorTime)
serverState.update(value.serverId)
ctx.timerService().registerProcessingTimeTimer(monitorTime)
}
if (value.isOnline && -1 != timeState.value()) {
ctx.timerService().deleteProcessingTimeTimer(timeState.value())
timeState.update(-1)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, ServerMsg, String]#OnTimerContext, out: Collector[String]): Unit = {
if (timestamp == timeState.value()) {
val newMonitorTime = timestamp 300000
timeState.update(newMonitorTime)
ctx.timerService().registerProcessingTimeTimer(newMonitorTime)
println("告警:" serverState.value() " is offline, please restart")
}
}
}
注意点:只能在KeyedStream中注册定时器。
三、Flink延时设计原理
上图表示flink延时调用的总体流程,其设计也是借助于优先级队列来完成,队列中存储的数据结构如下:
- Key 表示KeyedStream中提取的Key
- Namespace 表示命名空间,在普通的KeyedStream中是固定的VoidNamespace,在WindowedStream表示的是Window
- Timestamp表示触发的时间戳,long类型 优先级队列使用其Timestamp升序排序,每一次的定时触发都是与固定的key与时间戳绑定,即使注册多次相同的key与时间戳,但是也只会触发一次。 注册
- ProcessingTime类型注册使用registerProcessingTimeTimer,传入的是一个触发的时间戳,内部会将获取到当前的Key、VoidNamespace 、timestamp封装成为一个InternalTimer对象存入优先级队列中,并且会使用ScheduledThreadPoolExecutor注册一个触发时间与当前时间差值大小的延时调用;
- EventTime类型注册使用registerEventTimeTimer,与ProcessingTime类型注册不同的是不需要做延时调用,并且二者使用的是不同的队列 触发
- ProcessingTime类型的定时触发由注册的时候的延时调度触发,会不断遍历优先级队列触发任务,直到获取到InternalTimer对象中的时间小于延时调度时间;
- EventTime类型的定时器触发是由Watermark决定的,同样会不断遍历优先级队列触发任务,直到获取到InternalTimer对象中的时间小于Watermark值; 持久化与恢复 为了保证任务重启仍然能够执行未完成的延时调用,flink会在checkpoint过程中将优先级队列中的数据一起持久化到hdfs上,待下次任务重启仍然能够获取到这部分数据。由于EventTime类型定时器是由Watermark,那么只要任务产生watermark就能正常触发恢复的定时任务,但是ProcessingTime类型的定时器是由系统注册的延时调度来触发,所以在重启的时候获取到队列中第一个元素来注册延时调度,保证其恢复之后的正常触发。 四、使用注意
- 优先级队列默认使用的是内存存储,在一些数据量比较大并且重度依赖定时触发的任务会占用比较大的内存,可以选择Rocksdb存储定时信息
- 由于flink中数据的处理涉及到key的切换,并且状态与key绑定,flink为了保证定时触发操作(onTimer)与正常处理(processElement)操作的线程安全,做了同步处理,在调用触发时必须要获取到锁,也就是二者同时只能有一个执行,因此一定要保证onTimer处理的速度,以免任务发生阻塞。
end