Flink重点难点:内存模型与内存结构

2021-09-22 15:26:45 浏览数 (1)

在阅读本文之前,你应该阅读过的系列:

  • 《Flink重点难点:时间、窗口和流Join》
  • 《Flink重点难点:网络流控和反压》
  • 《Flink重点难点:维表关联理论和Join实战

前言

在介绍内存模型之前的基础知识。

1. 堆内内存(on-heap memory)

1.1 什么是堆内内存

Java 虚拟机在执行Java程序的过程中会把它在主存中管理的内存部分划分成多个区域,每个区域存放不同类型的数据。下图所示为java虚拟机运行的时候,主要的内存分区:

在这些分区中,占用内存空间最大的一部分叫做“堆(heap)”,也就是我们所说的堆内内存(on-heap memory)。java虚拟机中的“堆”主要是存放所有对象的实例。这一块区域在java虚拟机启动的时候被创建,被所有的线程所共享,同时也是垃圾收集器的主要工作区域,因此这一部分区域除了被叫做“堆内内存”以外,也被叫做“GC堆”(Garbage Collected Heap)。

1.2 堆内内存的垃圾回收

堆内内存是java垃圾收集器的主要工作区域,为了提高垃圾回收的效率,在堆内内存的内部又划分出了新生代、老年代和永久代。在新生代内存中又按照8:1:1的比例(java虚拟机默认分配比例为8:1:1,这个比例也可以自定义)划分出了Eden, Survivor1, Survivor2三个区域。

在执行垃圾回收算法的时候,不同的回收算法会对内存区域造成不一样的影响。但是大部分的回收算法会造成堆内内存空间在物理上的不连续性。下面以最基本的垃圾回收算法“标记 - 清除算法”为例:

可以看到,内存区域在经过垃圾回收之后,产生大量不连续的内存空间。因此,java虚拟机中的堆内内存区域,只是逻辑上的连续,并不能保证物理上的连续性。所以,操作系统并不能直接得到堆内内存区域所存储的数据在主存中的正确地址。在一些特定的时间点,Java虚拟机会进行一次彻底的垃圾回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,在扫描期间,绝大部分正在运行的java线程都会被暂时停止。这意味着:这样一次垃圾收集对Java应用造成的影响,跟堆内内存所存储的数据的多少是成正比的,过大的堆内内存会影响Java应用的性能。

2. 堆外内存(off-heap memory)

2.1 堆外内存的产生

为了解决堆内内存过大带来的长时间的GC停顿的问题,以及操作系统对堆内内存不可知的问题,java虚拟机开辟出了堆外内存(off-heap memory)。堆外内存意味着把一些对象的实例分配在Java虚拟机堆内内存以外的内存区域,这些内存直接受操作系统(而不是虚拟机)管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。同时因为这部分区域直接受操作系统的管理,别的进程和设备(例如GPU)可以直接通过操作系统对其进行访问,减少了从虚拟机中复制内存数据的过程。

2.2 堆外内存的分配

java 在NIO 包中提供了ByteBuffer类,对堆外内存进行访问。下图为NIO包中ByteBuffer的层次继承关系

使用下面的方式,可以直接开辟指定大小的对外内存:

代码语言:javascript复制
import sun.nio.ch.DirectBuffer;
import java.nio.ByteBuffer;
public class TestDirectByteBuffer {
    public static void main(String[] args) throws Exception {
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(10 * 1024 * 1024);
        }
    }
}

这样我们就开辟出了一块大小为10M的堆外内存。

3. 堆外内存的优缺点以及与堆内内存联系

3.1堆外内存的优缺点

优点 :

代码语言:javascript复制
可以很方便的自主开辟很大的内存空间,对大内存的伸缩性很好
减少垃圾回收带来的系统停顿时间
直接受操作系统控制,可以直接被其他进程和设备访问,减少了原本从虚拟机复制的过程
特别适合那些分配次数少,读写操作很频繁的场景

缺点 :

代码语言:javascript复制
容易出现内存泄漏,并且很难排查
堆外内存的数据结构不直观,当存储结构复杂的对象时,会浪费大量的时间对其进行串行化。

3.2 堆内内存与堆外内存的联系

