背景
本文参考Flink1.10官方多篇文章相关知识收集、翻译、整合和内化而写成的关于Flink内存模型详解的文章,其中Job Manager、Task Manager和Client 分别是什么,各自之间的运行关系怎样,任务运行过程中所使用任务槽和资源情况的内存模型构成详解,内存设置需要配置哪些参数,参数功能描述等。暂时不熟悉Flink相关概念的童鞋自觉查阅笔者以往分享关于Flink术语基本概念的文章链接:Flink优化器与源码解析系列--Flink相关基本概念。
内存模型
先从一个简单Flink程序执行流程讲起,对在作业执行过程中涉及到Job Managers, Task Managers和Clients功能说明以及使用资源情况以及Flink内存模型介绍。
- 一个Flink程序执行流程:
- Client客户端提交作业Job给JobManager
- JobManager调起任务到TaskManager去执行任务,并管理Task任务,协调记录CheckPoint检查点。
- TaskManager执行任务,并返回任务状态、心跳信息、执行结果或统计信息等给JobManager。
- JobManager再将状态更新、执行结果或统计信息返回给Client客户端。
- 启动JobManager和TaskManager几种方式:
- 独立群集standaloneCluster的形式直接在计算机上,
- 在容器中启动
- YARN或Mesos等资源框架进行管理。TaskManager连接到JobManager后直接可用,并被分配工作。
- Job Managers, Task Managers和Clients各自功能:
- JobManagers(也称为master)协调分布式执行。主要功能是调度任务,协调检查点,协调故障恢复等。总是至少有一个Job Manager。高可用性设置将有多个JobManager,其中一个始终是leader,而其他则是standby。
- TaskManagers(也叫workers)主要功能是执行数据流的任务(或者更具体地说,子任务subtasks),以及缓冲buffer和交换exchange数据流。必须始终至少有一个TaskManager。
Clients客户端不属于运行时环境和程序执行(the runtime and program execution)的一部分,而是被用来准备和发送的数据流到JobManager。之后,客户端可以断开连接或保持连接状态以接收进度报告。客户端既可以作为触发执行的Java / Scala程序的一部分运行,也可以在命令行进程中运行./bin/flink run ...。
- 任务槽和资源Task Slots and Resources
每个worker(TaskManager)是一个JVM进程,并且可以在单独的线程中执行一个或多个子任务subtasks。一个worker用任务槽task slots(至少一个)来管理接受任务的。每个任务槽代表TaskManager的资源的固定子集。例如,具有三个插槽的TaskManager,那么每个插槽slot享有1/3的托管内存Managed Memory。分配资源意味着子任务不会与其他作业的子任务subtasks竞争托管内存,而是具有一定数量的保留托管内存。请注意,此处没有发生CPU隔离。当前插槽slot仅将任务的托管内存分开。
通过调整任务槽task slots的数量,用户可以定义子任务如何相互隔离。每个TaskManager具有一个插槽slot,这意味着每个任务组都在单独的JVM中运行(例如,可以在单独的容器中启动)。具有多个插槽意味着更多子任务共享同一JVM。同一JVM中的任务共享TCP连接(通过多路复用)和心跳消息。他们还可以共享数据集和数据结构,从而减少每个任务的开销。
默认情况下,Flink允许子任务共享插槽slot,即使它们是不同任务的子任务也是如此,只要它们来自同一任务即可。结果是一个插槽可以容纳整个job流。允许此插槽共享有两个主要好处:
- Flink集群所需的任务槽与作业job中使用的最高并行度数量是一样多的。所以不需要计算一个程序总共包含多少个任务(因并行度不同而各异的)。
- 更容易获得更好的资源利用率。如果没有插槽共享,则非密集型 source / map()子任务将阻塞与资源密集型窗口子任务一样多的资源。通过插槽共享,我们示例中的基本并行度从2增加到6,可以充分利用插槽资源
子任务在TaskManager之间公平分配。
具有共享任务插槽的TaskManager API还包括一种资源组机制,可用于防止不良的时隙共享。根据经验,默认的任务插槽数量应该是CPU内核的数量。使用超线程时,每个插槽将占用2个或更多物理线程上下文。
- 内存模型详解
上述讲述了Job Manager、Task Manager的分布式运行情况,这里对TaskManager所使用内存模型进行介绍。Flink尝试使用户免受配置JVM进行数据密集型处理的复杂性的影响。在大多数情况下,用户只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置的方式),并可能通过调整JVM堆与管理内存的比率taskmanager.memory.managed.fraction等选项可用于执行性能调整和修复与内存相关的错误。
TaskManager进程总内存 = Flink总内存 JVM元空间 JVM开销。其由taskmanager.memory.process.size参数设置其大小。
- taskmanager.memory.process.size = Flink总内存 JVM元空间 JVM开销
Flink总内存 = 框架堆内存 任务堆内存 任务堆外内存 管理内存 网络内存 。包括TaskExecutor占用的所有内存。JVM Metaspace和JVM Overhead除外。其由taskmanager.memory.flink.siz参数设置大小
- taskmanager.memory.flink.size = 框架堆内存 任务堆内存 任务堆外内存 托管内存 网络内存 (包括TaskExecutor占用的所有内存。JVM Metaspace和JVM Overhead除外)
各类内存或开销相关参数说明:
组建 | 配置参数 | 描述 |
---|---|---|
框架堆内存 | taskmanager.memory.framework.heap.size | 专用于Flink框架的JVM堆内存(高级选项) |
任务堆内存 | taskmanager.memory.task.heap.size | 专用于Flink应用程序的JVM堆内存可运行操作员和用户代码 |
托管内存 | taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction | 由Flink管理的本机内存,保留用于排序,哈希表,中间结果的缓存和RocksDB状态后端 |
框架堆外内存 | taskmanager.memory.framework.off-heap.size | 专用于Flink框架的堆外直接(或本机)内存(高级选项) |
任务堆外内存 | taskmanager.memory.task.off-heap.size | 专用于Flink应用程序以运行操作员的堆外直接(或本机)内存 |
网络内存 | taskmanager.memory.network.mintaskmanager.memory.network.maxtaskmanager.memory.network.fraction | 直接存储器保留用于任务之间的数据记录交换(例如缓冲用于传输通过网络),它是一种封端的分馏成分的的总弗林克存储器 |
JVM元空间 | taskmanager.memory.jvm-metaspace.size | Flink JVM进程的元空间大小 |
JVM开销 | taskmanager.memory.jvm-overhead.mintaskmanager.memory.jvm-overhead.maxtaskmanager.memory.jvm-overhead.fraction | 架空其他JVM保留本机内存:如线程堆栈,代码缓存,垃圾收集等的空间,这是一个上限分级成分的的总进程内存 |
各参数功能描述:
这些配置值设置决定了TaskManager使用内存大小。
- taskmanager.memory.flink.size TaskExecutor的总Flink内存大小。这包括TaskExecutor占用的所有内存,但JVM Metaspace和JVM Overhead除外。它由框架堆内存,任务堆内存,任务堆外内存,管理内存和网络内存组成。
- taskmanager.memory.framework.heap.size TaskExecutor的框架堆内存大小。这是为TaskExecutor框架保留的JVM堆内存的大小,不会分配给任务插槽。默认值128MB
- taskmanager.memory.framework.off-heap.size TaskExecutor的框架外堆内存大小。这是为TaskExecutor框架保留的堆外内存(JVM直接内存和本机内存)的大小,不会分配给任务插槽。当Flink计算JVM 最大直接内存大小参数时,将完全计算配置的值。默认值128MB
- taskmanager.memory.managed.size TaskExecutor的托管内存大小。这是由内存管理器管理的堆外内存的大小,保留用于排序,哈希表,中间结果的缓存和RocksDB状态后端。内存使用者可以以MemorySegments的形式从内存管理器中分配内存,也可以从内存管理器中保留字节并将它们的内存使用率保持在该范围内。如果未指定,则将派生它来构成总Flink内存的已配置部分。无默认值
- taskmanager.memory.task.heap.size TaskExecutor的任务堆内存大小。这是为任务保留的JVM堆内存的大小。如果未指定,它将导出为总Flink内存减去框架堆内存,任务堆外内存,托管内存和网络内存。
- taskmanager.memory.task.off-heap.size TaskExecutor的任务堆外内存大小。这是为任务保留的堆外内存(JVM直接内存和本机内存)的大小。当Flink计算JVM最大直接内存大小参数时,将完全计算配置的值。默认为0
- taskmanager.memory.network.fraction Flink总内存的比例,用作网络内存。网络内存是为ShuffleEnvironment保留的堆外内存(例如,网络缓冲区)。得出网络内存大小以构成总Flink内存的已配置部分。如果导出的大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。通过将最小/最大大小设置为相同的值,可以显式指定网络内存的确切大小。默认值0.1浮动值
- taskmanager.memory.network.maxTaskExecutor的最大网络内存大小。网络内存是为ShuffleEnvironment保留的堆外内存(例如,网络缓冲区)。得出网络内存大小以构成总Flink内存的已配置部分。如果导出的大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。通过将最小值/最大值设置为相同的值,可以显式指定网络内存的确切大小。默认值1 GB
- taskmanager.memory.network.min TaskExecutor的最小网络内存大小。网络内存是为ShuffleEnvironment保留的堆外内存(例如,网络缓冲区)。得出网络内存大小以构成总Flink内存的已配置部分。如果导出的大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。通过将最小值/最大值设置为相同的值,可以显式指定网络内存的确切大小。默认值 64MB
- taskmanager.memory.process.size TaskExecutor的总进程内存大小。这包括TaskExecutor消耗的所有内存,包括总Flink内存,JVM元空间和JVM开销。在容器化设置中,应将其设置为容器内存。
- taskmanager.memory.jvm-overhead.fraction 要为JVM开销保留的总进程内存的百分比。这是为JVM开销保留的堆外内存,例如线程堆栈空间,编译缓存等。这包括本机内存,但不包括直接内存,并且在Flink计算JVM最大直接内存大小参数时将不进行计数。得出JVM开销的大小以构成总进程内存的已配置部分。如果导出的大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。可以通过将最小/最大大小设置为相同的值来明确指定JVM开销的确切大小。默认值0.1浮动值
- taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM开销大小。这是为JVM开销保留的堆外内存,例如线程堆栈空间,编译缓存等。这包括本机内存,但不包括直接内存,并且在Flink计算JVM最大直接内存大小参数时将不进行计数。得出JVM开销的大小以构成总进程内存的已配置部分。如果导出的大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。可以通过将最小/最大大小设置为相同的值来明确指定JVM开销的确切大小。默认值1 GB
- taskmanager.memory.jvm-overhead.min :TaskExecutor的最小JVM开销大小。这是为JVM开销保留的堆外内存,例如线程堆栈空间,编译缓存等。这包括本机内存,但不包括直接内存,并且在Flink计算JVM最大直接内存大小参数时将不进行计数。得出JVM开销的大小以构成总进程内存的已配置部分。如果导出的大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。可以通过将最小/最大大小设置为相同的值来明确指定JVM开销的确切大小。默认值192MB
- taskmanager.memory.jvm-metaspace.size:TaskExecutor的JVM元空间大小。默认值256MB
- taskmanager.memory.managed.fraction :如果未明确指定托管内存大小,则将用作托管内存的Flink总内存的百分比。默认值0.4浮动值
其他内存或资源使用说明
还有内存组件的大小可以通过相应的选项简单地设置。其他组件可以使用多个选项进行调整。
- 框架内存 :框架堆内存和离堆内存架构没有充分的理由不建议改变它。仅在确定Flink需要更多内存用于某些内部数据结构或操作时才进行调整。它可能与特定的部署环境或工作结构有关,例如高度并行性。此外,在某些设置中,诸如Hadoop之类的Flink依赖项可能会消耗更多的直接或本地内存。注意目前Flink中没有隔离框架和任务内存的堆版本或非堆版本。框架和任务内存的分离可以在将来的版本中使用,以进行进一步的优化。
- 网络内存:此介绍以下选项的配置详细信息,这些选项可能只占总内存的一小部分 :
- 网络内存可能只占Flink内存总量的一小部分
- JVM开销可能是总进程内存的一小部分
这些组件的大小必须始终在其最大值和最小值之间,否则Flink启动将失败。最大值和最小值具有默认值,或者可以通过相应的配置选项明确设置。例如,如果仅设置以下内存选项:Flink总内存= 1000Mb,网络最小= 64Mb,网络最大= 128Mb,网络比例= 0.1那么网络内存将为1000Mb x 0.1 = 100Mb,在64-128Mb范围内。(请注意,如果您配置相同的最大值和最小值,则实际上意味着它的大小固定为该值。如果未显式配置组件内存,则Flink将使用百分比基于总内存来计算内存大小。计算值由其相应的最小/最大选项限制)如果定义了总内存及其其他组件的大小,也可能会忽略该百分比。在这种情况下,网络内存是总内存的其余部分。派生值仍必须在其最小/最大范围内,否则配置将失败。Flink总内存中的所有其他组件都具有默认值,包括默认的托管内存部分。那么,网络内存不是百分比(1000Mb x 0.1 = 100Mb),而是总Flink内存的其余部分,该部分将在64-256Mb范围内,否则将失败。
- JVM参数
Flink在启动任务执行程序进程时,根据配置的或派生的内存组件大小,显式添加以下与内存相关的JVM参数:
JVM Arguments | Value |
---|---|
-Xmx and -Xms | Framework Task Heap Memory |
-XX:MaxDirectMemorySize | Framework Task Off-Heap Network Memory |
-XX:MaxMetaspaceSize | JVM Metaspace |
- 本地执行
如果您在计算机上作为单个Java程序在本地启动Flink而不创建集群(例如从IDE中创建),则将忽略所有组件,但以下各项除外:
Memory component | Relevant options | Default value |
---|---|---|
Task heap | taskmanager.memory.task.heap.size | infinite |
Task off-heap | taskmanager.memory.task.off-heap.size | infinite |
Managed memory | taskmanager.memory.managed.size | 128Mb |
Network memory | taskmanager.memory.network.mintaskmanager.memory.network.max | 64Mb |
上面列出的所有组件都可以但不必为本地执行显式配置。如果未配置它们,则将它们设置为其默认值。任务堆内存和 任务堆外内存被认为是无限的(Long.MAX_VALUE字节),并且托管内存 的默认值仅对于本地执行模式为128Mb。
注意在这种情况下,任务堆大小与实际堆大小没有任何关系。它可能与后续版本的未来优化相关。启动的本地进程的实际JVM堆大小不受Flink的控制,取决于您如何启动该进程。如果要控制JVM堆大小,则必须显式传递相应的JVM参数,例如-Xmx,-Xms。
总结
本篇是对Flink内存模型及其相关知识点进行详细说明讲解,掌握这些知识后,就更快排查和解决如IllegalConfigurationException、OutOfMemoryError: Java heap space、OutOfMemoryError: Direct buffer memory、OutOfMemoryError: Metaspace、IOException: Insufficient number of network buffers和Container Memory Exceeded等常见异常。