一、时间语义
Flink在流处理中提供了不同的时间语义支持,其中有两种核心的时间语义:ProcessingTime与EventTime。
ProcessingTime表示的是处理时间,在处理时间流处理中,所有涉及的时间计算都是以本地机器的时间为准,例如每5分钟的一个时间窗口操作,0-5分钟的窗口触发需要满足本地机器到达5分钟后然后触发窗口函数操作这5分钟内的数据。由于操作的数据是这段时间到达的数据,所以其处理的数据量容易受上游处理的速度影响,处理的快汇聚的数据就多,所以其结果不具有确定性,特别是在流重放的情况下;
EventTime表示的是事件时间,也就是数据本身含有的时间属性,例如点击页面事件的点击时间,那么在事件时间的流处理中,事件时间就是表示当前的时间进度,而不是本地机器时间。在分配时间窗口的数据时会按照数据的时间属性来分配,由于数据时间属性不会改变,因此在数据重放过程中,分配在窗口的数据也不会改变,因此得到一个比较确定的结果,其目标也就是尽可能的还原数据场景。
二、EventTime与Watermark
在ProcessingTime流处理中时间窗口的触发是当本地机器时间达到窗口结束时自动触发,而在EventTime流处理中时间窗口的触发是当前Watermark大于等于窗口的结束时间时触发。
何为Watermark?Watermark也称之为水位,是用来衡量在EventTime语义的流处理中时间进度,也就是当前流处理达到的时间点,其本质上就是一个时间戳,系统会认为在水位以下(事件时间小于watermark值)的数据已经全部到达。
事件中已经包含时间属性,为什么还需要Watermark? 在流处理中不同节点处理数据的速度不一致,那么就会导致下游节点获取到的数据在时间上是一个乱序的数据序列,我们希望程序能够处理所属窗口时间范围内的数据,然而数据乱序会导致数据延时到达,那么在程序处理中需要等待延时数据的到达,但是程序不可能无限制等待,所以引入Watermark机制,使用Watermark来判断是否应该触发窗口函数。
Watermark如何产生? 第一种定时产生,需要依托事件时间属性,也就是从事件中提取得到的,但是由于数据乱序,需要设置允许的延时时间,例如事件时间是10,允许的延时时间是2,那么此时得到的watermark值就是8;第二种遇到特定事件时产生watermark, 特定事件由用户指定,当在流处理中遇到一条特殊标记则产生watermark。在实际使用中第一种更为常见。
三、Watermark在流中的流转
在Flink中认为Watermark是一种特殊的数据,会随着正常数据在任务中一起向下流动,它永远不会超越业务数据位置。Watermark每到达一个处理节点都会生成一个新的Watermark向下流动,由于在任务中可能会需要对数据进行重分布,例如keyBy操作,会导致某个Task输入是由上游多个Task的输出,因此Flink对watermark的流转也制定了特定的规则:
- 单输入取最大值
- 多输入取最小值
单输入指的是Task的数据流入是由上游一个Task的输出,例如在dataStream.map.filter 操作流中并且其并行度一致,那么filter的每一个task都是与map中每一个Task一一对应,因此Watermark采用forward形式向下流动,并且是单调递增的,即下一次发送的Watermark只能比上一次的值大;
多输入指的是Task的数据流入是上游多个Task的输出,例如dataStream.map.keyBy.window 操作流中,keyBy操作会导致数据根据key流入下游特定的task中,那么对于一个window task的流入就是由上游多个Task输出,对于这种情况Watermark采用broadcast形式向下流动即上游每一个map task会给下游所有的window task发送Watermark, 由于每一个map task的处理速度不一样,那么就会导致window task收到watermark也是在不同的时刻,对于这种情况Flink是如何做到取最小值呢?
在Window Task会做一个InputChannelStatus数组的初始化,该数组对应上游operator tasks,图例中数组大小为2,数组里面每一个InputChannelStatus都会有一个初始化为Long.MIN_VALUE的watermark值,也就是有可能在监控页面看到的-9223372036854775808值,另外还包含StreamStatus(表示流状态的)与Boolean类型的isWatermarkAligned,isWatermarkAligned表示的是否对齐与StreamStatus有关,如果流的状态是非激活状态的在后续watermark取值上忽略对应InputChannelStatus,在这里我们只关心watermark大小取值,忽略其他因素影响。除了InputChannelStatus数组还有一个lastOutputWatermark,表示最近发送的watermark值,也可以理解为window task生成的最新的watermark值,初始值为Long.MIN_VALUE
看下其处理流程: window1收到的watermark值的顺序是:w10(map0)->w9(map1)->w12(map0)->w10(map1) a. w10(map0), w10大于InputChannelStatus[0]的watermakr值,将w10覆盖其值,然后从InputChannelStatus数组里面找到最小的watermark,此时最小的watermark是InputChannelStatus[1]的Long.MIN_VALUE值,判断得到的最小值不大于lastOutputWatermark,不会产生新的watermark;
b. w9(map1),w9大于InputChannelStatus[1]的watermakr值,将w9覆盖其值,然后从InputChannelStatus数组里面找到最小的watermark,此时最小的watermark是InputChannelStatus[1]的9,大于值为Long.MIN_VALUE的lastOutputWatermark,那么产生新的watermark 9;
c. w12(map0),w12大于InputChannelStatus[0]的watermakr值,将w12覆盖其值,然后从InputChannelStatus数组里面找到最小的watermark,此时最小的watermark是InputChannelStatus[1]的9,不大于值为9的lastOutputWatermark,不会产生新的watermark;
d. w10(map1),w10大于InputChannelStatus[1]的watermakr值,将w10覆盖其值,然后从InputChannelStatus数组里面找到最小的watermark,此时最小的watermark是InputChannelStatus[1]的10,大于值为9的lastOutputWatermark,会产生新的watermark 10;
四、Watermark使用
Watermark分配方式:
- 在source端分配,通过在SourceFunction.run方法中调用SourceContext的collectWithTimestamp 发送一条带有时间属性的数据,调用SourceContext的emitWatermark发送一条Watermark数据,至于什么时候调用由用户自行决定,也就是说需要用户自定义实现SourceFunction接口;
- 通过assignTimestampsAndWatermarks方式分配,可以在流处理任意位置指定,同样有两种方式AssignerWithPeriodicWatermarks与AssignerWithPunctuatedWatermarks,AssignerWithPunctuatedWatermarks表示由用户指定根据事件生成;AssignerWithPeriodicWatermarks表示周期性的生成方式,在这种方式也是常用的使用方式,通过使用 BoundedOutOfOrdernessTimestampExtractor指定最大延时与事件时间提取;
这两种方式有各自优缺点,在source端指定可以更早的去处理乱序数据,通过assignTimestampsAndWatermarks可以在过滤无效数据之后来指定,以免无效数据对watermark造成影响。
Watermark 触发动作:
- 会循环遍历事件时间的优先级队列,如果取出来的时间小于Watermark则触发相应的动作,例如窗口函数操作或者用户注册的事件时间定时器
- 在ProcessFunction可获取到到当前的Watermark值,可根据Watermakr来做一些处理。