关于大数据Flink内存管理的原理与实现

2022-03-11 14:27:25 浏览数 (2)

背景介绍

最近几年国内大数据apache开源社区计算框架最火的莫过于Flink,得益于阿里在后面的推动以及各大互联网大厂的参与,flink业已成为流式计算事实上的标准。一句话来介绍 Flink 就是 “Stateful Computations Over Streams”,基于数据流的有状态计算。flink的四个基石:Checkpoint、State、Time、Window。

  • Checkpoint 机制,Flink 基于 Chandy-Lamport 算法实现了分布式一致性的快照,从而提供了 exactly-once 的语义。(Flink 基于两阶段提交协议,实现了端到端的 exactly-once 语义保证。内置支持了 Kafka 的端到端保证,并提供了 TwoPhaseCommitSinkFunction 供用于实现自定义外部存储的端到端 exactly-once 保证。)
  • state有状态计算:支持大状态、灵活的状态后端
  • Flink 还实现了 watermark 的机制,解决了基于事件时间处理时的数据乱序和数据迟到的问题。
  • Window:提供了一套开箱即用的窗口操作,如滚动窗口、滑动窗口、会话窗口,支持非常灵活的自定义窗口满足特殊业务需求。
  • 带反压的流模型

Flink是采用java开发的,flink计算集群运行在java虚拟机中,因为flink计算会面临大量数据处理、大量状态存储,完全基于jvm的堆内存管理存在较大的缺陷,flink基于jvm实现了独立的内存管理:可超出主内存的大小限制、承受更少的垃圾回收开销、对象序列化二进制存储,下面在来详细介绍下flink内存管理。

完全JVM内存管理存在的问题

基于JVM的数据分析引擎都需要面对将大量数据存到内存当中,就不得不面对JVM存在的几个问题:

  • java对象存储密度低:比如一个只包含boolean属性的对象占用16个字节,对象头占用8个,boolean属性占1个,对齐填充占了7个,实际上只需要一个bit(1/8字节)就够了他。
  • Full GC会极大的影响性能,尤其是为了处理更大数据而开了很大内存空间的jvm来说,GC会达到秒级甚至分钟级。
  • OOM问题影响稳定性:jvm奔溃,分布式对象框架的健壮性和稳定性都会收到影响。因此大数据框架都开始自己管理JVM内存了,像Spark、Flink、Hbase,为了获取C一样的性能以及避免OOM的发生。

Flink内存管理

因为Java对象及jvm内存管理存在的问题,flink针对这些问题基于jvm进行了优化, Flink内存管理主要会涉及内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存、JIT编译优化。Flink并不是将大量对象存在堆上,而是将对象序列化到一个预分配的内存块上,这个内存块叫MemorySegment,它代表了一段固定长度的内存(默认32KB)也就是flink中最小的内存分配单元,并且提供了非常高效的读写方法。底层可以是一个普通的java字节数组(byte[]),也可以是一个申请在堆外的ByteBuffer。每条记录都会以序列化的形式存在一个或多个MemorySegment中。

TaskManager内存模型如下图所示:

Flink 主要的内存管理是TaskManager进行内存管理,主要分为三部分:

  • Network Buffers:一定数量的32KB大小的Buffer,主要用于网络传输。在TaskManager启动的时候就会分配。默认数量是2048个,可以通过taskmanager.network.numberOfBuffers来配置
  • Memory Manager Pool:这是一个由MemoryManager 管理的,由众多MemorySegment组成的超大集合。Flink中的算法(如sort/shuffle/join)会向这个内存池申请MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占用了堆内存的70%的大小。
  • Remaning(free)Heap:这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从GC的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。

Flink 采用类似 DBMS 的 sort 和 join 算法,直接操作二进制数据,从而使序列化/反序列化带来的开销达到最小。所以 Flink 的内部实现更像 C/C 而非 Java。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。如果要操作多块MemorySegment就像操作一块大的连续内存一样,Flink会使用逻辑视图(AbstractPagedInputView)来方便操作。下