虽然堆外内存本身不受垃圾回收算法的管辖,但是因为其是由ByteBuffer所创造出来的,因此这个buffer自身作为一个实例化的对象,其自身的信息(例如堆外内存在主存中的起始地址等信息)必须存储在堆内内存中,具体情况如下图所示。

当在堆内内存中存放的buffer对象实例被垃圾回收算法回收掉的时候,这个buffer对应的堆外内存区域同时也就被释放掉了。

0 简介

首先,Flink 使用自主的内存管理:

JVM 内存管理的不足

1)Java 对象存储密度低。Java 的对象在内存中存储包含 3 个主要部分:对象头、实例数据、对齐填充部分。例如,一个只包含 boolean 属性的对象占 16byte:对象头占 8byte,boolean 属性占 1byte,为了对齐达到 8 的倍数额外占 7byte。而实际上只需要一个 bit(1/8字节)就够了。

2)Full GC 会极大地影响性能。尤其是为了处理更大数据而开了很大内存空间的 JVM来说,GC 会达到秒级甚至分钟级。

3)OOM 问题影响稳定性。OutOfMemoryError 是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误,导致 JVM 崩溃,分布式框架的健壮性和性能都会受到影响。

4)缓存未命中问题。CPU 进行计算的时候,是从 CPU 缓存中获取数据。现代体系的CPU 会有多级缓存,而加载的时候是以 Cache Line 为单位加载。如果能够将对象连续存储,这样就会大大降低 Cache Miss。使得 CPU 集中处理业务,而不是空转。(Java 对象在堆上存储的时候并不是连续的,所以从内存中读取 Java 对象时,缓存的邻近的内存区域的数据往往不是 CPU 下一步计算所需要的,这就是缓存未命中。此时 CPU 需要空转等待从内存中重新读取数据。)

Flink 并不是将大量对象存在堆内存上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫做 MemorySegment,它代表了一段固定长度的内存(默认大小为 32KB),也是 Flink 中最小的内存分配单元,并且提供了非常高效的读写方法,很多运算可以直接操作二进制数据,不需要反序列化即可执行。每条记录都会以序列化的形式存储在一个或多个MemorySegment 中。

1 内存模型

1.1 JobManager 内存模型

JobManagerFlinkMemory.java

在 1.10 中,Flink 统一了 TM 端的内存管理和配置,相应的在 1.11 中,Flink 进一步对 JM 端的内存配置进行了修改,使它的选项和配置方式与 TM 端的配置方式保持一致。

1.2 TaskManager 内存模型

Flink 1.10 对 TaskManager 的内存模型和 Flink 应用程序的配置选项进行了重大更改,让用户能够更加严格地控制其内存开销。

TaskExecutorFlinkMemory.java

JVM Heap:JVM 堆上内存

  1. Framework Heap Memory:Flink 框架本身使用的内存,即 TaskManager 本身所占用的堆上内存,不计入 Slot 的资源中。配置参数:taskmanager.memory.framework.heap.size=128MB,默认 128MB。
  2. Task Heap Memory:Task 执行用户代码时所使用的堆上内存。配置参数:taskmanager.memory.task.heap.size

Off-Heap Mempry:JVM 堆外内存

  1. DirectMemory:JVM 直接内存

1)Framework Off-Heap Memory:Flink框架本身所使用的内存,即TaskManager本身所占用的对外内存,不计入 Slot 资源。配置参数:taskmanager.memory.framework.off-heap.size=128MB,默认128MB。

2)Task Off-Heap Memory:Task 执行用户代码所使用的对外内存。配置参数:taskmanager.memory.task.off-heap.size=0,默认 0

3)Network Memory:网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区

配置参数:

代码语言:javascript复制
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
  1. Managed Memory:Flink 管理的堆外内存,用于排序、哈希表、缓存中间结果及RocksDB State Backend 的本地内存。

配置参数:

代码语言:javascript复制
taskmanager.memory.managed.fraction=0.4
taskmanager.memory.managed.size
  1. JVM specific memory:JVM 本身使用的内存
  2. JVM metaspace:JVM 元空间
  3. JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。

配置参数:

