阿里二面:Flink内存管理是如何实现的?

2022-05-17 14:59:50 浏览数 (1)

一、内存模型

从大的方面来说,TaskManager进程的内存模型分为JVM本身所使用的内存和Flink使用的内存,Flink使用了堆上内存和堆外内存。

1.Flink使用的内存

1)JVM堆上内存

a. 框架堆上内存Framework Heap Memory。Flink框架本身所使用的的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源

配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。

b. Task堆上内存Task Heap Memory。Task 执行用户代码时所使用的堆上内存。

配置参数:taskmanager.memory.taks.heap.size。

2)JVM堆外内存

a. 框架堆外内存Framework Off-Heap Memory。Flink 框架本身所使用的的内存。即TaskManager本身所占用的堆外内存,不计入Slot资源。

配置参数:taskmanager.memroy.framework.off-heap.size = 128MB,默认128MB。

b. Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。

配置参数:taskmanager.memory.task.off-heap.size = 0,默认为0。

c. 网络缓冲内存Network Memory。网络数据交换所使用的堆外加内存大小,如网络数据交换缓冲区(Network Buffer)。

配置参数:taskmanager.memory.network.[64/1024/0.1](min/max/fraction),默认 min = 64MB,max = 1gb,fraction = 0.1。

d. 堆外托管内存 Manged Memory。Flink管理的堆外内存。

配置参数:taskmanager.memory.managed.[size][fraction],默认fraction = 0.4。

2. JVM本身使用的内存

JVM本身直接使用了操作系统的内存。

1. JVM元空间

JVM 元空间所使用的的内存。

配置参数:taskmanager.memory.jvm-metaspace = 96m, 默认96MB。

2. JVM执行开销

JVM 在执行时自身所需要的的内容,包括线程堆栈、IO、编译缓存等所使用的的内存。

配置参数:taskmanager.memory.jvm-overhead = [min/max/faction]。默认min=192MB,max=1GB,fraction = 0.1。

3. 总体内存

1.Flink使用内存

综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size进行控制。

2. 进程使用内存

整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。

JVM内存控制参数如下:

1)JVM堆上内存,使用-Xmx 和 -Xms 参数进行控制

2)JVM直接内存,使用参数-XX:MaxDirectMemorySize 进行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受改参数控制。

3)JVM Metaspace 使用-XX:MaxMetaspaceSize进行控制。

二、内存数据结构

2.1 内存段

内存段在Flink内部叫作MemorySegment,是Flink的内存抽象的最小分配单元。默认情况下,一个MemorySegment 对应一个32kb大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirecByteBuffer)。

MemorySegment同时也提供了对二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment 内置了方法,可以直接返回或者写入数据,对于其他类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数组byte[]后写入。

1.MemorySegment结构

1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。

2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是,就是Big Endian模式。

3)HeapMemory:如果MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),如果MemorySegment使用对外内存,则为null。

4)address:字节数组对应的相对地址(若HeapMemory为null,即可能为对外内存的绝对地址)。

5)addressLimit:标识地址结束位置(address size)。

6)size:内存段的字节数。

2.字节顺序Big Endian与Little Endian

字节顺序是指占内存多于一个字节类型的数据在内存中的存放顺序,不同的CPU架构体系使用不同的存储顺序。PowerPC系列采用Big Endian方式存储数据,低地址存放最高有效字节(MSB 高位编址),而x86 系列则采用Little Endian方式存储数据,低地址存放最低有效字节(LSB 低位编址)

3. MemorySegment实现

Flink的MemorySegment有堆上和堆外两种实现:

HeapMemorySegment 用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用 HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。

2.2 内存页

内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。有了这一层,上层使用者无须关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。

1.DataInputView

DataInputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataInput.InputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。

2.DataOutputView

DataOutputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataOutput.OutputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。

2.3 Buffer

Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。Buffer的接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManger之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了一个MemorySegment。

Buffer 的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了引用数的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。

AbstractReferenceCountedByteBuf是Netty的抽象类,通过继承该类,Flink中的buffer 具备了引用计数的能力,并且实现了对MemorySegment的读写。

2.4 Buffer资源池

Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer的通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。

BufferPool的类体系如下:

为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供了BufferPool的创建和销毁,其唯一的实现类是NetworkBufferPool。

每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferpool,为其分配内存。

NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,所以其除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。

三、内存管理器

3.1 内存申请

批处理计算任务中,MemoryManager负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer。

#MemorySegmentFactory.java

流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache 和WriteBufferManager参数来限制,参数的具体值从TaskManger的内存配置参数中计算来。RocksDb自己来负责运行过程中的内存申请和内存释放。

#MemoryManager.java

3.2 内存释放

Flink自行管理内存,也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种。

a.内存使用完毕

b.Task停止(正常或异常)执行。

在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放,提供了两个JavaCleaner。

1)LegacyCleanerProvider

该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sun.misc.Cleaner来释放内存。

2)Java9CleanerProvider

该CleanerProvider提供1.9及以上版本JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来释放内存。

JavaGcCleanerWrapper会为每一个Owner创建一个包含Cleaner的Runnable对象,在每个MemorySegment释放内存的时候,调用此Cleaner进行内存的释放。

四、网络缓冲器

网络缓冲器(NetworkBuffer)是网络交换数据的包装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,需要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的释放。

BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer,BufferConsumer位于下游Task中,负责从MemorySegment中读取数据,1个BufferBuilder对应一个BufferConsumer。

4.1 内存申请

LocalBufferPool的大小是动态的,在最小内存段数量与最大内存段数量之间浮动。使用NetworkBufferPool创建LocalBufferPool时,如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数据总和,则会发生错误。

1.buffer申请

结果分区(ResultPartition)申请Buffer进行数据写入。

LocalBufferPool 首先从自身持有的MemorySegment中分配可用的,如果没有可用的,则从TaskManager的NetworkBufferPool申请,如果没有,则阻塞等待可用的MemorySegment。

2.MemorySegment申请

申请Buffer本质上来说就是申请MemorySegment,如果在LocalBufferPool中,则申请新的堆外内存MemorySegment。

4.2 内存回收

Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数 1,每个Buffer被消费完,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。

1.Buffer回收

Buffer回收之后,并不会释放MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager级别内存不足,才会释放回TaskManager持有的全局资源池。

释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中 ,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。

2.MemorySegment释放

当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。

0 人点赞