一、时间类型
- 事件时间:指时间发生的时间,一旦确定之后再也不会改变。
- 处理时间:指消息被计算引擎处理的时间,以各个计算节点的本地时间为准。
- 摄取时间:指事件进去流处理系统的时间,对于一个事件来说,使用其被读取的那一刻时间戳。
二、窗口
2.1 窗口类型
- Count Window(计算窗口)
1)Tumble Count Window:累积固定个数的元素就视为一个窗口,该类型的窗口无法像时间窗口一样事先切分好。
2)Sliding Count Window:累积固定个数的元素视为一个窗口,每超过一定个数的元素,则产生一个新的窗口。
- Time Window(时间窗口)
1)Tumble Time Window:表示在时间上按照事先约定的窗口大小切分的窗口,窗口之间不会相互重叠。
关键属性有两个:
1.Offset:窗口的起始时间。
2.Size:窗口的长度。
2)Sliding Time Window:表示在时间上按照事先约定的窗口大小、滑动步长切分的窗口,滑动窗口之间可能存在相互重叠的情况。
关键属性有三个:
1.Offset:窗口的起始时间。
2.Size:窗口的长度。
3.Slide:滑动距离。
- Session Window(会话窗口)
是一种特殊的窗口,当超过一段时间,该窗口没有收到新的数据元素,则视为该窗口结束,所以无法事先确定窗口的长度、元素个数,窗口之间也不会相互重叠。
Session Window的4种实现:
1)ProcessingTimeSessionWindows:处理时间会话窗口,使用固定会话间隔时长。
2)DynamicProcessingTimeSessionWindows:处理时间会话窗口,使用自定义会话间隔时长。
3)EventTimeSessionWindows:事件时间会话窗口,使用固定会话间隔时长。
4)DynamicEventTimeSessionWindows:事件时间会话窗口,使用自定义会话间隔时长。
会话窗口不同于事件窗口,它的切分依赖于事件的行为,而不是时间序列,所以在很多情况下会因为事件乱序使得原本相互独立的窗口因为新事件的到来导致窗口重叠,而必须要进行窗口的合并。
窗口合并涉及3个要素:
1)窗口对象合并和清理
2)窗口State的合并和清理
3)窗口触发器的合并和清理
会话窗口合并逻辑图:
2.2 窗口原理与机制
窗口算子负责处理窗口,数据流源源不断进入算子,每一个数据元素进入算子时,首先会被交给WindowAssigner。WindowAssigner决定元素被放到哪个或哪些窗口,在这个过程中可能会创建新窗口或者合并旧的窗口。在Window Operator中可能同时存在多个窗口,一个元素可以被放入多个窗口中。
数据进入窗口时,分配窗口和计算的逻辑图如下:
2.3 WindowAssigner
WindowAssigner用来决定某个元素被分配到哪个/哪些窗口中去。SessionWindowAssigner比较特殊,因为Session Window无法事先确定窗口的范围,是动态改变的。
2.4 WindowTrigger
Trigger触发器决定了一个窗口何时能够被计算或清除,每一个窗口都拥有一个属于自己的Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入该窗口,或者之前注册的定时器超时时,Trigger都会被调用。
Trigger触发的结果如下:
1)Continue:继续,不做任务操作。
2)Fire:触发计算,处理窗口数据。
3)Purge:触发清理,移除窗口和窗口中的数据。
4)Fire Purge:触发计算 清理,处理数据并移除窗口和窗口中的数据。
当数据到来时,调用Trigger判断是否需要触发计算,如果调用结果只是Fire,则计算窗口并保留窗口原样,窗口中的数据不清理,数据保持不变,等待下次触发计算的时候再次执行计算。窗口中的数据会被反复计算,直到触发结果清理。在清理之前,窗口和数据不会释放,所以窗口会一直占用内存。
2.5 WindowEvictor
Evictor 可以理解为窗口数据的过滤器,Evictor可在Window Function执行前或后,从Window中过滤元素。Flink内置了3种窗口数据过滤器。
- CountEvictor:计数过滤器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
- DeltaEvictor:阈值过滤器。本质上来说是一个自定义规则,计算窗口中每个数据记录,然后与一个事先定义好的阈值做比较,丢弃超过阈值的数据记录。
- TimeEvictor:时间过滤器。保留Window中最近一段时间内的元素,并丢弃其余元素。
2.6 Window函数
- 增量计算函数
增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素都会与中间数据合并,生成新的中间数据,在保存到窗口中。如ReduceFunction、AggregateFunction、FoldFnction。
- 全量计算函数
全量计算函数指的是先缓存该窗口的所有元素,等到触发条件后对窗口内的所有元素执行计算。如ProcessWindowFunction。
三、Watermark
3.1 DataStream Watermark生成
- Source Function中生成Watermark
SourceFunction 可以直接为数据元素分配时间戳,同时也会向下游发送Watermark。需要注意的是:如果一个timestamp 分配器被使用的话,由源提供的任何Timestramp和Watermark都会被重写。
为了通过SourceFunction直接为一个元素分配一个时间戳,SourceFunction需要调用SourceContext中的collectWithTimestamp(T element, long timestamp)方法。为了生成Watermark,源需要调用emitWatermark(Watermark)方法。
- DataStreamApi 中生成Watermark
DataStreamApi中使用的TimestampAssigner接口定义了时间戳的提取行为,其有两个不同接口AssignerWithPeriodicWatermarks和AssingerWithPunctuatedWatermarks,分别代表了不同的Watermark生成策略。
1)AssignerWithPeriodicWatermarks是周期性生成Watermark策略的顶层抽象接口,该接口的实现类周期性地生成watermark,而不会针对每一个事件都生成。
2)AssignerWithPunctuatedWatermarks对每一个事件都会尝试进行Watermark的生成,但是如果生成的Watermark是null或者Watermark小于之前的Watermark,则该Watermark不会发往下游,因为发往下游也不会有任何效果,不会触发任何窗口的执行。
3.2 FlinkSQL Watermark生成
其Watermark的生成主要是在TableSource中完成的,其定义了3类Watermark生成策略。
- 周期性Watermark策略
PeriodicWatermarkAssigner周期性(一定时间间隔或者达到一定的记录条数)地产生一个Watermark。生产使用时,一定注意时间和数据量,结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大延迟。
1)AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark = 当前收到的数据元素的最大时间戳 -1,此处减1的目的是确保有最大时间戳的时间不会被当做迟到的数据丢弃。
2)BoundedOutOfOrderTimestamps:固定延迟Watermark,作用在Flink SQL的Rowtime属性上,Watermark = 当前收到的数据元素的最大时间戳-固定延迟。
- 每个事件Watermark策略
PuntuatedWatermarkAssigner,数据流中每一个递增的EventTime都会产生一个Watermark。在生产中,除非实时性非常高的场景下才会选择Puntuated的方式进行Watermark生成。
- 无为策略
PreserveWatermark,可以使用DataStream Api和Table & SQL混合编程,所以FlinkSQL中不设定Watermark策略。
3.3 多流的Watermark
Flink内部实现每一个边上只能有一个递增的Watermark,当出现多流携带EventTime汇聚到一起(GroupBy或Union)时,Flink会选择所有流入的EventTime中最小的一个向下游流出,从而保证Watermark的单调递增和数据的完整性。
Flink作业一般是并行执行的,作业包含多个Task,每个Task运行一个或一组算子(operator chain) 实例,Task在生成Watermark的时候是相互独立的,也就是说在作业中存在多个并行的Watermark。多流输入会被分解成多个双流输入,对于多个双流Watermark处理,无论哪一个流的Watermark进入算子,都需要跟另一个流的当前算子进行比较,选择较小的的Watermark,即Min(input1 Watermark,input2Watermark),与算子当前的Watermark比较,如果大于当前算子的Watermark,则更新算子的Watermark为新的Watermark,并发送下游。
AbstractStreamOperator.java
3.4 时间服务
- 定时器服务器
定时器服务器在Flink中叫作TimeService,窗口算子(WindowOperator)中使用了InternalTimerService来管理定时器(Timer),其初始化是在WindowOperator#open()内实现的。
InternaleTimerService有几个元素比较重要:名称、命名空间类型N(及其序列化器)、键类型K(及其序列化器)和Triggerable对象(支持延时计算的算子,继承了Triggerable接口来实现回调)
一个算子中可以有多个InternalTimeService,通过名称进行区分:
1)WindowOperator:名称为 "window-timers"
2)KeyedProcessOperator:名称为"user-timers"
3)CepOperator:名称为"watermark-callbacks"
InternalTimerService接口实现类是InternalTimerServiceImpl,Timer的实现类是InternalTimer。InternalTimeServiceImpl使用了两个TimerHeapInternalTimer的优先队列(HeapPriorityQueueSet,该优先队列是Flink自己实现的),分别用于维护事件时间和处理时间的Timer。
InternalTimeServiceManager是Task级别提供的InternalService集中管理器。其使用Map保存了当前所有的InterTimerService,Map的key是InternalTimerService的名字。
- 定时器
定时器在Flink中叫作Timer。窗口的触发器与定时器是紧密联系的。
Flink的定时器使用InternalTimer接口定义行为。
Timer到底是如何触发然后回调用户逻辑的呢?
在InternalTimerServiceImpl中寻找答案,对于事件时间,会根据Watermark的时间,从事件时间的定时器队列中找到比给定时间小的所有定时器 ,触发该Timer所在的算子,然后由算子去调用UDF中的onTime()方法。处理时间也是类似的逻辑,区别在于,处理时间是从处理时间Timer优先队列中找到Timer。处理时间依赖于当前系统是,所以使用的周期性调度。
- 优先队列
Flink在优先级队列中使用了KeyGroup,是按照KeyGroup去重的,并不是按照全局的Key去重。
Flink自己实现了优先级队列来管理Timer:
1)基于堆内存的优先级队列HeapPriorityQueueSet:基于Java堆内存的优先级队列,其实现思路与Java的PriorityQueue类型,使用了二叉树。
2)基于RocksDB的优先级队列:分为Cache RocksDB量级,Cache中保存了前N个元素,其余的保存在RocksDB中。写入的时候采用Write-through策略,即写入Cache的同时要更新RocksDB中的数据,可能需要访问磁盘。
接下来Flink类型与序列化篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。