代码语言:javascript复制
taskmanager.memory.jvm-overhead.min=192mb
taskmanager.memory.jvm-overhead.max=1gb
taskmanager.memory.jvm-overhead.fraction=0.1

  1. 总体内存
  2. 总进程内存:Flink Java 应用程序(包括用户代码)和 JVM 运行整个进程所消耗的总内存。

总进程内存 = Flink 使用内存 JVM 元空间 JVM 执行开销

配置项:taskmanager.memory.process.size: 1728m

  1. Flink 总内存:仅 Flink Java 应用程序消耗的内存,包括用户代码,但不包括 JVM为其运行而分配的内存。

Flink 使用内存:框架堆内外 task 堆内外 network manage。配置项:taskmanager.memory.flink.size: 1280m

说明:配置项详细信息查看如下链接:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration

1.3 内存分配
  1. JobManager 内存分配

YarnClusterDescriptor.java

JobManagerProcessUtils.java

ProcessMemoryUtils.java

JobManagerFlinkMemoryUtils.java

  1. TaskManager 内存分配

ActiveResourceManager.java

TaskExecutorProcessUtils.java

2 内存数据结构

内存段

内存段在 Flink 内部叫 MemorySegment,是 Flink 中最小的内存分配单元,默认大小 32KB。它即可以是堆上内存(Java 的 byte 数组),也可以是堆外内存(基于 Netty 的 DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。

HeapMemorySegment:用来分配堆上内存

HybridMemorySegment:用来分配堆外内存和堆上内存,2017 年以后的版本实际上只使用了 HybridMemorySegment。

如下图展示一个内嵌型的 Tuple3< Integer,Double,Person> 对象的序列化过程:

可以看出这种序列化方式存储密度是相当紧凑的。其中 int 占 4 字节,double 占 8 字节,POJO 多个一个字节的 header,PojoSerializer 只负责将 header 序列化进去,并委托每个字段对应的 serializer 对字段进行序列化。

内存页

内存页是 MemorySegment 之上的数据访问视图,数据读取抽象为 DataInputView,数据写入抽象为 DataOutputView。使用时就无需关心 MemorySegment 的细节,会自 动处理跨 MemorySegment 的读取和写入。

Buffer

Task 算子之间在网络层面上传输数据,使用的是 Buffer,申请和释放由 Flink自行管理,实现类为 NetworkBuffer。1 个 NetworkBuffer 包装了 1 个MemorySegment。同时继承了 AbstractReferenceCountedByteBuf,是 Netty 中的抽象类。

Buffer 资源池

BufferPool 用来管理 Buffer,包含 Buffer 的申请、释放、销毁、可用 Buffer 通知等,实现类是 LocalBufferPool,每个 Task 拥有自己的 LocalBufferPool。

BufferPoolFactory 用 来 提 供 BufferPool 的 创 建 和 销 毁 , 唯 一 的 实 现 类 是NetworkBufferPool , 每 个 TaskManager 只 有 一 个 NetworkBufferPool 。同一个TaskManager 上的 Task 共享 NetworkBufferPool,在 TaskManager 启动的时候创建并分配内存。

3 内存管理器

MemoryManager 用来管理 Flink 中用于排序、Hash 表、中间结果的缓存或使用堆外内存的状态后端(RocksDB)的内存。1.10 之前版本,负责 TaskManager 所有内存。1.10 版本开始,管理范围是 Slot 级别。

堆外内存资源申请:

MemoryManager.java

MemorySegmentFactory.java

RocksDB 自己负责内存申请和释放

RocksDBOperationUtils.java

MemoryManager.java

4 网络传输中的内存管理

网络上传输的数据会写到 Task 的 InputGate(IG)中,经过 Task 的处理后,再由 Task写到 ResultPartition(RS) 中。每个 Task 都包括了输入和输入,输入和输出的数据存在Buffer 中(都是字节数据)。Buffer 是 MemorySegment 的包装类。

1)TaskManager(TM)在启动时,会先初始化 NetworkEnvironment 对象,TM 中所有与网络相关的东西都由该类来管理(如 Netty 连接),其中就包括 NetworkBufferPool。根据配置, Flink 会 在 NetworkBufferPool 中 生 成 一 定 数 量 ( 默 认 2048 ) 的 内 存 块MemorySegment(关于 Flink 的内存管理,后续文章会详细谈到),内存块的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之间共享的,每个 TM 只会实例化一个。

