前言
在我们使用Flink DataStream API编写业务代码时,aggregate()算子、AggregateFunction、KeyedProcessFunction是非常常用的。下面我们把这两个知识点详细的讲解一下。
AggregateFunction
在我们使用Flink DataStream API编写业务代码时,aggregate()算子和AggregateFunction无疑是非常常用的。编写一个AggregateFunction需要实现4个方法:
代码语言:javascript复制public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
前三个方法都很容易理解,但第四个merge()方法就有些令人费解了:到底什么时候需要合并两个累加器的数据呢?最近也有童鞋问到了这个问题。实际上,这个方法是专门为会话窗口(session window)服务的。下面来解析一下会话窗口。
Session Window & MergingWindowAssigner
stream.keyBy("userId").window(EventTimeSessionWindows.withGap(Time.seconds(gap))) 在普通的翻滚窗口和滑动窗口中,窗口的范围是按时间区间固定的,虽然范围有可能重合,但是处理起来是各自独立的,并不会相互影响。但是会话窗口则不同,其范围是根据事件之间的时间差是否超过gap来确定的(超过gap就形成一个新窗口),也就是说并非固定。所以,我们需要在每个事件进入会话窗口算子时就为它分配一个初始窗口,起点是它本身所携带的时间戳(这里按event time处理),终点则是时间戳加上gap的偏移量。这样的话,如果两个事件所在的初始窗口没有相交,说明它们属于不同的会话;如果相交,则说明它们属于同一个会话,并且要把这两个初始窗口合并在一起,作为新的会话窗口。多个事件则依次类推,最终形成上面图示的情况。
为了支持会话窗口的合并,它们的WindowAssigner也有所不同,称为MergingWindowAssigner,如下类图所示。
MergingWindowAssigner是一个抽象类,代码很简单,定义了用于合并窗口的mergeWindows()方法以及合并窗口时的回调MergeCallback。
代码语言:javascript复制public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
private static final long serialVersionUID = 1L;
public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
public interface MergeCallback<W> {
void merge(Collection<W> toBeMerged, W mergeResult);
}
}
所有MergingWindowAssigner实现类的mergeWindows()方法都是相同的,即直接调用TimeWindow.mergeWindows()方法,其源码如下。
代码语言:javascript复制public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c)
// sort the windows by the start time and then merge overlapping windows
List<TimeWindow> sortedWindows = new ArrayList<>(windows);
Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
@Override
public int compare(TimeWindow o1, TimeWindow o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
for (TimeWindow candidate: sortedWindows) {
if (currentMerge == null) {
currentMerge = new Tuple2<>();
currentMerge.f0 = candidate;
currentMerge.f1 = new HashSet<>();
currentMerge.f1.add(candidate);
} else if (currentMerge.f0.intersects(candidate)) {
currentMerge.f0 = currentMerge.f0.cover(candidate);
currentMerge.f1.add(candidate);
} else {
merged.add(currentMerge);
currentMerge = new Tuple2<>();
currentMerge.f0 = candidate;
currentMerge.f1 = new HashSet<>();
currentMerge.f1.add(candidate);
}
}
if (currentMerge != null) {
merged.add(currentMerge);
}
for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
if (m.f1.size() > 1) {
c.merge(m.f1, m.f0);
}
}
}
// TimeWindow.intersects()
public boolean intersects(TimeWindow other) {
return this.start <= other.end && this.end >= other.start;
}
// TimeWindow.cover()
public TimeWindow cover(TimeWindow other) {
return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
}
该方法将所有待合并的窗口按照起始时间升序排序,遍历排序好的窗口,并调用intersects()方法判断它们是否相交。如果相交,则调用cover()方法合并返回一个覆盖两个窗口的窗口;如果不相交,则启动下一次合并过程。列表merged中存储的就是[合并结果, 原窗口集合]的二元组,如果原窗口集合的大小大于1,说明发生了合并,需要调用回调方法MergeCallback.merge()。
就这么简单吗?当然不是。上面的逻辑只是在时域的角度合并了窗口,但是别忘了,窗口是需要维护状态和触发器的,所以它们也得被合并才能保证不出错。下面就来介绍跟踪窗口状态合并的MergingWindowSet组件。
MergingWindowSet
MergingWindowSet的思路很直接:既然状态的创建和维护是比较重的操作,那么就在一批窗口合并时,以其中一个窗口的状态为基准,其他窗口的状态都直接合并到这个基准窗口的状态里来,称为“状态窗口”。这样就避免了创建新的状态实例,只需要维护合并的窗口与状态窗口之间的映射关系,以及保证映射关系的容错(通过ListState)即可。
代码语言:javascript复制// Mapping from window to the window that keeps the window state...
private final Map<W, W> mapping;
// Mapping when we created the MergingWindowSet...
private final Map<W, W> initialMapping;
private final ListState<Tuple2<W, W>> state;
public W getStateWindow(W window) {
return mapping.get(window);
}
public void persist() throws Exception {
if (!mapping.equals(initialMapping)) {
state.clear();
for (Map.Entry<W, W> window : mapping.entrySet()) {
state.add(new Tuple2<>(window.getKey(), window.getValue()));
}
}
}
MergingWindowSet的核心逻辑位于add()方法中。该方法输入一个新窗口,并试图将其时域和状态进行合并,代码如下。
代码语言:javascript复制public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
List<W> windows = new ArrayList<>();
windows.addAll(this.mapping.keySet());
windows.add(newWindow);
final Map<W, Collection<W>> mergeResults = new HashMap<>();
windowAssigner.mergeWindows(windows,
new MergingWindowAssigner.MergeCallback<W>() {
@Override
public void merge(Collection<W> toBeMerged, W mergeResult) {
if (LOG.isDebugEnabled()) {
LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
}
mergeResults.put(mergeResult, toBeMerged);
}
});
W resultWindow = newWindow;
boolean mergedNewWindow = false;
// perform the merge
for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
W mergeResult = c.getKey();
Collection<W> mergedWindows = c.getValue();
// if our new window is in the merged windows make the merge result the
// result window
if (mergedWindows.remove(newWindow)) {
mergedNewWindow = true;
resultWindow = mergeResult;
}
// pick any of the merged windows and choose that window's state window
// as the state window for the merge result
W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
// figure out the state windows that we are merging
List<W> mergedStateWindows = new ArrayList<>();
for (W mergedWindow: mergedWindows) {
W res = this.mapping.remove(mergedWindow);
if (res != null) {
mergedStateWindows.add(res);
}
}
this.mapping.put(mergeResult, mergedStateWindow);
// don't put the target state window into the merged windows
mergedStateWindows.remove(mergedStateWindow);
// don't merge the new window itself, it never had any state associated with it
// i.e. if we are only merging one pre-existing window into itself
// without extending the pre-existing window
if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
mergeFunction.merge(mergeResult,
mergedWindows,
this.mapping.get(mergeResult),
mergedStateWindows);
}
}
// the new window created a new, self-contained window without merging
if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
this.mapping.put(resultWindow, resultWindow);
}
return resultWindow;
}
前面调用MergingWindowAssigner.mergeWindows()方法的逻辑已经提过了,重点在后面状态的合并。注释写得比较详细,多读几遍即可理解,不再多废话,只解释一下四个关键变量的含义:
- mergeResult:窗口的时域合并结果;
- mergedWindows:本次被合并的窗口集合;
- mergedStateWindow:将要合并的状态窗口结果(目前就是基准的状态窗口);
- mergedStateWindows:本次被合并的状态窗口集合。
最后,回调MergeFunction.merge()方法进行正式的合并。MergeFunction的定义同样位于MergingWindowSet中,其merge()方法的参数与上面四个变量是一一对应的。
代码语言:javascript复制public interface MergeFunction<W> {
/**
* This gets called when a merge occurs.
*
* @param mergeResult The newly resulting merged {@code Window}.
* @param mergedWindows The merged {@code Window Windows}.
* @param stateWindowResult The state window of the merge result.
* @param mergedStateWindows The merged state windows.
* @throws Exception
*/
void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
}
接下来去到窗口算子WindowOperator中,完成最后一步。
Window Merging in WindowOperator
以下是WindowOperator.processElement()方法中,处理MergingWindowAssigner的部分。
代码语言:javascript复制@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window: elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
@Override
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() allowedLateness <= internalTimerService.currentWatermark())) {
// ...
} else if (!windowAssigner.isEventTime()) {
// ...
}
triggerContext.key = key;
triggerContext.window = mergeResult;
triggerContext.onMerge(mergedWindows);
for (W m: mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
isSkippedElement = false;
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window " window " is not in in-flight window set.");
}
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = actualWindow;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(actualWindow, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
mergingWindows.persist();
} else {
// ......
}
// ......
}
该方法先获得输入的流元素分配到的窗口,然后调用MergingWindowSet.addWindow()方法试图合并它。特别注意MergeFunction.merge()方法,它做了如下两件事:
- 调用TriggerContext.onMerge()方法,更新触发器注册的定时器时间,然后遍历所有被合并的原始窗口,调用TriggerContext.clear()方法删除它们的触发器,保证合并后的窗口能够被正确地触发;
- 调用InternalMergingState.mergeNamespaces()方法,将待合并窗口的状态与基准窗口的状态合并,产生的stateWindowResult就是状态窗口。
addWindow()方法返回的actualWindow就是合并之后的真正窗口,然后再根据MergingWindowSet中维护的映射关系,取出它所对应的状态窗口,并将输入元素加入状态窗口中。最后,根据更新后的触发器逻辑判断窗口需要fire还是purge,并触发执行相应的操作。整个窗口合并的流程就完成了。
Back on AggregateFunction
前面说了这么多,AggregateFunction.merge()方法到底在哪里呢?注意上一节中出现的用于合并窗口状态的InternalMergingState.mergeNamespaces()方法,InternalMergingState是Flink状态体系中所有能够合并的状态的基类。下面观察它的两个实现类:
- 基于堆的AbstractHeapMergingState→HeapAggregatingState
@Override
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
if (sources == null || sources.isEmpty()) {
return; // nothing to do
}
final StateTable<K, N, SV> map = stateTable;
SV merged = null;
// merge the sources
for (N source : sources) {
// get and remove the next source per namespace/key
SV sourceState = map.removeAndGetOld(source);
if (merged != null && sourceState != null) {
merged = mergeState(merged, sourceState); // <----
} else if (merged == null) {
merged = sourceState;
}
}
// merge into the target, if needed
if (merged != null) {
map.transform(target, merged, mergeTransformation);
}
}
@Override
protected ACC mergeState(ACC a, ACC b) {
return aggregateTransformation.aggFunction.merge(a, b); // <----
}
- 基于RocksDB的RocksDBAggregatingState
@Override
public void mergeNamespaces(N target, Collection<N> sources) {
if (sources == null || sources.isEmpty()) {
return;
}
try {
ACC current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
setCurrentNamespace(source);
final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);
if (valueBytes != null) {
dataInputView.setBuffer(valueBytes);
ACC value = valueSerializer.deserialize(dataInputView);
if (current != null) {
current = aggFunction.merge(current, value); // <----
}
else {
current = value;
}
}
}
}
// ......
}
catch (Exception e) {
throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
}
}
可见,累加器其实就是AggregateFunction维护的状态。当AggregateFunction与会话窗口一同使用来实现增量聚合时,就会调用用户实现的merge()方法来合并累加器中的数据了。也就是说,如果我们没有使用会话窗口,那么不实现merge()方法同样没问题。
Flink定时器
在flink实时处理中,涉及到延时处理可使用KeyedProcessFunction来完成,KeyedProcessFunction是flink提供面向用户的low level api,可以访问状态、当前的watermark或者当前的processingtime, 更重要的是提供了注册定时器的功能,分为:
- 注册处理时间定时器,直到系统的processingTime超过了注册的时间就会触发定时任务
- 注册事件时间定时器,直到watermark值超过了注册的时间就会触发定时任务另外也可以删除已经注册的定时器。
示例代码如下:
代码语言:javascript复制// 创建bean类CountWithTimestamp,里面有三个字段
package com.bolingcavalry.keyedprocessfunction;
public class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
代码语言:javascript复制// 创建FlatMapFunction的实现类Splitter,作用是将字符串分割后生成多个Tuple2实例,f0是分隔后的单词,f1等于1:
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
if(StringUtils.isNullOrWhitespaceOnly(s)) {
System.out.println("invalid line");
return;
}
for(String word : s.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
代码语言:javascript复制// 最后是整个逻辑功能的主体:ProcessTime.java,这里面有自定义的KeyedProcessFunction子类,还有程序入口的main方法
package com.bolingcavalry.keyedprocessfunction;
import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @description 体验KeyedProcessFunction类(时间类型是处理时间)
*/
public class ProcessTime {
/**
* KeyedProcessFunction的子类,作用是将每个单词最新出现时间记录到backend,并创建定时器,
* 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
*/
static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
// 自定义状态
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
// 初始化状态,name是myState
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
Tuple2<String, Integer> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// 取得当前是哪个单词
Tuple currentKey = ctx.getCurrentKey();
// 从backend取得当前单词的myState状态
CountWithTimestamp current = state.value();
// 如果myState还从未没有赋值过,就在此初始化
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// 单词数量加一
current.count ;
// 取当前元素的时间戳,作为该单词最后一次出现的时间
current.lastModified = ctx.timestamp();
// 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间
state.update(current);
// 为当前单词创建定时器,十秒后后触发
long timer = current.lastModified 10000;
ctx.timerService().registerProcessingTimeTimer(timer);
// 打印所有信息,用于核对数据正确性
System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)nn",
currentKey.getField(0),
current.count,
current.lastModified,
time(current.lastModified),
timer,
time(timer)));
}
/**
* 定时器触发后执行的方法
* @param timestamp 这个时间戳代表的是该定时器的触发时间
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// 取得当前单词
Tuple currentKey = ctx.getCurrentKey();
// 取得该单词的myState状态
CountWithTimestamp result = state.value();
// 当前元素是否已经连续10秒未出现的标志
boolean isTimeout = false;
// timestamp是定时器触发时间,如果等于最后一次更新时间 10秒,就表示这十秒内已经收到过该单词了,
// 这种连续十秒没有出现的元素,被发送到下游算子
if (timestamp == result.lastModified 10000) {
// 发送
out.collect(new Tuple2<String, Long>(result.key, result.count));
isTimeout = true;
}
// 打印数据,用于核对是否符合预期
System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %snn",
currentKey.getField(0),
result.count,
result.lastModified,
time(result.lastModified),
timestamp,
time(timestamp),
String.valueOf(isTimeout)));
}
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 并行度1
env.setParallelism(1);
// 处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 监听本地9999端口,读取字符串
DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);
// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
// 对收到的字符串用空格做分割,得到多个单词
.flatMap(new Splitter())
// 设置时间戳分配器,用当前时间作为时间戳
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
// 使用当前系统时间作为时间戳
return System.currentTimeMillis();
}
@Override
public Watermark getCurrentWatermark() {
// 本例不需要watermark,返回null
return null;
}
})
// 将单词作为key分区
.keyBy(0)
// 按单词分区后的数据,交给自定义KeyedProcessFunction处理
.process(new CountWithTimeoutFunction());
// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
timeOutWord.print();
env.execute("ProcessFunction demo : KeyedProcessFunction");
}
public static String time(long timeStamp) {
return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
}
}
定时器原理
上图表示flink延时调用的总体流程,其设计也是借助于优先级队列(小顶堆)来完成,堆使用二叉树实现,而二叉树使用数组存储。队列中存储的数据结构如下:
代码语言:javascript复制@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {
// Key 表示KeyedStream中提取的Key
@Nonnull
private final K key;
// Namespace 表示命名空间,在普通的KeyedStream中是固定的VoidNamespace,在WindowedStream表示的是Window
@Nonnull
private final N namespace;
// Timestamp表示触发的时间戳,在优先级队列中升序排序
// 由于该类重写了equals方法,在插入队列,即使尝试重复插入相同的TimerHeapInternalTimer对象多次,也会确保只有一个TimerHeapInternalTimer对象入队成功。详情见下文。
private final long timestamp;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof InternalTimer) {
InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
return timestamp == timer.getTimestamp()
&& key.equals(timer.getKey())
&& namespace.equals(timer.getNamespace());
}
return false;
}
}
注册
ProcessingTime类型注册使用registerProcessingTimeTimer,传入的是一个触发的时间戳,内部会将获取到当前的Key、VoidNamespace 、timestamp封装成为一个InternalTimer对象存入优先级队列(小顶堆)中。并且会针对堆顶元素,使用ScheduledThreadPoolExecutor注册一个堆顶元素触发时间与当前时间差值大小的延时调用。
EventTime类型注册使用registerEventTimeTimer,与ProcessingTime类型注册不同的是不需要做延时调用,并且二者使用的是不同的队列
下面以TimerService#registerProcessingTimeTimer为入口,分析一下基于process time的定时器的入队过程:
代码语言:javascript复制// 自定义KeyedProcessFunction
static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
// 每来一条数据,都会经过此方法进行处理
@Override
public void processElement(
Tuple2<String, Integer> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
...
long timer = current.lastModified 10000;
// 注册定时器
ctx.timerService().registerProcessingTimeTimer(timer);
...
}
// 定时器回调函数
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
...
CountWithTimestamp result = state.value();
// 定时器到达时间,则往下游发送数据
out.collect(new Tuple2<String, Long>(result.key, result.count));
...
}
}
代码语言:javascript复制@Internal
public class SimpleTimerService implements TimerService {
@Override
public void registerProcessingTimeTimer(long time) {
// 注册定时器
internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}
}
代码语言:javascript复制public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
// 定时器入队
// 一旦if条件满足,则证明入队成功且入队的是小顶堆的堆顶元素,要针对小顶堆堆顶元素创建延迟调用
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
}
}
}
}
代码语言:javascript复制public abstract class AbstractHeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
@Override
public boolean add(@Nonnull T toAdd) {
// 定时器入队
addInternal(toAdd);
// 如果入队后的定时器是堆顶节点,则返回true,后面的逻辑会根据这里是否返回true,来判断是否需要建立ScheduledThreadPoolExecutor延迟调用;换言之,延迟调用只会根据堆顶节点来建立
return toAdd.getInternalIndex() == getHeadElementIndex();
}
}
经过以上步骤,就完成了定时器的入队操作。如果入队的是小顶堆的堆顶元素,则需要针对其创建延迟调用。代码如下:
代码语言:javascript复制public class SystemProcessingTimeService implements TimerService {
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
// 这里延迟调用的时间颇有讲究,并非是用定时器时间减去当前时间这么简单,而是要将相减的值再加1ms
// 这么做是为了与watemark的语义保持一致(虽然基于processtime的定时器用不到watermark)
// 例如,在窗口 [20000, 30000)中,30000这个时间点是不会触发窗口计算的,只有当watermark至少为30001时,才会触发窗口操作。有兴趣的同学可以看一下该方法源码中的注释。
long delay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
// 这里wrapOnTimerCallback(callback, timestamp)中的一波lambda操作秀我一脸,看似复杂,其实直接看作是callback即可
// 往上追溯一下,你就会发现callback其实就是InternalTimerServiceImpl#onProcessingTime方法
return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
}
else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
}
else {
// something else happened, so propagate the exception
throw e;
}
}
}
}
到这里,保存定时器的小顶堆维护好了,基于堆顶元素触发时间创建的延时调用也创建好了,接下来到时间就会触发回调函数了。
ProcessingTime类型的定时触发由注册的时候的延时调度触发,会不断从小顶堆堆顶弹出定时器,触发KeyedProcessFunction#onTimer方法,onTimer方法中可以从上下文OnTimerContext中获取到当前的key以及触发时间,有了key就可以从ValueState中提取出当前key对应的值(ValueState是一个散列表,其根据上下文中key获取value的逻辑对用户不可见,进一步进行某些计算。
当获取到InternalTimer对象中的时间大于延时调度时间,停止弹出定时器并处罚onTimer方法,重新针对堆顶元素建立新的延迟调用。
代码语言:javascript复制public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
@Override
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
// 从小顶堆堆顶依次弹出到达时间的定时器,调用用户自定义的KeyedProcessFunction#onTimer方法
// 一旦堆顶元素不满足触发时间,则重新针对堆顶元素建立延迟调用
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
// 这段逻辑调用processingTimeService实现类SystemProcessingTimeService中的registerTimer方法,该方法中将上次遍历中的最后一个timer的触发时间注册到ScheduledThreadPoolExecutor线程池中,实现再次延迟调用当前 InternalTimerServiceImpl#onProcessingTime,以此实现while逻辑的不断执行,即优先级队列的不断遍历
if (timer != null && nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
}
EventTime类型的定时器触发是由Watermark决定的,同样会不断遍历优先级队列触发任务,直到获取到InternalTimer对象中的时间大于Watermark值;
延迟队列state
为了保证任务重启仍然能够执行未完成的延时调用,flink会在checkpoint过程中将优先级队列中的数据一起持久化到hdfs上,待下次任务重启仍然能够获取到这部分数据。由于EventTime类型定时器是由Watermark,那么只要任务产生watermark就能正常触发恢复的定时任务,但是ProcessingTime类型的定时器是由系统注册的延时调度来触发,所以在重启的时候获取到队列中第一个元素来注册延时调度,保证其恢复之后的正常触发。
定时器注意事项
优先级队列默认使用的是内存存储,在一些数据量比较大并且重度依赖定时触发的任务会占用比较大的内存,可以选择Rocksdb存储定时信息
flink为了保证定时触发操作(onTimer)与正常处理(processElement)操作的线程安全,做了同步处理,在调用触发时必须要获取到锁,也就是二者同时只能有一个执行,因此一定要保证onTimer处理的速度,以免任务发生阻塞。
如果不做同步处理,processElement方法中会进行state.update(),onTimer中会进行state.value(),两者会发生不一致从而引发线程安全问题。