An analysis of watermarks in Flink's event-time processing mode.
Concepts first
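- A stream processor running in event time needs a way to measure how far event time has progressed. For example, when data is processed in one-hour windows, the window operator has to be notified once a window's time has ended so that it can close the running window. Whether the running window can be closed is decided by the watermark, that is, by how far event time has advanced.
- Flink measures event-time progress with watermarks. Watermarks flow as part of the data stream and always carry a timestamp t. Watermark(t) declares that event time has reached t in that stream, so no later element of the stream should carry a timestamp t' <= t.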
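The figure below shows a stream on a logical time axis, with the watermarks drawn underneath. The events in this figure are in ascending timestamp order, so the watermarks are simply periodic markers in the stream.
The stream in the next example is out of order, and watermarks are essential for such streams. The events in the figure below are not sorted by timestamp. A watermark can be understood as a point in the stream at which:
- all events with timestamps up to that point have already arrived
- in other words, no event with a timestamp at or before t follows Watermark(t)
When an operator receives a watermark, it advances its internal event-time clock to the watermark's timestamp.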
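To make the "never moves backwards" property concrete, here is a minimal plain-Java sketch with no Flink dependency. `WatermarkTracker`, its method names, and the sample timestamps are hypothetical, chosen only for illustration. It assumes the watermark trails the largest timestamp seen so far by a fixed bound, which is the scheme the example later in this post uses.

```java
/** Plain-Java model of a watermark that trails the largest timestamp seen so far. */
class WatermarkTracker {
    private final long maxOutOfOrdernessMs;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTracker(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** An event is late if it arrives at or behind the current watermark. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark;
    }

    /** Proposes (timestamp - bound) as the new watermark; a smaller proposal is ignored. */
    long observe(long eventTimestamp) {
        long candidate = eventTimestamp - maxOutOfOrdernessMs;
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
        }
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(30_000L);
        long[] timestamps = {100_000L, 140_000L, 120_000L, 105_000L};
        for (long ts : timestamps) {
            boolean late = tracker.isLate(ts);
            long wm = tracker.observe(ts);
            System.out.println("ts=" + ts + " late=" + late + " watermark=" + wm);
        }
    }
}
```

In this run only the last event (105000) is late: by the time it arrives the watermark is already 110000, and the two out-of-order events leave the watermark unchanged.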
Worked example
Code
package com.f3.training;

import com.f3.datatypes.ConnectedCarEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

import java.util.PriorityQueue;

public class Lab3CarEventSort {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time has to be selected explicitly on Flink versions before 1.12.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the out-of-order car events, parse them, and attach timestamps and watermarks.
        DataStream<String> carData = env.readTextFile(TrainingBase.pathToCarOutOfOrder);
        DataStream<ConnectedCarEvent> events = carData
                .map((MapFunction<String, ConnectedCarEvent>) ConnectedCarEvent::fromString)
                .assignTimestampsAndWatermarks(new ConnectedCarAssigner());
        //events.print();

        // Sort the events of each car by timestamp and print them.
        events.keyBy((ConnectedCarEvent event) -> event.carId)
                .process(new SortFunction())
                .print();

        env.execute();
    }

    public static class ConnectedCarAssigner implements AssignerWithPunctuatedWatermarks<ConnectedCarEvent> {
        @Override
        public long extractTimestamp(ConnectedCarEvent event, long previousElementTimestamp) {
            return event.timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(ConnectedCarEvent event, long extractedTimestamp) {
            // Propose a watermark with every event, trailing its timestamp by 30 s (30000 ms).
            return new Watermark(extractedTimestamp - 30000);
        }
    }

    public static class SortFunction extends KeyedProcessFunction<String, ConnectedCarEvent, ConnectedCarEvent> {
        // Per-car min-heap of buffered events, kept in keyed state.
        private ValueState<PriorityQueue<ConnectedCarEvent>> queueState = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<PriorityQueue<ConnectedCarEvent>> descriptor = new ValueStateDescriptor<>(
                    "sorted-events",
                    TypeInformation.of(new TypeHint<PriorityQueue<ConnectedCarEvent>>() {}));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(ConnectedCarEvent event, Context context, Collector<ConnectedCarEvent> out) throws Exception {
            TimerService timerService = context.timerService();
            // The current watermark trails the largest earlier timestamp (max_ts) by 30 s:
            //   [1] admission test:  ts - currentWatermark > 0
            //   [2] watermark:       currentWatermark = max_ts - 30 s
            //   [1] with [2]:        ts - (max_ts - 30 s) > 0, i.e. max_ts - ts < 30 s
            // If true, the event is out of order by less than the tolerated 30 s.
            if (context.timestamp() > timerService.currentWatermark()) {
                PriorityQueue<ConnectedCarEvent> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(10);
                }
                queue.add(event);
                queueState.update(queue);
                // Fire once the watermark reaches this event's timestamp.
                timerService.registerEventTimeTimer(event.timestamp);
            }
            // Otherwise the event is too late and is dropped.
        }

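        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<ConnectedCarEvent> out) throws Exception {
            PriorityQueue<ConnectedCarEvent> queue = queueState.value();
            if (queue == null) {
                return;
            }
            long watermark = context.timerService().currentWatermark();
            // Emit, in timestamp order, every buffered event at or before the current watermark.
            ConnectedCarEvent head = queue.peek();
            while (head != null && head.timestamp <= watermark) {
                out.collect(head);
                queue.poll();
                head = queue.peek();
            }
            // Write the drained queue back so that every state backend sees the removals.
            queueState.update(queue);
        }
    }
}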
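The buffering logic of `SortFunction` can be tried out without a Flink cluster. The sketch below is a plain-Java stand-in, not the job above: `EventTimeSorter` and its method names are hypothetical, events are reduced to bare `long` timestamps, and the watermark is passed in by hand instead of being read from a `TimerService`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java stand-in for the buffer-then-release pattern used by SortFunction. */
class EventTimeSorter {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Like processElement: buffer the event unless it is already at or behind the watermark. */
    boolean offer(long eventTimestamp) {
        if (eventTimestamp <= currentWatermark) {
            return false; // too late, dropped
        }
        buffer.add(eventTimestamp);
        return true;
    }

    /** Like onTimer: advance the clock, then release buffered events <= watermark in order. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        sorter.offer(50L);
        sorter.offer(20L);
        sorter.offer(40L);
        System.out.println(sorter.advanceWatermark(45L)); // prints [20, 40]
        System.out.println(sorter.offer(30L));            // prints false
        System.out.println(sorter.advanceWatermark(60L)); // prints [50]
    }
}
```

The min-heap is what turns arrival order into timestamp order: events wait in the buffer until the watermark says nothing earlier can still arrive, and only then are they released.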
Analysis
/**
 * ----------> Important sorting example <----------
*
* id : String // a unique id for each event
* car_id : String // a unique id for the car
* timestamp : long // timestamp (milliseconds since the epoch)
* longitude : float // GPS longitude
* latitude : float // GPS latitude
* consumption : float // fuel consumption (liters per hour)
* speed : float // speed (kilometers per hour)
* throttle : float // throttle position (%)
* engineload : float // engine load (%)
*
*
 * An out-of-order example (important). Each row shows the event timestamp, then the
 * watermark (wm) visible when that event is processed, in milliseconds:
 *
 * 1484892913000,2017-01-20T06:15:13+0000 - wm:1484892878000,2017-01-20T14:14:38.000+08:00 = last_ts-30000
 * 1484892928000,2017-01-20T06:15:28+0000 - wm:1484892883000,2017-01-20T14:14:43.000+08:00 = last_ts-30000
 * 1484892918000,2017-01-20T06:15:18+0000 - wm:1484892898000,2017-01-20T14:14:58.000+08:00 = last_ts-30000
 **1484892893000,2017-01-20T06:14:53+0000 - wm:1484892898000,2017-01-20T14:14:58.000+08:00 = last_ts-20000
 * 1484892923000,2017-01-20T06:15:23+0000 - wm:1484892898000,2017-01-20T14:14:58.000+08:00 = last_ts+5000
 * 1484892933000,2017-01-20T06:15:33+0000 - wm:1484892898000,2017-01-20T14:14:58.000+08:00 = last_ts-25000
 * 1484892938000,2017-01-20T06:15:38+0000 - wm:1484892903000,2017-01-20T14:15:03.000+08:00 = last_ts-30000
 * 1484892943000,2017-01-20T06:15:43+0000 - wm:1484892908000,2017-01-20T14:15:08.000+08:00 = last_ts-30000
*
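 * The watermark never decreases. When an out-of-order element arrives (a drop in an
 * otherwise increasing sequence), the wm it sees was still computed from earlier
 * elements, so the larger wm stays in effect. This can lead to wm > ts for the
 * out-of-order element, which means it is out of order by more than the wm tolerates.
 * For example, at 1484892893000 (2017-01-20T06:14:53+0000) above, wm > ts because the
 * element lags the largest earlier timestamp by more than 30000 ms. So checking
 * whether ts is below wm tells whether the current element exceeds the tolerance.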
*
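 * How the sort works:
 * 1. processElement admits an event only if it is out of order by less than 30 s
 * 2. processElement pushes the event onto a min-heap
 * 3. processElement registers a timer for each event timestamp
 *
 * 4. When a timer fires, onTimer pops events off the min-heap
 *    as long as their ts <= the current wm. Note that this is the wm at the moment
 *    the timer fires, which may already be several timestamps further on.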
*/
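The wm column of the trace can be reproduced with a few lines of arithmetic. The following sketch is plain Java, not Flink code: `TraceReplay` and `watermarksSeen` are hypothetical names, the eight timestamps are copied from the trace, and the starting watermark 1484892878000 is taken from its first row because the preceding event is not shown.

```java
/** Recomputes the watermark column of the trace from the event timestamps alone. */
class TraceReplay {
    static final long BOUND_MS = 30_000L;

    /** Returns, for each event, the watermark visible when that event is processed. */
    static long[] watermarksSeen(long initialWatermark, long[] timestamps) {
        long[] seen = new long[timestamps.length];
        long wm = initialWatermark;
        for (int i = 0; i < timestamps.length; i++) {
            seen[i] = wm;
            // After the event, the watermark advances to ts - 30 s, but never backwards.
            wm = Math.max(wm, timestamps[i] - BOUND_MS);
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] ts = {
            1484892913000L, 1484892928000L, 1484892918000L, 1484892893000L,
            1484892923000L, 1484892933000L, 1484892938000L, 1484892943000L
        };
        long[] seen = watermarksSeen(1484892878000L, ts);
        for (int i = 0; i < ts.length; i++) {
            String verdict = ts[i] > seen[i] ? "buffered" : "dropped (ts <= wm)";
            System.out.println(ts[i] + " - wm:" + seen[i] + " -> " + verdict);
        }
    }
}
```

Under these assumptions only the fourth event (1484892893000) fails the `ts > wm` test, which matches the row marked `**` in the trace.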