2)Task 线程启动时,会向 NetworkEnvironment 注册,NetworkEnvironment 会为 Task的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 LocalBufferPool(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中InputChannel 数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 ResultSubpartition数量一致。不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。注意,这个过程只是指定了缓冲池所能使用的内存块数量,并没有真正分配内存块,只有当需要时才分配。为什么要动态地为缓冲池扩容呢?因为内存越多,意味着系统可以更轻松地应对瞬时压力(如 GC),不会频繁地进入反压状态,所以我们要利用起那部分闲置的内存块。

3)在 Task 线程执行过程中,当 Netty 接收端收到数据时,为了将 Netty 中的数据拷贝到 Task 中,InputChannel(实际是 RemoteInputChannel)会向其对应的缓冲池申请内存块(上图中的①)。如果缓冲池中也没有可用的内存块且已申请的数量还没到池子上限,则会向 NetworkBufferPool 申请内存块(上图中的②)并交给 InputChannel 填上数据(上图中的③和④)。如果缓冲池已申请的数量达到上限了呢?或者 NetworkBufferPool 也没有可用内存块了呢?这时候,Task 的 Netty Channel 会暂停读取,上游的发送端会立即响应停止发送,拓扑会进入反压状态。当 Task 线程写数据到 ResultPartition 时,也会向缓冲池请求内存块,如果没有可用内存块时,会阻塞在请求内存块的地方,达到暂停写入的目的。

4)当一个内存块被消费完成之后(在输入端是指内存块中的字节被反序列化成对象了,在输出端是指内存块中的字节写入到 Netty Channel 了),会调用 Buffer.recycle() 方法,会将内存块还给 LocalBufferPool (上图中的⑤)。如果 LocalBufferPool 中当前申请的数量超过了池子容量(由于上文提到的动态容量,由于新注册的 Task 导致该池子容量变小),则LocalBufferPool 会将该内存块回收给 NetworkBufferPool(上图中的⑥)。如果没超过池子容量,则会继续留在池子中,减少反复申请的开销。

反压的过程

1)记录“A”进入了 Flink 并且被 Task 1 处理。(这里省略了 Netty 接收、反序列化等过程)

2)记录被序列化到 buffer 中。

3)该 buffer 被发送到 Task 2,然后 Task 2 从这个 buffer 中读出记录。

记录能被 Flink 处理的前提是:必须有空闲可用的 Buffer。

结合上面两张图看:Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池 1),Task 2 在输入端也有一个相关联的 LocalBufferPool(称缓冲池 2)。如果缓冲池 1 中有空闲可用的 buffer 来序列化记录 “A”,我们就序列化并发送该 buffer。

注意两个场景:

1)本地传输:如果 Task 1 和 Task 2 运行在同一个 worker 节点(TaskManager),该buffer 可以直接交给下一个 Task。一旦 Task 2 消费了该 buffer,则该 buffer 会被缓冲池 1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就会赶不上 Task 1 取 buffer的速度,导致缓冲池 1 无可用的 buffer,Task 1 等待在可用的 buffer 上。最终形成 Task 1的降速。

2)远程传输:如果 Task 1 和 Task 2 运行在不同的 worker 节点上,那么 buffer 会在发送到网络(TCP Channel)后被回收。在接收端,会从 LocalBufferPool 中申请 buffer,然后拷贝网络中的数据到 buffer 中。如果没有可用的 buffer,会停止从 TCP 连接中读取数据。在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据(后面会说)。如果网络中的数据(Netty 输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收端停止消费网络中的数据(由于接收端缓冲池没有可用 buffer),网络中的缓冲数据就会堆积,那么发送端也会暂停发送。另外,这会使得发送端的缓冲池得不到回收,writer 阻塞在向 LocalBufferPool 请求buffer,阻塞了 writer 往 ResultSubPartition 写数据。

这种固定大小缓冲池就像阻塞队列一样,保证了 Flink 有一套健壮的反压机制,使得Task 生产数据的速度不会快于消费的速度。我们上面描述的这个方案可以从两个 Task 之间的数据传输自然地扩展到更复杂的 pipeline 中,保证反压机制可以扩散到整个 pipeline。

0 人点赞