一、内存模型
从大的方面来说,TaskManager进程的内存模型分为JVM本身所使用的内存和Flink使用的内存,Flink使用了堆上内存和堆外内存。
1.Flink使用的内存
1)JVM堆上内存
a. 框架堆上内存Framework Heap Memory。Flink框架本身所使用的的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源
配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。
b. Task堆上内存Task Heap Memory。Task 执行用户代码时所使用的堆上内存。
配置参数:taskmanager.memory.taks.heap.size。
2)JVM堆外内存
a. 框架堆外内存Framework Off-Heap Memory。Flink 框架本身所使用的的内存。即TaskManager本身所占用的堆外内存,不计入Slot资源。
配置参数:taskmanager.memroy.framework.off-heap.size = 128MB,默认128MB。
b. Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。
配置参数:taskmanager.memory.task.off-heap.size = 0,默认为0。
c. 网络缓冲内存Network Memory。网络数据交换所使用的堆外加内存大小,如网络数据交换缓冲区(Network Buffer)。
配置参数:taskmanager.memory.network.[64/1024/0.1](min/max/fraction),默认 min = 64MB,max = 1gb,fraction = 0.1。
d. 堆外托管内存 Manged Memory。Flink管理的堆外内存。
配置参数:taskmanager.memory.managed.[size][fraction],默认fraction = 0.4。
2. JVM本身使用的内存
JVM本身直接使用了操作系统的内存。
1. JVM元空间
JVM 元空间所使用的的内存。
配置参数:taskmanager.memory.jvm-metaspace = 96m, 默认96MB。
2. JVM执行开销
JVM 在执行时自身所需要的的内容,包括线程堆栈、IO、编译缓存等所使用的的内存。
配置参数:taskmanager.memory.jvm-overhead = [min/max/faction]。默认min=192MB,max=1GB,fraction = 0.1。
3. 总体内存
1.Flink使用内存
综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size进行控制。
2. 进程使用内存
整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。
JVM内存控制参数如下:
1)JVM堆上内存,使用-Xmx 和 -Xms 参数进行控制
2)JVM直接内存,使用参数-XX:MaxDirectMemorySize 进行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受改参数控制。
3)JVM Metaspace 使用-XX:MaxMetaspaceSize进行控制。
二、内存数据结构
2.1 内存段
内存段在Flink内部叫作MemorySegment,是Flink的内存抽象的最小分配单元。默认情况下,一个MemorySegment 对应一个32kb大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirecByteBuffer)。
MemorySegment同时也提供了对二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment 内置了方法,可以直接返回或者写入数据,对于其他类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数组byte[]后写入。
1.MemorySegment结构
1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。
2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是,就是Big Endian模式。
3)HeapMemory:如果MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),如果MemorySegment使用对外内存,则为null。
4)address:字节数组对应的相对地址(若HeapMemory为null,即可能为对外内存的绝对地址)。
5)addressLimit:标识地址结束位置(address size)。
6)size:内存段的字节数。
2.字节顺序Big Endian与Little Endian
字节顺序是指占内存多于一个字节类型的数据在内存中的存放顺序,不同的CPU架构体系使用不同的存储顺序。PowerPC系列采用Big Endian方式存储数据,低地址存放最高有效字节(MSB 高位编址),而x86 系列则采用Little Endian方式存储数据,低地址存放最低有效字节(LSB 低位编址)
3. MemorySegment实现
Flink的MemorySegment有堆上和堆外两种实现:
HeapMemorySegment 用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用 HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。
2.2 内存页
内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。有了这一层,上层使用者无须关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。
1.DataInputView
DataInputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataInput.InputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。
2.DataOutputView
DataOutputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataOutput.OutputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。
2.3 Buffer
Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。Buffer的接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManger之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了一个MemorySegment。
Buffer 的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了引用数的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。
AbstractReferenceCountedByteBuf是Netty的抽象类,通过继承该类,Flink中的buffer 具备了引用计数的能力,并且实现了对MemorySegment的读写。
2.4 Buffer资源池
Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer的通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。
BufferPool的类体系如下:
为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供了BufferPool的创建和销毁,其唯一的实现类是NetworkBufferPool。
每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferpool,为其分配内存。
NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,所以其除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。
三、内存管理器
3.1 内存申请
批处理计算任务中,MemoryManager负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer。
#MemorySegmentFactory.java
流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache 和WriteBufferManager参数来限制,参数的具体值从TaskManger的内存配置参数中计算来。RocksDb自己来负责运行过程中的内存申请和内存释放。
#MemoryManager.java
3.2 内存释放
Flink自行管理内存,也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种。
a.内存使用完毕
b.Task停止(正常或异常)执行。
在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放,提供了两个JavaCleaner。
1)LegacyCleanerProvider
该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sun.misc.Cleaner来释放内存。
2)Java9CleanerProvider
该CleanerProvider提供1.9及以上版本JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来释放内存。
JavaGcCleanerWrapper会为每一个Owner创建一个包含Cleaner的Runnable对象,在每个MemorySegment释放内存的时候,调用此Cleaner进行内存的释放。
四、网络缓冲器
网络缓冲器(NetworkBuffer)是网络交换数据的包装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,需要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的释放。
BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer,BufferConsumer位于下游Task中,负责从MemorySegment中读取数据,1个BufferBuilder对应一个BufferConsumer。
4.1 内存申请
LocalBufferPool的大小是动态的,在最小内存段数量与最大内存段数量之间浮动。使用NetworkBufferPool创建LocalBufferPool时,如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数据总和,则会发生错误。
1.buffer申请
结果分区(ResultPartition)申请Buffer进行数据写入。
LocalBufferPool 首先从自身持有的MemorySegment中分配可用的,如果没有可用的,则从TaskManager的NetworkBufferPool申请,如果没有,则阻塞等待可用的MemorySegment。
2.MemorySegment申请
申请Buffer本质上来说就是申请MemorySegment,如果在LocalBufferPool中,则申请新的堆外内存MemorySegment。
4.2 内存回收
Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数 1,每个Buffer被消费完,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。
1.Buffer回收
Buffer回收之后,并不会释放MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager级别内存不足,才会释放回TaskManager持有的全局资源池。
释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中 ,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。
2.MemorySegment释放
当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。