A Look at Flink's TimerService

2019-01-17 11:31:08

This article takes a look at Flink's TimerService.

TimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java

@PublicEvolving
public interface TimerService {

    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(long time);

    void registerEventTimeTimer(long time);

    void deleteProcessingTimeTimer(long time);

    void deleteEventTimeTimer(long time);
}
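  • The TimerService interface declares six methods: currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer and deleteEventTimeTimer. As its two message constants indicate, registering and deleting timers is only supported on keyed streams.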

SimpleTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java

@Internal
public class SimpleTimerService implements TimerService {

    private final InternalTimerService<VoidNamespace> internalTimerService;

    public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
        this.internalTimerService = internalTimerService;
    }

    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteEventTimeTimer(long time) {
        internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
    }
}
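  • SimpleTimerService implements TimerService by delegating every call to an InternalTimerService<VoidNamespace>; the register and delete methods pass VoidNamespace.INSTANCE as the namespace.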
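
The delegation described above can be illustrated with a small model that has no Flink dependency. This is only a sketch: NamespacedTimers, UserTimers, NoNamespace and SimpleUserTimers are hypothetical stand-ins for InternalTimerService, TimerService, VoidNamespace and SimpleTimerService, each reduced to a single method.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of the facade; none of these types are Flink classes.
final class DelegationSketch {

    /** Mirrors the shape of InternalTimerService<N>: every call carries a namespace. */
    interface NamespacedTimers<N> {
        void registerEventTimeTimer(N namespace, long time);
    }

    /** Mirrors the shape of TimerService: no namespace in the signature. */
    interface UserTimers {
        void registerEventTimeTimer(long time);
    }

    /** Singleton namespace, playing the role of VoidNamespace.INSTANCE. */
    enum NoNamespace { INSTANCE }

    /** Plays the role of SimpleTimerService: fills in the fixed namespace and forwards. */
    static final class SimpleUserTimers implements UserTimers {
        private final NamespacedTimers<NoNamespace> delegate;

        SimpleUserTimers(NamespacedTimers<NoNamespace> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void registerEventTimeTimer(long time) {
            delegate.registerEventTimeTimer(NoNamespace.INSTANCE, time);
        }
    }

    /** Registers one timer through the facade and returns what the delegate recorded. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        NamespacedTimers<NoNamespace> internal = (ns, time) -> calls.add(ns + "@" + time);
        new SimpleUserTimers(internal).registerEventTimeTimer(1000L);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The user-facing interface stays free of namespaces, while the internal one keeps the extra parameter for callers that need it.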

InternalTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java

@Internal
public interface InternalTimerService<N> {

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);

    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);

    void deleteEventTimeTimer(N namespace, long time);
}
  • InternalTimerService is the internal counterpart of TimerService. Unlike TimerService, it is namespace-aware: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer and deleteEventTimeTimer each take an additional namespace parameter.
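
To see what the extra namespace parameter buys, here is a minimal plain-JDK sketch. It assumes that a timer is identified by the triple (timestamp, key, namespace), which is suggested by the way the delete methods of InternalTimerServiceImpl build a fresh TimerHeapInternalTimer from those three values and pass it to remove. The Timer class and the "window-A"/"window-B" namespaces below are hypothetical, not Flink types.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class NamespaceSketch {

    /** Hypothetical timer identity; field names are illustrative, not Flink's. */
    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;

        Timer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    /** Returns {timer count after three registrations, timer count after one deletion}. */
    static int[] demo() {
        Set<Timer> timers = new HashSet<>();
        timers.add(new Timer(100L, "user-1", "window-A"));
        timers.add(new Timer(100L, "user-1", "window-B")); // other namespace: a second timer
        timers.add(new Timer(100L, "user-1", "window-A")); // same triple: no new timer
        int afterRegister = timers.size();

        // Deleting builds an equal timer object and removes it.
        timers.remove(new Timer(100L, "user-1", "window-A"));
        return new int[] {afterRegister, timers.size()};
    }

    public static void main(String[] args) {
        int[] sizes = demo();
        System.out.println(sizes[0] + " timers, then " + sizes[1]);
    }
}
```

Under this assumption, the same key can hold independent timers for the same timestamp as long as the namespaces differ.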

InternalTimerServiceImpl

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
​
    private final ProcessingTimeService processingTimeService;
​
    private final KeyContext keyContext;
​
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
​
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
​
    private final KeyGroupRange localKeyGroupRange;
​
    private final int localKeyGroupRangeStartIdx;
​
    private long currentWatermark = Long.MIN_VALUE;
​
    private ScheduledFuture<?> nextTimer;
​
    // Variables to be set when the service is started.
​
    private TypeSerializer<K> keySerializer;
​
    private TypeSerializer<N> namespaceSerializer;
​
    private Triggerable<K, N> triggerTarget;
​
    private volatile boolean isInitialized;
​
    private TypeSerializer<K> keyDeserializer;
​
    private TypeSerializer<N> namespaceDeserializer;
​
    private InternalTimersSnapshot<K, N> restoredTimersSnapshot;
​
    InternalTimerServiceImpl(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        ProcessingTimeService processingTimeService,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {
​
        this.keyContext = checkNotNull(keyContext);
        this.processingTimeService = checkNotNull(processingTimeService);
        this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
        this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
        this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);
​
        // find the starting index of the local key-group range
        int startIdx = Integer.MAX_VALUE;
        for (Integer keyGroupIdx : localKeyGroupRange) {
            startIdx = Math.min(keyGroupIdx, startIdx);
        }
        this.localKeyGroupRangeStartIdx = startIdx;
    }
​
    public void startTimerService(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerTarget) {
​
        if (!isInitialized) {
​
            if (keySerializer == null || namespaceSerializer == null) {
                throw new IllegalArgumentException("The TimersService serializers cannot be null.");
            }
​
            if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
                throw new IllegalStateException("The TimerService has already been initialized.");
            }
​
            // the following is the case where we restore
            if (restoredTimersSnapshot != null) {
                CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.keyDeserializer,
                    null,
                    restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
                    keySerializer);
​
                CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.namespaceDeserializer,
                    null,
                    restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
                    namespaceSerializer);
​
                if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
                    throw new IllegalStateException("Tried to initialize restored TimerService " +
                        "with incompatible serializers than those used to snapshot its state.");
                }
            }
​
            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
            this.keyDeserializer = null;
            this.namespaceDeserializer = null;
​
            this.triggerTarget = Preconditions.checkNotNull(triggerTarget);
​
            // re-register the restored timers (if any)
            final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
            if (headTimer != null) {
                nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
            }
            this.isInitialized = true;
        } else {
            if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
                throw new IllegalArgumentException("Already initialized Timer Service " +
                    "tried to be initialized with different key and namespace serializers.");
            }
        }
    }
​
    @Override
    public long currentProcessingTime() {
        return processingTimeService.getCurrentProcessingTime();
    }
​
    @Override
    public long currentWatermark() {
        return currentWatermark;
    }
​
    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }
​
    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }
​
    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }
​
    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }
​
    @Override
    public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;
​
        InternalTimer<K, N> timer;
​
        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }
​
        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }
​
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
​
        InternalTimer<K, N> timer;
​
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }
​
    public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
        return new InternalTimersSnapshot<>(
            keySerializer,
            keySerializer.snapshotConfiguration(),
            namespaceSerializer,
            namespaceSerializer.snapshotConfiguration(),
            eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
            processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
    }
​
    @SuppressWarnings("unchecked")
    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
        this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;
​
        if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
            throw new IllegalArgumentException("Tried to restore timers " +
                "for the same service with different serializers.");
        }
​
        this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
        this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();
​
        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
            "Key Group " + keyGroupIdx + " does not belong to the local range.");
​
        // restore the event time timers
        eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());
​
        // restore the processing time timers
        processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
    }
​
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }
​
    @VisibleForTesting
    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }
​
    @VisibleForTesting
    public int numProcessingTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
    }
​
    @VisibleForTesting
    public int numEventTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
    }
​
    private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
        int count = 0;
        try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
            while (iterator.hasNext()) {
                final TimerHeapInternalTimer<K, N> timer = iterator.next();
                if (timer.getNamespace().equals(namespace)) {
                    count++;
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Exception when closing iterator.", e);
        }
        return count;
    }
​
    @VisibleForTesting
    int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }
​
    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(eventTimeTimersQueue);
    }
​
    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(processingTimeTimersQueue);
    }
​
    private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
        List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
        for (int keyGroup : localKeyGroupRange) {
            result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
        }
        return result;
    }
​
    private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
        return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
            (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
    }
}
  • InternalTimerServiceImpl implements both InternalTimerService and ProcessingTimeCallback (the interface that declares onProcessingTime).
  • startTimerService mainly initializes the keySerializer, namespaceSerializer and triggerTarget fields. registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with TimerHeapInternalTimer as the element type.
  • Event-time timers fire in advanceWatermark: AbstractStreamOperator.processWatermark calls InternalTimeServiceManager.advanceWatermark, which in turn calls InternalTimerServiceImpl.advanceWatermark. That method polls every timer whose timestamp is less than or equal to the given time from eventTimeTimersQueue and calls triggerTarget.onEventTime for each one. Processing-time timers fire in onProcessingTime: the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back ProcessingTimeCallback.onProcessingTime, which polls every timer whose timestamp is less than or equal to the given time from processingTimeTimersQueue, calls triggerTarget.onProcessingTime for each one, and then registers a wake-up for the next remaining timer, if any.
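
The processing-time path can be modeled in a few lines of plain Java. The sketch below follows the control flow of registerProcessingTimeTimer and onProcessingTime from the listing above, under simplifying assumptions: there is a single key, a TreeSet<Long> stands in for processingTimeTimersQueue, and a nullable Long field stands in for the ScheduledFuture held in nextTimer. ProcessingTimeSketch is a hypothetical name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ProcessingTimeSketch {

    private final TreeSet<Long> queue = new TreeSet<>(); // stand-in for processingTimeTimersQueue
    private final List<Long> fired = new ArrayList<>();  // timestamps handed to the callback
    private Long scheduledWakeUp = null;                 // stand-in for nextTimer

    /** Only re-schedules the wake-up when the new timer becomes the earliest one. */
    void register(long time) {
        Long oldHead = queue.isEmpty() ? null : queue.first();
        if (queue.add(time)) {
            long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                scheduledWakeUp = time; // cancel the later wake-up, schedule the earlier one
            }
        }
    }

    /** Fires every due timer, then schedules a wake-up for the new head, if any. */
    void onProcessingTime(long now) {
        scheduledWakeUp = null;
        while (!queue.isEmpty() && queue.first() <= now) {
            fired.add(queue.pollFirst());
        }
        if (!queue.isEmpty() && scheduledWakeUp == null) {
            scheduledWakeUp = queue.first();
        }
    }

    static String demo() {
        ProcessingTimeSketch service = new ProcessingTimeSketch();
        service.register(500L);
        service.register(800L); // later than the head: wake-up stays at 500
        service.register(200L); // earlier than the head: wake-up moves to 200
        Long wakeBefore = service.scheduledWakeUp;
        service.onProcessingTime(500L);
        return "wake=" + wakeBefore + " fired=" + service.fired + " next=" + service.scheduledWakeUp;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // wake=200 fired=[200, 500] next=800
    }
}
```

The point of the pattern is that only one wake-up is outstanding at any time, always for the earliest timer in the queue.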

Triggerable

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java

@Internal
public interface Triggerable<K, N> {

    /**
     * Invoked when an event-time timer fires.
     */
    void onEventTime(InternalTimer<K, N> timer) throws Exception;

    /**
     * Invoked when a processing-time timer fires.
     */
    void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
  • The Triggerable interface declares the onEventTime and onProcessingTime methods that InternalTimerService calls back. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator and KeyedCoProcessOperator implement Triggerable so that they can react to these timer callbacks.
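
How a watermark turns into callbacks can be sketched the same way. The model below mirrors the loop in advanceWatermark, with a hypothetical OnEventTime interface standing in for Triggerable (event-time callback only) and a java.util.PriorityQueue standing in for eventTimeTimersQueue. It is a simplified illustration, not Flink's implementation: there are no key groups, no namespaces and no state backend.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class EventTimeSketch {

    /** Hypothetical stand-in for Triggerable, reduced to the event-time callback. */
    interface OnEventTime {
        void onEventTime(String key, long timestamp);
    }

    /** Hypothetical timer: a timestamp plus the key it was registered under. */
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final PriorityQueue<Timer> queue =
        new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private final OnEventTime target;
    private long currentWatermark = Long.MIN_VALUE;

    EventTimeSketch(OnEventTime target) {
        this.target = target;
    }

    void register(String key, long time) {
        queue.add(new Timer(time, key));
    }

    long currentWatermark() {
        return currentWatermark;
    }

    /** Fires, in timestamp order, every timer with timestamp <= the new watermark. */
    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            target.onEventTime(timer.key, timer.timestamp);
        }
    }

    static List<String> demo() {
        List<String> fired = new ArrayList<>();
        EventTimeSketch service = new EventTimeSketch((key, ts) -> fired.add(key + "@" + ts));
        service.register("a", 30L);
        service.register("b", 10L);
        service.register("a", 20L);
        service.advanceWatermark(20L); // the timer at 30 stays queued
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [b@10, a@20]
    }
}
```

In the real class the loop also calls keyContext.setCurrentKey(timer.getKey()) before the callback, so that the operator sees keyed state for the timer's key.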

Summary

  • The TimerService interface declares currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer and deleteEventTimeTimer. Its implementation, SimpleTimerService, delegates to an InternalTimerService.
  • InternalTimerService is the internal counterpart of TimerService; its register and delete methods each take an additional namespace parameter. Its implementation, InternalTimerServiceImpl, implements both InternalTimerService and ProcessingTimeCallback (the interface that declares onProcessingTime). registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with TimerHeapInternalTimer as the element type.
  • In InternalTimerServiceImpl, event-time timers fire in advanceWatermark: AbstractStreamOperator.processWatermark calls InternalTimeServiceManager.advanceWatermark, which in turn calls InternalTimerServiceImpl.advanceWatermark. That method polls every event-time timer whose timestamp is less than or equal to the given time and calls triggerTarget.onEventTime for each one.
  • In InternalTimerServiceImpl, processing-time timers fire in onProcessingTime: the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back ProcessingTimeCallback.onProcessingTime, which polls every processing-time timer whose timestamp is less than or equal to the given time and calls triggerTarget.onProcessingTime for each one.
  • The Triggerable interface declares the onEventTime and onProcessingTime methods that InternalTimerService calls back. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator and KeyedCoProcessOperator implement Triggerable so that they can react to these timer callbacks.

doc

  • TimerService