Flink 内存管理带来的好处

  • 减少GC压力,因为所有常驻内存的数据以二进制的形式存在于Flink的MemoryManager中,这些MemorySegment一直待在老年代不会被GC回收。其它的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象可以被MinorGC快速回收。只要用户不去创建大量类似缓存的常驻对象,老年代的大小是不会变的,Major GC也就永远也不能发生。从而有效地降低了垃圾回收的压力。另外,这里的内存还可以是堆外内存,这可以使得jvm内存更小了,从而加速垃圾回收。
  • 避免了OOM,所有运行的数据结构和算法只能通过内存池申请内存,保证了其使用内存的大小是固定不变的,不会因为运行时数据结构和算法而发生OOM,在内存吃紧的情况下,算法(sort/join等)会高效地将一大批内存块写入到磁盘,之后再读回来,因此,OutOfMemoryErrors可以有效的避免。
  • 节省内存空间。java对象再存储上有很多额外的消耗。如果只存储实际的二进制内容,就可以避免这部分消耗。
  • 高效的二进制操作 & 缓存友好的计算。二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好,可以从 L1/L2/L3 缓存获得性能的提升

Flink量身定制的序列化框架

Flink没有采用java生态圈众多的序列化框架,而是自己实现了序列化框架。因为在flink中处理的数据流通常是同一类型,由于数据集对象的类型固定,对于数据集可以只保存一份对象schema信息,节省大量的存储空间。同时对于固定大小的类型,也可以通过固定的偏移位置存取。访问某个对象成员变量,可以可以直接通过偏移量,只是序列化特定的对象成员变量了。如果对象的成员变量较多时,能够大大减少java对象的创建开销以及内存数据拷贝的大小。

Flink 如何直接操作二进制数据

以sort为例:

首先,Flink会从MemoryManager中申请一批MemorySegment,我们把这批MemorySegment称作sort buffer,用来存放排序的数据。其次,把sort buffer分成两块区域,一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(Key pointer)。如果需要序列化的 key 是个变长类型,如 String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有 key)会被加到第二个区域。这样做的目地:第一,交换定长块(key pointer)更高效,不用交换真实的数据也不用移动其它key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中,可以大大减少cache miss

接着,排序操作,flink中可以先用key比大小,这样可以直接用二进制的key比较而不需要反序列化出整个对象。因为Key是定长的,所以如果key相同,那就必须将真实的二进制数据反序列化出来,然后再做比较。之后只需要交换key pointer就可以达到排序的效果,真实的数据不用移动。

最后,访问排序后的数据,可以沿着排好序的key pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或着外部。

缓存友好的数据结构和算法

Flink 通过定制的序列化框架将算法中需要操作的数据(如 sort 中的 key)连续存储,而完整数据存储在其他地方。因为对于完整的数据来说,key pointer 更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。

堆外内存

  1. 启动超大内存(上百 GB)的 JVM 需要很长时间,GC 停留时间也会很长(分钟级)。使用堆外内存的话,可以极大地减小堆内存(只需要分配 Remaining Heap 那一块),使得 TaskManager 扩展到上百 GB 内存不是问题。
  2. 高效的 IO 操作。堆外内存在写磁盘或网络传输时是 zero-copy,而堆内存的话,至少需要 copy 一次。
  3. 堆外内存是进程间共享的。也就是说,即使 JVM 进程崩溃也不会丢失数据。这可以用来做故障恢复(Flink 暂时没有利用起这个,不过未来很可能会去做)。

不好的地方:

  1. 堆内存的使用、监控、调试都要简单很多。堆外内存意味着更复杂更麻烦。
  2. Flink 有时需要分配短生命周期的 MemorySegment,这个申请在堆上会更廉价。
  3. 有些操作在堆内存上会快一点。

总结

Flink面对jvm存在的问题,从自己管理内存、到自己实现序列化框架、再到使用堆外内存,基本上是按照大数据生态通用的解决方式去处理,其解决思路值得我们在进行分布式计算框架设计和实现的时候作参考。

在从apache生态圈的设计上基本上分布式计算框架,大都开始了部分脱离JVM,走上了自己管理内存的路线,比如spark Tungsten甚至更进一步,提出了通过LLVM,将部分逻辑编译成本地代码,从而更加深入的挖掘SIMD等CPU潜力,除此之外 HBase、HDFS 等存储相关项目也在部分性能相关的模块通过自己管理内存来规避JVM的一些缺陷,同时提升性能。

参考文档:

https://zhuanlan.zhihu.com/p/20228397

https://flink.apache.org/news/2015/09/16/off-heap-memory.html

http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/

0 人点赞