序
本文主要研究一下flink的TimerService
TimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java
代码语言:javascript复制@PublicEvolving
public interface TimerService {
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
long currentProcessingTime();
long currentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
}
- TimerService接口定义了currentProcessingTime、currentWatermark、registerProcessingTimeTimer、registerEventTimeTimer、deleteProcessingTimeTimer、deleteEventTimeTimer接口
SimpleTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java
代码语言:javascript复制@Internal
public class SimpleTimerService implements TimerService {
private final InternalTimerService<VoidNamespace> internalTimerService;
public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
this.internalTimerService = internalTimerService;
}
@Override
public long currentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return internalTimerService.currentWatermark();
}
@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}
@Override
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
}
@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}
@Override
public void deleteEventTimeTimer(long time) {
internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
}
}
- SimpleTimerService实现了TimerService,它是委托InternalTimerService来实现
InternalTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java
代码语言:javascript复制@Internal
public interface InternalTimerService<N> {
long currentProcessingTime();
long currentWatermark();
void registerProcessingTimeTimer(N namespace, long time);
void deleteProcessingTimeTimer(N namespace, long time);
void registerEventTimeTimer(N namespace, long time);
void deleteEventTimeTimer(N namespace, long time);
}
- InternalTimerService是TimerService的internal版本的接口,比起TimerService它定义了namespace,在registerProcessingTimeTimer、deleteProcessingTimeTimer、registerEventTimeTimer、deleteEventTimeTimer的方法中均多了一个namesapce的参数
InternalTimerServiceImpl
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
代码语言:javascript复制public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
private final ProcessingTimeService processingTimeService;
private final KeyContext keyContext;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
private final KeyGroupRange localKeyGroupRange;
private final int localKeyGroupRangeStartIdx;
private long currentWatermark = Long.MIN_VALUE;
private ScheduledFuture<?> nextTimer;
// Variables to be set when the service is started.
private TypeSerializer<K> keySerializer;
private TypeSerializer<N> namespaceSerializer;
private Triggerable<K, N> triggerTarget;
private volatile boolean isInitialized;
private TypeSerializer<K> keyDeserializer;
private TypeSerializer<N> namespaceDeserializer;
private InternalTimersSnapshot<K, N> restoredTimersSnapshot;
InternalTimerServiceImpl(
KeyGroupRange localKeyGroupRange,
KeyContext keyContext,
ProcessingTimeService processingTimeService,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {
this.keyContext = checkNotNull(keyContext);
this.processingTimeService = checkNotNull(processingTimeService);
this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);
// find the starting index of the local key-group range
int startIdx = Integer.MAX_VALUE;
for (Integer keyGroupIdx : localKeyGroupRange) {
startIdx = Math.min(keyGroupIdx, startIdx);
}
this.localKeyGroupRangeStartIdx = startIdx;
}
public void startTimerService(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerTarget) {
if (!isInitialized) {
if (keySerializer == null || namespaceSerializer == null) {
throw new IllegalArgumentException("The TimersService serializers cannot be null.");
}
if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
throw new IllegalStateException("The TimerService has already been initialized.");
}
// the following is the case where we restore
if (restoredTimersSnapshot != null) {
CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
this.keyDeserializer,
null,
restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
keySerializer);
CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
this.namespaceDeserializer,
null,
restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
namespaceSerializer);
if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
throw new IllegalStateException("Tried to initialize restored TimerService "
"with incompatible serializers than those used to snapshot its state.");
}
}
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
this.keyDeserializer = null;
this.namespaceDeserializer = null;
this.triggerTarget = Preconditions.checkNotNull(triggerTarget);
// re-register the restored timers (if any)
final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
if (headTimer != null) {
nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
}
this.isInitialized = true;
} else {
if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
throw new IllegalArgumentException("Already initialized Timer Service "
"tried to be initialized with different key and namespace serializers.");
}
}
}
@Override
public long currentProcessingTime() {
return processingTimeService.getCurrentProcessingTime();
}
@Override
public long currentWatermark() {
return currentWatermark;
}
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void deleteEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
return new InternalTimersSnapshot<>(
keySerializer,
keySerializer.snapshotConfiguration(),
namespaceSerializer,
namespaceSerializer.snapshotConfiguration(),
eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
}
@SuppressWarnings("unchecked")
public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;
if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
throw new IllegalArgumentException("Tried to restore timers "
"for the same service with different serializers.");
}
this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();
checkArgument(localKeyGroupRange.contains(keyGroupIdx),
"Key Group " keyGroupIdx " does not belong to the local range.");
// restore the event time timers
eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());
// restore the processing time timers
processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
}
@VisibleForTesting
public int numProcessingTimeTimers() {
return this.processingTimeTimersQueue.size();
}
@VisibleForTesting
public int numEventTimeTimers() {
return this.eventTimeTimersQueue.size();
}
@VisibleForTesting
public int numProcessingTimeTimers(N namespace) {
return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
}
@VisibleForTesting
public int numEventTimeTimers(N namespace) {
return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
}
private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
int count = 0;
try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
while (iterator.hasNext()) {
final TimerHeapInternalTimer<K, N> timer = iterator.next();
if (timer.getNamespace().equals(namespace)) {
count ;
}
}
} catch (Exception e) {
throw new FlinkRuntimeException("Exception when closing iterator.", e);
}
return count;
}
@VisibleForTesting
int getLocalKeyGroupRangeStartIdx() {
return this.localKeyGroupRangeStartIdx;
}
@VisibleForTesting
List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
return partitionElementsByKeyGroup(eventTimeTimersQueue);
}
@VisibleForTesting
List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
return partitionElementsByKeyGroup(processingTimeTimersQueue);
}
private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
for (int keyGroup : localKeyGroupRange) {
result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
}
return result;
}
private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
(this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
}
}
- InternalTimerServiceImpl实现了InternalTimerService及ProcessingTimeCallback(
定义了onProcessingTime方法
)接口 - startTimerService方法主要是初始化keySerializer、namespaceSerializer、triggerTarget属性;registerEventTimeTimer及deleteEventTimeTimer方法使用的是eventTimeTimersQueue;registerProcessingTimeTimer及deleteProcessingTimeTimer方法使用的是processingTimeTimersQueue(
eventTimeTimersQueue及processingTimeTimersQueue的类型为KeyGroupedInternalPriorityQueue,queue的元素类型为TimerHeapInternalTimer
) - eventTimerTimer的触发主要是在advanceWatermark方法中(
AbstractStreamOperator的processWatermark方法会调用InternalTimeServiceManager的advanceWatermark方法,而该方法调用的是InternalTimerServiceImpl的advanceWatermark方法
),它会移除timestamp小于等于指定time的eventTimerTimer,然后回调triggerTarget.onEventTime方法;而processingTimeTimer的触发则是在onProcessingTime方法中(SystemProcessingTimeService的TriggerTask及RepeatedTriggerTask的定时任务会回调ProcessingTimeCallback的onProcessingTime方法
),它会移除timestamp小于等于指定time的processingTimeTimer,然后回调triggerTarget.onProcessingTime方法
Triggerable
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java
代码语言:javascript复制@Internal
public interface Triggerable<K, N> {
/**
* Invoked when an event-time timer fires.
*/
void onEventTime(InternalTimer<K, N> timer) throws Exception;
/**
* Invoked when a processing-time timer fires.
*/
void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
- Triggerable接口定义了InternalTimerService会调用的onEventTime及onProcessingTime方法;WindowOperator、IntervalJoinOperator、KeyedProcessOperator、KeyedCoProcessOperator等operator均实现了Triggerable接口,可以响应timer的onEventTime或onProcessingTime的回调
小结
- TimerService接口定义了currentProcessingTime、currentWatermark、registerProcessingTimeTimer、registerEventTimeTimer、deleteProcessingTimeTimer、deleteEventTimeTimer接口;它有一个实现类为SimpleTimerService,而SimpleTimerService主要是委托给InternalTimerService来实现
- InternalTimerService是TimerService的internal版本的接口,比起TimerService它定义了namespace,在registerProcessingTimeTimer、deleteProcessingTimeTimer、registerEventTimeTimer、deleteEventTimeTimer的方法中均多了一个namesapce的参数;它的实现类为InternalTimerServiceImpl;InternalTimerServiceImpl实现了InternalTimerService及ProcessingTimeCallback(
定义了onProcessingTime方法
)接口,其registerEventTimeTimer及deleteEventTimeTimer方法使用的是eventTimeTimersQueue;registerProcessingTimeTimer及deleteProcessingTimeTimer方法使用的是processingTimeTimersQueue(eventTimeTimersQueue及processingTimeTimersQueue的类型为KeyGroupedInternalPriorityQueue,queue的元素类型为TimerHeapInternalTimer
) - InternalTimerServiceImpl的eventTimerTimer的触发主要是在advanceWatermark方法中(
AbstractStreamOperator的processWatermark方法会调用InternalTimeServiceManager的advanceWatermark方法,而该方法调用的是InternalTimerServiceImpl的advanceWatermark方法
),它会移除timestamp小于等于指定time的eventTimerTimer,然后回调triggerTarget.onEventTime方法 - InternalTimerServiceImpl的processingTimeTimer的触发则是在onProcessingTime方法中(
SystemProcessingTimeService的TriggerTask及RepeatedTriggerTask的定时任务会回调ProcessingTimeCallback的onProcessingTime方法
),它会移除timestamp小于等于指定time的processingTimeTimer,然后回调triggerTarget.onProcessingTime方法 - Triggerable接口定义了InternalTimerService会调用的onEventTime及onProcessingTime方法;WindowOperator、IntervalJoinOperator、KeyedProcessOperator、KeyedCoProcessOperator等operator均实现了Triggerable接口,可以响应timer的onEventTime或onProcessingTime的回调
doc
- TimerService