flink时间系统系列之Processing Time源码分析

2022-04-18 11:18:57 浏览数 (1)

flink 中Processing Time也就是处理时间在watermark定时生成、ProcessFunction中定时器与时间类型的窗口中都有使用,但是其内部是如何实现注册定时器、如何调用、如何容错保证在任务挂掉在下次重启仍然能够触发任务执行,都是我们今天的主题。首先需要了解一下在flink内部时间系统是由哪些类来共同完成这件事,下面画了一个简易的类关系图:

AbstractStreamOperator: flink runtime 的核心operator, 包含了一个operator生命周期所有的执行方法(后面做单独介绍),其包含一个InternalTimeServiceManager的对象,在initializeState完成初始化;

InternalTimeServiceManager:flink 内部提供时间服务的manager, getInternalTimerService负责注册一个时间服务类型InternalTimeService、advanceWatermark在eventTime中使用(下节分享),snapshotStateForKeyGroup对InternalTimeService进行checkpoint,restoreStateForKeyGroup任务重启恢复InternalTimeService;

InternalTimerServiceImpl:负责具体的时间服务操作,包含KeyGroupedInternalPriorityQueue属性,是一种flink自身实现的优先级队列,存储的数据是TimerHeapInternalTimer类型,包含三个属性key/namespace/timestamp,在优先级队列中按照timestamp进行排序,InternalTimerServiceImpl的startTimerService方法主要用于时间状态恢复初始化,registerProcessingTimeTimer注册处理时间定时器,registerEventTimeTimer注册事件时间定时器,onProcessingTime方法是实现ProcessingTimeCallback接口的回调方法,用于处理时间触发执行方法,另外还有两个对应的delete删除注册的定时器方法,advanceWatermark方法同样用于事件时间中,snapshotTimersForKeyGroup对该timerservice进行checkpoint, restoreTimersForKeyGroup时间状态恢复;

ProcessingTimeService : 负责processing time 的处理类,实现类是SystemProcessingTimeService,包含registerTimer的方法,用于注册一个具体的定时器;

TimerHeapInternalTimer:实现了InternalTimer接口,为什么要这个单独讲一下,由于它flink 注册定时器的基本注册对象,所有需要注册定时器的最后基本都会转换成为该对象然后进行注册,在这里我们需要理解一点,在flink内部确定一个具体的状态的具体数据需要key/namespce, 第一个具体代表的是operator/statedesc,由于TimerHeapInternalTimer需要容错所以同样包含key/namespace,从另外一个角度也说明在一个operator中如果我们多次注册同一个key相同的时间,达到的效果是一样,只会触发一次(默认在flink内部namespce是相同的),同样也说明了注册的定时器必须是在keyedStream中。

以上就是flink内部时间所涉及的核心类,那么接下来具体看一下注册一个processing time 将会发生哪些方法的调用:

registerProcessingTimeTimer方法接受两个参数:namespace与time, namespace在普通的keyedStream 中namespace表示VoidNamespace, 在WindowedStream中namespace表示的是Window对象,time表示的是注册的触发时间,在这个方法里面主要做两件事情:

一、将namespace与time转换为InternalTimer存入KeyGroupedInternalPriorityQueue优先级队列中,其中key从当前的KeyContext中获取,如果该Queue中包含相同的key/namspace/time,将不会被添加进去并且不会执行下面调用

二、调用SystemProcessingTimeService.registerTimer方法,传入具体的时间参数time,在registerTimer方法中使用ScheduledThreadPoolExecutor提交一个定时执行的方法,定时执行对象是实现Runnable接口的TriggerTask,那么当达到执行时间就会执行里面的run方法。

processing time 触发调用逻辑:

TriggerTask就是上面提到的实现Runnable接口的类,在run方法里面会调用

ProcessingTimeCallback.onProcessingTime方法,ProcessingTimeCallback就是在InternalTimerServiceImpl.registerProcessingTimeTimer 调用是传入进来的,传入的是当前对象this, 也就是会调用InternalTimerServiceImpl.onProcessingTime方法,在该方法中会循环遍历KeyGroupedInternalPriorityQueue这个优先级队列,如果获取到的时间小于调用的触发时间,那么就会执行Triggerable.onProcessingTime方法,Triggerable表示具体定时操作接口,例如WindowOperator/KeyedProcessOperator 都实现了该接口。

注册的定时数据都存储在KeyGroupedInternalPriorityQueue这个优先级队列中,也就是内存中,如果任务出现问题挂掉了,那么内存数据就会丢失,所以需要对其进行备份,备份入口是InternalTimeServiceManager.snapshotStateForKeyGroup 会将其Map<String, InternalTimerServiceImpl<K, ?>> 做checkpoint,从而对InternalTimerServiceImpl<K, ?>中具体的队列做checkpoint,相反数据恢复调用InternalTimeServiceManager.restoreStateForKeyGroup 方法恢复InternalTimerServiceImpl。对于processing time任务恢复重启有一个重要的方法InternalTimerServiceImpl.startTimerService 在获取InternalTimerServiceImpl时会被调用,里面的有一个判断逻辑processingTimeTimersQueue获取数据,如果不为空,那么就调用processingTimeService.registerTimer 重新注册定时器。

以上关于flink 时间系统与Processing Time的源码分析逻辑,最好还是对照源码多看几遍。

0 人点赞