这是Ververica的Stefan Richter和Piotr Nowojski提出的,有关这个方法的一些描述是本文的重点。在flink 1.10之前还都是使用flink checkpoint lock 进行线程同步,为了避免所有相关操作都去获取checkpoint lock进行同步,之后开始使用mailbox进行StreamTask的处理,本文便是关于这种线程模型的提议介绍。
前言
本文中关于将StreamTask中的线程模型更改为基于Mailbox的方法主要译自如下两处:
•https://issues.apache.org/jira/browse/FLINK-12477•https://docs.google.com/document/d/1eDpsUKv2FqwZiS1Pm6gYO5eFHScBHfULKmH1-ZEWB4g下一篇将会针对本文中所讨论的mailbox线程模型在flink 1.12中的实现源码进行分析。
1. 动机
我们提出这个建议的动机是用基于mailbox的方法简化流任务的线程模型(类似于在actor-model中常见的执行模型)。
使用Flink的流任务中的当前线程模型,有多个线程可能希望并发访问对象状态,例如事件处理(event-processing)和检查点触发(checkpoint triggering)。线程通过使用一个“全局”锁(即臭名昭著的检查点锁——checkpoint lock)进行互斥访问而彼此屏蔽。
使用checkpoint lock有很多缺点:锁必须传递到代码中的许多地方,泄漏到面向用户的API中(请参阅SourceContext),不获取锁可能会导致细微或不那么细微的bug,而关于并发线程的推理通常很容易出错。
使用mailbox模式,流任务中的所有状态更改都将从单个线程(即所谓的“mailbox线程”)发生。通过将操作(或至少其状态更改部分)排队到阻塞队列—邮箱,可以模拟并发操作。该队列由单个主线程(邮箱线程)持续探测,以寻找新的操作。如果“并发”操作在队列中,主线程将执行它。这种方法可以极大地简化流任务的线程模型。下面我们将描述实现这一改变所面临的挑战和计划。
2. 旧检查点锁的用例分析
检查点锁用于在以下三个并发源之间实现对流任务组件状态的互斥访问:
(1)事件处理(Event Processing):事件、水印、屏障、延迟标记等的基本发射和处理;
原文如下:
代码语言:javascript复制the basic emitting and processing of events, watermarks, barriers, latency markers, etc;
(2)检查点(Checkpoints):关于完整性的触发器(源上的)和通知来自对TaskExecutor(网关)的RPC调用,并在asyncCallDispatcher中移交给executor。触发/取消也可以通过事件处理(1)过程中接收到的障碍发生; 原文如下:
代码语言:javascript复制the trigger (on sources) and notifications about completeness come from an RPC call to the TaskExecutor(Gateway) and is handed over to an executor in asyncCallDispatcher. Triggering/cancellation can also happen through barriers that can be received during event processing(1)。
(3)处理时间计时器(Processing Time Timers):SystemProcessingTimeService使用ScheduledExecutor异步运行处理时间计时器。请注意,事件时间计时器是基于事件处理(1)中的水印处理同步触发的。原文如下(直接看原文可能更合理):
代码语言:javascript复制Please notice that event time timers are triggered synchronously, based on watermark handling that is part of the event processing (1)
我们还可以确定,检查点锁的替换不仅必须提供排他,还必须提供关键部分(如处理事件)的原子执行。
3. 修改建议
流任务的变化
我们建议在流任务中引入一个mailbox属性。mailbox的一种可能的初始实现是ArrayBlockingQueue。稍后,我们可以为我们必须讨论的多生产者-单一消费者的情况评估更有效的实现,可能基于ringbuffer的disruptor风格的实现。
这个邮箱将位于流任务主线程的活动中心,并且(在大多数情况下)接管当前StreamTask#run()方法的角色,也就是说,它成为事件生成/处理的驱动程序。然而,与StreamTask#run()不同的是,该方法还将负责执行检查点事件和处理计时器事件。所有这些事件都将成为在邮箱中排队的任务,流任务的主线程将不断地从邮箱中拉出并运行下一个事件。这通过队列实现了互斥执行。
由于我们希望能够在此模型中表示原子部分,一种方法是将此类原子操作表示为在邮箱中排队的Runnable对象。
注意,任务的主线程在执行这些Runnables程序时是可以阻塞执行的,生产者在尝试将新动作放入队列时也是可以阻塞的。第一种情况对应于当前代码中的情况,在检查点锁下阻塞了较长的临界段。第二种情况是在尝试获取检查点锁时线程阻塞。
我们可以将StreamTask的基本变化概括如下:
代码语言:javascript复制BlockingQueue<Runnable> mailbox = ...
void runMailboxProcessing() {
//TODO: can become a cancel-event through mailbox eventually
Runnable letter;
while (isRunning()) {
while ((letter = mailbox.poll()) != null) {
letter.run();
}
defaultAction();
}
}
void defaultAction() {
// e.g. event-processing from an input
}
这段代码只是代表了核心思想,可以在很多方面进行优化,例如不同的队列实现,或者通过不同处理的Runnable的特定标记singleton实例来表示非常频繁的事件,或者在队列上使用像#drainTo这样的批量方法,等等。队列公平性也需要考虑,但是当前在检查点锁上进行同步的方法不提供任何公平性保证。我们还可以考虑一种变体,其中的“default action”也只是邮箱的一个事件。
当前使用检查点锁的客户端代码的一般变化
现在,我们将讨论这个模型如何在前一节讨论的3个用例中替换当前的检查点锁定方法。
首先,checkpointing, processing timers, 和 event processing中的参与者如何在邮箱上同步?目前,检查点锁通过协作参与者的构造函数或getter公开给他们。我们巧妙地将邮箱隐藏在Queue接口(或类似的接口)之后,并通过传递给构造函数或通过getter返回来公开队列。目前,我们可以将它与检查点锁对象一起传递,为了向后兼容,我们保留了该对象(参见第4节)。
Runnable#run()实现中的代码可以被视为原子关键部分,因为邮箱只会在方法完全完成后继续处理下一个事件。
事件的生成和处理(Event generation and processing)
用例1,即一般事件的生成和处理,将通过我们的更改而大大简化。邮箱确保所有状态更改都来自单个线程,不再具有互斥性。这意味着我们可以从这些代码路径中完全放弃锁定的需求。
要使用邮箱模型,我们需要将run方法的事件处理循环拆分为可以处理有限数量事件的方法,例如每次调用的单个事件。例如,删除在One/ twooinputstreamtask中运行while (running && inputProcessor.processInput())的循环,并在再次检查邮箱是否来自其他参与者的事件之前一次调用inputProcessor.processInput()。
请注意,这与我们对 selectable, (un)bounded task inputs (FLINK-11875[1])的设想更改相匹配。
从sources来看,情况一开始似乎有点复杂。原因是,从高层次的角度来看,当前的source functions就像事件生成的“无穷”循环一样工作。这是他们的公共API的一部分,不能为了向后兼容自定义sources而更改。但是,在永不产生结果的循环中运行的source function不会给我们的方法任何检查邮箱的机会。在第4节中,我们将讨论一种向后兼容的方法,以适应邮箱模型的那些“legacy” sources。
我们目前还在讨论一个新的source接口(flip 27),它对事件生成循环提供了更多的控制,并且非常适合邮箱模型。这意味着未来的源代码可能会非常容易集成。
检查点和定时器触发
这种方法的一个很好的特性是,它似乎已经适合计时器和检查点事件(用例2和3),因为它们已经以Runnable对象的形式向异步执行器执行了。我们现在可以简单地将操作放入邮箱队列,并删除锁定。
4. 向后兼容“legacy” sources
正如第3.3节所讨论的,我们需要提供兼容性,使我们能够运行由事件生成的无穷循环组成的source functions。如果不进行重大修改(将邮箱作为循环的一部分进行检查),这种无限循环就不能与邮箱模型集成,因此我们需要考虑如何提供另一种方法来实现此类source functions与邮箱模型之间的向后兼容性。我们将讨论使用不同于其他流任务的source functions来执行流任务的一种可能的方法。我们可以采用不同的分支,因为可以通过API检测这样的sources,不同的执行行为也可以是在原始邮箱线程中运行的操作,直到流任务终止。
兼容方法背后的核心思想是,我们将使用两个线程来运行这样的source functions,一个是带有事件生成循环( event generating loop)的source function线程,另一个是接收检查点、处理计时器触发器等事件的流任务的邮箱线程。我们通过旧的检查点锁使两个线程互斥,这意味着我们运行一个修改版本的邮箱循环,该循环阻塞邮箱,并在检查点锁下执行邮箱事件(参见图)。当邮箱事件到达时,邮箱线程将以获取检查点锁为目标,将其从源函数线程中取出。在锁定下,邮箱操作是独占执行的。在此之后,邮箱线程再次释放source function线程的锁,并阻塞邮箱上的等待take()调用。
5. 细化实施步骤
1.将StreamInputProcessor、StreamTwoInputProcessor和StreamIterationHead的主要“无穷”循环分解为逐步的、非阻塞的增量事件处理(incremental event processing)调用。→https://github.com/apache/flink/pull/84092.在StreamTask中引入邮箱队列,并让它驱动1中引入的事件处理步骤。邮箱循环仍然必须始终同步锁。→https://github.com/apache/flink/pull/84313.向后兼容的代码来检测 legacy source function,并在与流任务主线程不同的线程中运行它们。→https://github.com/apache/flink/pull/84424.使输入非阻塞。5.通过邮箱队列运行检查点trigger/notifyComplete。6.通过邮箱队列运行处理时间计时器触发器。7.在操作符(如AsyncWaitOperator)中取消或调整特殊锁的使用8.对于现在在StreamTask邮箱线程中运行的路径,删除不必要的锁定。9.可选:重写一些现有的Flink操作符(例如源)到新的接口
6. 选择
我们还回顾了Kotlin协程和挂起函数,作为减少阻塞操作(AsyncWaitOperator)和仍然需要在同一个任务线程中处理事件消息所导致的某些交互的方法。
Kotlin协程确实使邮箱方法更容易实现和维护。邮箱本身将是通道,异步操作可以挂起,将控制权交还给邮箱处理器。然而,如果不使用大量的样板代码对代码进行巨大的更改,就没有很好的方法在Java中模拟这种行为。对潜在的定制操作符隐藏Kotlin实现也不容易。
7. 附录
代码实现中其他需要考虑的地方:
Stream task:
•timer triggers•checkpoints (perform, broadcast, notify complete)•(notification actions)
External Signals from TaskExecutor/TaskExecutorGateway:
•cancelTask -> Task#cancelExecution•(updatePartitions)•(failPartition)•triggerCheckpoint -> Task#triggerCheckpointBarrier•confirmCheckpoint -> Task#notifyCheckpointComplete
Lock usage to consider:
•StreamInputProcessor#processInput•StreamTwoInputProcessor#processInput•SourceStreamTask#run•StreamIterationHead#run•SystemProcessingTimeService: TriggerTask#run, RepeatedTriggerTask#run
In StreamTask:
•#abortCheckpointOnBarrier•#performCheckpoint•#notifyCheckpointComplete
我们收集了所有相关类的列表,除了那些基于检查点锁的线程协调的source类,也不包括那些通过暴露的API使用检查点锁的实现,例如在事件生成循环中的特定源类:
代码语言:javascript复制AsyncWaitOperator
ContinuousFileReaderOperator
Emitter
OneInputStreamTask
SourceStreamTask
StreamInputProcessor
StreamIterationHead
StreamSource
StreamSourceContexts
StreamTask
StreamTwoInputProcessor
SystemProcessingTimeService
TwoInputStreamTask
References
[1]
FLINK-11875: https://issues.apache.org/jira/browse/FLINK-11875