Spill-able Heap Keyed State Backend 设计概览

2021-09-29 20:48:26 浏览数 (1)

背景介绍

Flink 在流式数据处理方面的能力非常强大,尤其值得一提的是它对带状态的流计算作业的支持度。它支持 Operator 和 Keyed 两类状态存储结构,其中后者因为用量大、用法多样,Flink 在这方面做了很多的支持:提供了纯粹基于堆内存的 HeapKeyedStateBackend,适合状态小,对延时要求高的作业;以及磁盘存储为主,内存为辅的 RocksDBKeyedStateBackend,适合状态巨大,对时延相对不敏感的作业。

随着业务量和业务数据的增长,我们更多地遇到了多样的混合场景:在通常情况下,状态数目在内存中足以得到存放。但是,总会有偶尔的流量尖峰(例如秒杀活动、作业突然崩溃后“倒带”恢复等),造成状态数和大小的短期激增,而这些往往难以提前预知。此时,很容易由于参数的设置不当而造成进程的频繁 GC 甚至发生 OOM,或者超出 YARN 等容器的限制而被 KILL,给业务的稳定运行带来了挑战。

在去年的 Flink Forward China 2018 会议上,我们腾讯云团队提出了一种混合的方案 [参考阅读 1]。我们将 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 的优点结合起来,实现了以重自适应的状态管理器,可以根据实时负载,自动地在运行时切换当前的 KeyedStateBackend。在我们的业务场景中,通常整个过程较为平滑,不会对作业的正常运行造成明显干扰。

对于 Flink 社区而言,实现一个 Spillable(可以把堆内放不下的状态,溢出到堆外或者磁盘)特性的 KeyedStateBackend 也被提上了日程,即 FLIP-50: Spill-able Heap Keyed State Backend. 这个特性也是我们期待已久的,因为它不仅仅是运行时的状态切换那么简单,而且还提供了很多的高级功能愿景,例如内置堆内存状态的实时监控、高性能的空间分配模块等等,这些将在下文得到介绍。

整体架构

FLIP-50 是一个完善提议(Improvement Proposal),它对应着名为 FLINK-12692 的社区 JIRA 单。在提议里,这个功能特性包含如下模块:

FLIP-50 整体架构(图源自官方 FLIP 文档)FLIP-50 整体架构(图源自官方 FLIP 文档)

下面根据 FLIP 动议,对图中各个模块的功能做概述。由于具体实现还未完全定型,这里大致采用了文中的描述,但也有我们的理解 :)

KeyGroupSizeAccountingManager

这个模块主要负责记录堆内和堆外(及磁盘上)每个 Key Group 的大小(Flink 的状态保存在 Key Group 里,各个 Key 以 Group 进行分区,详情见官网文档 [参考阅读 2]),并分别采取不同的记录方式。

对于堆内的 Key Group 空间占用统计,主要是通过类似 Lucene 的 RamUsageEstimator 的方式来估计 POJO 大小,不要求太精确。对于堆外的 Key Group 占用,则是通过统计序列化后的 Key 和 Value 的所占空间计算,要求精确。

这个模块提供的信息,直接对整个模块起到指引的作用,由于还未有代码 Pull Request,还未能确定具体的实现方式,我们会持续关注。

HeapStatusMonitor

本模块的功能主要是监控堆内内存的状态,同时在适当的时候触发 Spill(从堆内溢出到堆外)或 Load(从堆外加载回堆内)操作。主要关注点是堆内存用量(MXBean 得到的上报值等),同时关注不同的 GC 选型时的 JVM 行为差异(例如 CMS 和 G1 在内存压力时的指标表现是不同的,需要分别处理,例如用量、停顿时间等等)。

对于更新版本的 JDK,例如 JDK 12,还提供了 Shenandoah GC 等更优化的实现,这些目前该 FLIP 没有提及,但是我们也会尽快对此进行验证,并设计相应的判断逻辑。

Spill/LoadManager

这个模块是具体的执行者,它根据上述 HeapStatusMonitor 的请求,选择合适的 Key Group 进行 Spill 和 Load 操作。在对 Key Group 做选择时,主要根据大小和使用率来判断,这也是很常见的思路。对于越大、越冷门的对象,越有可能被踢出堆内存;反之亦然。

