背景
之前有想过系统地来一番flink源码分析系列,谁曾想工作中需要完成的需求有些多,完整的flink源码分析系列只能一再往后拖了。之前公众号后台有想学习flink的朋友留言想看更多学习flink的资料,现在先发一些之前收藏的关于flink相关的文章,其中大多翻译自flink社区,希望能给大家带来一些帮助。本文[1]主要围绕flink任务的生命周期展开。
任务生命周期
Flink中的任务是执行的基本单位。在这里是执行操作符的每个并行实例。例如,并行度为5的运算符将使其每个实例由单独的任务执行。StreamTask
是Flink流引擎中所有不同任务子类型的基础(是一个抽象父类)。本文档介绍了StreamTask生命周期中的不同阶段,并描述了代表每个阶段的主要方法。
StreamTask的部分实现类见下图:
简述Operator的生命周期
由于任务是执行Operator并行实例的实体,因此其生命周期与Operator的生命周期紧密集成在一起。因此,在深入研究StreamTask本身的方法之前,我们将简要介绍表示Operator生命周期的基本方法。下面列出了每个方法调用的顺序。假设一个操作符可以有一个用户定义的函数(UDF),在每个Operator方法下面,我们还提供了它所调用的UDF生命周期中的方法(缩进)。如果Operator扩展了AbstractUdfStreamOperator(它是执行udf的所有Operator的基本类),则可以使用这些方法。
代码语言:javascript复制// initialization phase
OPERATOR::setup
UDF::setRuntimeContext
OPERATOR::initializeState
OPERATOR::open
UDF::open
// processing phase (called on every element/watermark)
OPERATOR::processElement
UDF::run
OPERATOR::processWatermark
// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState
// termination phase
OPERATOR::close
UDF::close
OPERATOR::dispose
简而言之,调用setup()来初始化一些特定于Operator的设置,比如它的RuntimeContext和它的Metric集合数据结构。在此之后,initializeState()为Operator提供其初始状态,open()方法执行任何特定于Operator的初始化,例如在AbstractUdfStreamOperator的情况下打开用户定义的函数。
注意,initializeState()既包含在Operator初始执行期间初始化状态的逻辑(例如注册任何keyed state),也包含在失败后从检查点检索其状态的逻辑。更多关于这一页的其余部分。
现在一切都设置好了,Operator就可以处理传入的数据了。传入元素可以是下列元素之一:输入元素、水印和检查点屏障。每一个都有一个特殊的方法来处理它。元素由processElement()方法处理,水印由processWatermark()处理,检查点障碍触发一个检查点,该检查点(异步)调用snapshotState()方法,我们将在下面描述该方法。对于每个传入的元素,根据其类型调用前面提到的方法之一。注意processElement()也是调用UDF逻辑的地方,例如MapFunction的map()方法。
最后,在Operator正常、无故障终止的情况下(例如,如果流是有限的,并且到达了流的终点),调用close()方法来执行操作符逻辑所需的任何最终处理操作(例如关闭任何连接或I / O流Operator的执行期间打开),然后调用dispose()释放操作符持有的任何资源(Operator 数据所持有的本地内存)。
在由于失败或手动取消而终止的情况下,执行直接跳转到dispose(),并跳过故障发生时Operator所处的阶段和dispose()之间的任何中间阶段。
检查点:当接收到检查点屏障时,Operator的snapshotState()方法会在上面描述的其他方法中异步调用。检查点在处理阶段执行,即Operator打开之后和关闭之前。此方法的职责是将Operator的当前状态存储到指定的状态后端,当作业在失败后继续执行时,将从该后端检索Operator。下面我们将简要描述Flink的检查点机制,关于Flink中检查点的更多详细讨论,请阅读相应的文档:数据流容错( Data Streaming Fault Tolerance[2])。
任务生命周期
在简要介绍了Operator的主要阶段之后,本节将更详细地描述任务在集群上执行时如何调用各自的方法。这里描述的步骤主要包含在StreamTask类的invoke()方法中。本文档的其余部分分为两部分,一部分描述一个任务在正常、无故障执行期间的各个阶段(见正常执行 Normal Execution[3]),另一部分(较短的部分)描述任务取消(见中断执行)时的不同顺序(见中断执行 Interrupted Execution[4]),无论是手动取消,还是由于某些原因取消。
正常执行
任务在不中断的情况下执行直到完成时所经历的步骤如下所示:
代码语言:javascript复制TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
close-operators
dispose-operators
task-specific-cleanup
common-cleanup
如上所示,在恢复任务配置并初始化一些重要的运行时参数之后,任务的第一步是检索其初始的、任务范围的状态。这是在setInitialState()中完成的,在两种情况下特别重要:
1.当任务正在从失败中恢复并从最后一个成功的检查点重新启动时;2.当从保存点(savepoint[5])恢复时。
第一次执行时,任务初始状态为空。
恢复任何初始状态后,任务进入它的invoke()方法。在这里,它首先通过调用每个操作符的setup()方法来初始化涉及到本地计算的Operator,然后通过调用本地init()方法来执行特定任务的初始化。通过特定任务,我们的意思是根据任务的类型(SourceTask、OneInputStreamTask或twooinputstreamtask等),这个步骤可能有所不同,但在任何情况下,这里都是获得必要的任务范围内资源的地方。例如,OneInputStreamTask表示一个希望拥有单个输入流的任务,它初始化到与本地任务相关的输入流的不同分区的位置的连接。
在获得了必要的资源之后,现在是时候让不同的Operator和用户定义函数从上面检索的任务范围的状态中获取它们各自的状态了。这是在initializeState()方法中完成的,该方法调用每个Operator的initializeState()。每个有状态Operator都应该覆盖这个方法,并且应该包含状态初始化逻辑,无论是第一次执行作业时,还是任务从失败中恢复或使用保存点时。
现在任务中的所有Operator都已初始化,StreamTask的openAllOperators()方法将调用每个Operator的open()方法。此方法执行所有的操作初始化,例如向计时器服务注册检索到的计时器。单个任务可能会执行多个Operator,其中一个Operator会消耗其前一个任务的输出。在这种情况下,open()方法从最后一个Operator(即输出也是任务本身输出的Operator)调用到第一个Operator。这样,当第一个Operator开始处理任务的输入时,所有下游Operator都准备好接收它的输出。
注意:
任务中连续的Operator从最后一个到第一个依次打开。
现在任务可以恢复执行,operators可以开始处理新的输入数据。这就是调用特定于任务的run()方法的地方。这个方法将一直运行,直到没有更多的输入数据(有限流),或者任务被取消(手动或非手动)。这里调用特定于Operator的processElement()和processWatermark()方法。
在运行到完成的情况下,即没有更多的输入数据需要处理,退出run()方法后,任务进入它的shutdown进程。最初,计时器服务停止注册任何新的计时器(例如,正在执行的触发计时器),清除所有尚未启动的计时器,并等待当前正在执行的计时器完成。然后,closeAllOperators()尝试通过调用每个Operator的close()方法优雅地关闭涉及计算的Operator。然后,刷新所有缓冲的输出数据,以便下游任务能够处理它们,最后任务尝试通过调用每个Operator的dispose()方法来清除Operator持有的所有资源。在打开不同的Operator时,我们提到了顺序是从最后一个到第一个。关闭的方式是相反的,从第一个到最后。
注意:
任务中的连续Operator从第一个到最后一个关闭。
最后,当所有的Operator都被关闭并释放了它们的资源后,任务会关闭它的定时器服务,执行特定任务的清理,例如清除所有内部缓冲区,然后执行它的通用任务清理,包括关闭所有的输出通道和清除任何输出缓冲区。
检查点:
前面我们看到,在initializeState()期间,以及在从失败中恢复的情况下,任务及其所有Operator和函数检索在失败前的最后一个成功检查点期间持久化到稳定存储的状态。Flink中的检查点根据用户指定的时间间隔定期执行,并由与主任务线程不同的线程执行。这就是为什么他们不包括在任务生命周期的主要阶段。简而言之,称为CheckpointBarriers的特殊元素由作业的源任务定期注入到输入数据流中,并随实际数据从源迁移到sink。源任务在进入运行模式后注入这些障碍,并且假定CheckpointCoordinator也在运行。每当一个任务收到这样一个屏障时,它就会安排一个任务由检查点线程执行,检查点线程调用任务中Operator的snapshotState()。在执行检查点时,任务仍然可以接收输入数据,但数据将被缓存,并仅在检查点成功完成后才被下游处理和发送。
中断执行
在前面的部分中,我们描述了一直运行到完成的任务的生命周期。如果任务在任何点被取消,那么正常的执行将被中断,从那个点开始执行的操作只有计时器服务关闭、特定于任务的清理、operators的处理和一般的任务清理,如上所述。
References
[1]
本文: https://ci.apache.org/projects/flink/flink-docs-release-1.12/internals/task_lifecycle.html
[2]
Data Streaming Fault Tolerance: https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/fault_tolerance.html
[3]
Normal Execution: https://ci.apache.org/projects/flink/flink-docs-release-1.12/internals/task_lifecycle.html#normal-execution
[4]
Interrupted Execution: https://ci.apache.org/projects/flink/flink-docs-release-1.12/internals/task_lifecycle.html#interrupted-execution
[5]
savepoint: https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html