什么是 Timer
顾名思义就是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的
Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.
真实的事件样例,告诉我 只要 key 不同是不会并发修改的,如果一直都是完全相同的 key ,比如我的 key 一直都是 1,完全是会并发修改的。key 的重复性越高,并发修改的可能性就越大,除非 rocksdb 自身保证同一个每个 key ( rocksdb key ) 的事务。
Timer 的使用
代码语言:javascript复制public class KeyedProcessFunctionImp extends KeyedProcessFunction<String, Tuple2<String, Object>, Tuple2<String, String>> {
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
public void close() throws Exception {
}
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<Tuple2<String, String>> collector) throws Exception {
System.out.println("注册一个 timer");
long currentProcessingTime = context.timerService().currentProcessingTime() / 1000 * 1000 60 * 1000;
context.timerService().registerProcessingTimeTimer(currentProcessingTime);
}
@Override
//TODO timer 与 process 同时发生
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
System.out.println("我是一个 timer");
}
}
Timer的存储
Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。
Timer的源码分析
context.timerService().registerProcessingTimeTimer(currentProcessingTime); 会直接调用 InternalTimerServiceImpl.registerProcessingTimeTimer 方法
代码语言:javascript复制public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
//registerProcessingTimeTimer 定时调用 onProcessingTimer 调用,
// 最终调用 triggerTarget.onProcessingTimer,比如 windowOperator.onProcessingTimer
// ScheduledThreadPoolExecutor.schedule
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}
processingTimeTimersQueue 可以保证相同的 key 和 time 对应的 timer 只会注册一次。我们以 rocksdb 为例看细节
代码语言:javascript复制@Override
// 按照时间戳的顺序添加的,时间戳越大优先级越低
public boolean add(@Nonnull E toAdd) {
//会依据条件将 rocksdb 中 store 的 timer 存储到 orderedCache 中
checkRefillCacheFromStore();
final byte[] toAddBytes = serializeElement(toAdd);
// 默认 128
// orderedCache 通过 treeSet 进行操作的
final boolean cacheFull = orderedCache.isFull();
if ((!cacheFull && allElementsInCache) ||
LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
if (cacheFull) {
// we drop the element with lowest priority from the cache
orderedCache.pollLast();
// the dropped element is now only in the store
allElementsInCache = false;
}
if (orderedCache.add(toAddBytes)) {
// write-through sync
addToRocksDB(toAddBytes);
if (toAddBytes == orderedCache.peekFirst()) {
peekCache = null;
return true;
}
}
} else {
// we only added to the store
addToRocksDB(toAddBytes);
allElementsInCache = false;
}
return false;
}
orderedCache 是通过 treeSet 来实现的,所以 time key namespace (非window 是固定不变的) 为 treeMap 的 key 。新来的 timer 除了添加到 orderedCache 外还会添加到 rocksdb。 添加完成之后,就正式开始注册 定时任务了。当定时任务开始执行时,调用
代码语言:javascript复制@Override
// registerProcessingTimeTimer 定时调用 onProcessingTime
// time 设定的那个 timestamp
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
// 小于这个 time 的所有 timer 都会被触发
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
// windowOperator onProcessingTime
// 自己定义的 timer
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
//再次创建 timer
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
至此的话,自己写的 Timer 方法就被执行了。 当然还有一些更细节的东西,比如 timer restore ,timer snapshot ,startTimerService 等读者自己可以依需查看
Timer的其他情况
Window Operator 的 Timer 与此类似