MMapManager

这个组件的存在,是因为在功能的设计里,使用 mmap 特性来提升读写磁盘的性能,所以需要一个模块来负责创建和维护文件映射、清理、异常记录和处理等等事务。

SpaceAllocator

这个组件最简单,也是目前设计最清晰的一个。它负责分配堆外/磁盘上的空间,以保存各种溢出的 Key Group. 在目前的设计里,存储空间被分为 Chunk、Bucket、Block 三级,关系见下图(来源于 PowerTwoBucketAllocator 文档 [参考阅读 3]):

Chunk、Bucket、Block 的关系示意图(来源见参考阅读 3 的设计文档)Chunk、Bucket、Block 的关系示意图(来源见参考阅读 3 的设计文档)

Chunk 表示一整块大的空间,它会被分为若干大小相同的 Bucket。在每个 Bucket 里,又可以分为大小相同的若干 Block. 需要注意的是,每个 Bucket 所采用的 Block 大小并不一定相同,所以可能出现某个 Bucket 的 Block 大,其他 Bucket 的 Block 小的情况,以充分根据实际情况来适配存储空间,减少内外零头的出现。

根据设计文档和当前的代码 Merge Request,目前有 PowerTwoBucketAllocator 和 DirectBucketAllocator 两种类型的 Bucket Allocator 来从 Chunk 中切分出各种 Bucket.

PowerTwoBucketAllocator 适用于小空间的分配,它的工作原理类似于 Linux 的 Buddy 内存分配算法,分配的 Block 大小必须是 2 的正整数次幂。它采用一个栈来维护当前可用的 Block,并通过位运算和对数运算来计算得到各个 Block 的内存地址。具体的算法参见设计文档及实现代码(目前还没有 Merge 到官方分支,仅供参考),这里不再重复描述。

DirectBucketAllocator 适用于大块空间的分配,以避免 PowerTwoBucketAllocator 带来的碎片问题。它采用经典的首次适应法(First Fit),通过 SortedMap 有序结构来记录的空余空间、已用空间列表,在分配时按顺序查找到首个可用空间;如果没有的话,压缩整理现有空间,以腾出新的块来使用。

实现进展

上述的设计图可能比较抽象和理论化。目前的实现主要由阿里巴巴的 Yu Li 同学提供,并已经提交到 GitHub 官方仓库供社区审议,作为 FLINK-12692 的子任务,FLINK-12693 的实现。

目前这个大的特性,分为如下的子任务:

  1. Store state per key-group in CopyOnWriteStateTable(目前已实现并合入 master,主要是对 flink-runtime 模块的 CopyOnWriteStateTable 类的完善)
  2. Implement a HeapAccountingManager(目前还没有看到讨论)
  3. Implement a HeapStatusMonitor(目前还没有看到讨论,但标记为进展中)
  4. Implement a MmapManager(目前还没有看到讨论)
  5. Support on-disk state storage for spill-able heap backend(目前已经有较为完整的实现方案,但还未通过社区评议。值得分析学习)
  6. Implement a SpillLoadManager(目前还没有看到讨论)
  7. Reduce CPU consumption when snapshot/restore the spilled key-group(目前还没有看到讨论)
  8. Create a module for spill-able heap backend(这个只是创建一个项目占位符,目前已经实现)
  9. Introduce a HybridStateTable to combine everything together(目前还没有看到讨论)
  10. Introduce SpillableHeapKeyedStateBackend and all necessities(目前还没有看到讨论)
  11. Implement the SpaceAllocator (已经有了实现,例如上文提到的 PowerTwoBucketAllocator,还在社区审议中)

按照原计划,FLIP-50 本打算在 Flink 1.9 版本发布,但是应该是因为最近几个版本的改动太多了,还有很大一部分没有得到实现。对于这个特性,我们也非常兴奋和期待,目前也已经在进行初步的验证和设计探讨,并在内部对比各种方案。当时机成熟时,也会回馈给社区,一起给 Flink 增砖添瓦,贡献力量。

参考阅读

[1] 基于Apache Flink的平台化构建及运维优化经验

[2] Flink 官方文档 Working With State

[3] PowerTwoBucketAllocator 设计文档

0 人点赞