Flink之水位线

2023-10-20 08:10:19 浏览数 (2)

flink时间语义 1、Event Time:事件创建时间; 2、Ingestion Time:数据进入Flink的时间; 3、Processing Time:执行操作算子的本地系统时间,与机器相关;

flink 1.12之前版本默认使用的是Processing Time,后面的版本考虑事件时间更通过就默认使用Event Time 所以系统时间一到就会输出,而如果是watermark使用的是event time所以要等下一条数据到来,然后判断时间是否大于窗口时间才输出

Event Time是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,

Watermark Watermark是一种衡量Event Time进展的机制。 Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。 数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。 Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。

水位线的时间戳必须单调递增 水位线是基于数据的时间戳生成的

通过dataStream.assignTimestampsAndWatermarks(WatermarkStrategy)来指定水位线

代码语言:javascript复制
没有延迟
WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
设置延迟5s 注意事件时间必须是ms单位的 所以这里*1000
dataStream.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReadingDTO>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReadingDTO>() {
// 抽取时间戳的逻辑
@Override
public long extractTimestamp(SensorReadingDTO element, long
recordTimestamp) {
return element.getTimestamp() * 1000;
}
}));

这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是当前最大事件时间戳 – 延迟时间 – 1,这里的单位是毫秒。 为什么要减 1 毫秒呢?我们可以回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了。 如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的; 所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。

周期性的生成 watermark:系统会周期性的将 watermark 插入到流中 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置 一般大数据场景都是考虑高并发情况,所以一般使用周期性生成Watermark的方式,避免频繁地生成Watermark。

我们可以梳理一下事件时间语义下,之前例子中窗口的处理过程: (1)第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去; (2)后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口; (3)11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11秒的数据保存进去。 由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也没有到关闭时间; (4)之后又有 9 秒数据到来,同样进入[0, 10)窗口中; (5)12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10秒,所以 [0, 10)窗口应该关闭了。 第一个窗口收集到了所有的 7 个数据,进行处理计算后输出结果,并将窗口关闭销毁; (6)同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去; 遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。

Flink对于迟到数据有三层保障,先来后到的保障顺序是: 1、 WaterMark => 约等于放宽窗口标准 2、 allowedLateness => 允许迟到(ProcessingTime超时,但是EventTime没超时) 3、 sideOutputLateData => 超过迟到时间,另外捕获,之后可以自己批处理合并先前的数据

allowedLateness 默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。

为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。

简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后, 还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据

默认情况下,如果不指定allowedLateness,其值是0,即对于watermark超过end-of-window之后,还有此window的数据到达时,这些数据被删除掉了。

注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据, 会buffer起来,直到watermark超过end-of-window allowedLateness()的时间,窗口的数据及元数据信息才会被删除。 再次计算就是DataFlow模型中的Accumulating(积累)的情况。

0 人点赞