Spillable StateBackend 之 HeapStatusMonitor 解析

2021-09-29 20:46:50 浏览数 (1)

背景介绍

Flink 社区的 Spillable Backend 特性,社区经过了大半年的开发,目前已经放出了预览版可供体验。

简而言之,这个 StateBackend 它可以实现将 Flink 作业运行时的状态主要还是存留在内存里(如果不超过内存本身的限制的话),此时行为和 Flink 目前的 HeapStateBackend 类似,可以获取最佳的性能;系统会定期自动检测当前的指标值(内存负载、GC 时间等),发现内存已经大概率承受不住之时,启动 spill 过程,将不常访问的大对象从内存中移到磁盘中。而当负载下降时,可以把这些状态 load 回内存。

总体来看,它的实现思路类似于 swap(虚拟内存),但是针对 Flink 本身的特点进行了特别设计。本专栏之前有一篇文章《Spill-able Heap Keyed State Backend 设计概览》介绍过这个特性,这次我们根据最新的代码,对它的实现进行更细致的分析和评价。

HeapStatusMonitor 的配置项

正如前文所述,这个模块主要承担着运行时监测堆内存状态的职责,它定期根据策略检测系统资源情况,然后将当前资源用量写入自己的变量中,等待别人读取,因而它是生产者。

它的检测周期由配置参数state.backend.spillable.heap-status.check-interval来决定,默认是 1 分钟。如果从经验上看,作业内存负载变化较为剧烈,推荐将这个值设置短一些,避免突发的用量增加而导致整个进程 OOM;反之,如果内存负载变化缓慢而均匀,则可以增大设置,避免频繁检测的开销。

还有一个参数 state.backend.spillable.gc-time.threshold,决定了 GC 时间超过多长时,发送 spill 信号(状态移到磁盘),默认值是 2s,即发现 GC 时间超过 2s 时,触发 spill。

另外,还有一个参数 state.backend.spillable.resource-check.interval 比较令人迷惑,它实际上指的是状态表(StateTable)在存取的数据时候(put、get 操作),从 HeapStatusMonitor 读取资源检测结果的最小周期,因此这是消费者的消费周期,与 HeapStatusMonitor 自身无关。由于本文重点介绍的是决策的生产者 HeapStatusMonitor,这些参数暂时略过。

构造方法解析

HeapStatusMonitor 类的构造方法里有十几个参数,分别作用如下:

- checkIntervalInMs:堆内存检测周期,即我们之前介绍的 state.backend.spillable.heap-status.check-interval 参数。

- memoryMxBean:JVM 自带的 MemoryMXBean 对象,可以从中获取当前的堆内存用量、堆内存最大值等。

- maxMemory:从上述 Bean 中获取的堆内存最大值,用来作为一个常量。

- garbageCollectorMXBeans:JVM 自带的描述 GC 统计信息的 MXBean 对象列表(不止一个)。

- resultIdGenerator:为每次检测的结果生成一个自增的 ID。因为可能并发访问,这里需要使用 AtomicLong 对象。

- monitorResult:描述单次的检测结果。MonitorResult 对象包含了当前时间戳、ID、堆内存用量、GC 时间等关键信息。当然,目前这些指标还是太少,生产环境还是需要更多决策项。

- lastGcTime:上次 GC 的时间值

- lastGcCount:上次 GC 的统计数

- shutdownHook:Flink 里常见的用法,注册一个 JVM 的 shutdown hook,这样在进程关闭时,可以打印相关的日志,并设置 isShutdown 环境变量。

- checkExecutor:创建一个周期执行器,并设置相关的取消策略。

- checkFuture:向上述周期执行器提交一个周期为 checkIntervalInMs,会定时运行本类的 runCheck()方法,来检查堆内存的情况,并生成对应的 monitorResult。

runCheck 方法

这个方法是周期检查的核心,也非常简单,我们来看一下:

代码语言:javascript复制
private void runCheck() {
    long timestamp = System.currentTimeMillis();
    long id = resultIdGenerator.getAndIncrement();
    this.monitorResult = new MonitorResult(timestamp, id, memoryMXBean.getHeapMemoryUsage(), getGarbageCollectionTime());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Check memory status, {}", monitorResult.toString());
    }
}

可以看到,这个方法就是定期将当前的堆内存用量,以及最近一次 GC 的平均时间保存在本实例的 monitorResult 对象中,以备决策者读取。堆内存用量的获取非常直白,即直接获取 memoryMXBean 的 getHeapMemoryUsage() 方法即可,但是 GC 时间的获取还是要费一番功夫。

getGarbageCollectionTime 方法

这个方法用来获取最近 GC 的平均时间,代码如下:

代码语言:javascript复制
private long getGarbageCollectionTime() {
    long count = 0;
    long timeMillis = 0;
    for (GarbageCollectorMXBean gcBean : garbageCollectorMXBeans) {
        long c = gcBean.getCollectionCount();
        long t = gcBean.getCollectionTime();
        count  = c;
        timeMillis  = t;
    }

    if (count == lastGcCount) {
        return 0;
    }

    long gcCountIncrement = count - lastGcCount;
    long averageGcTime = (timeMillis - lastGcTime) / gcCountIncrement;

    lastGcCount = count;
    lastGcTime = timeMillis;

    return averageGcTime;
}

从代码里可以看到,这里是通过一个 for 循环,遍历 garbageCollectorMXBeans 列表里的所有 GC 的 MXBean,然后逐个读取当前的 GC 次数和时间,加到变量里。然后将得到的总次数和总时间,分别减去上次记录的值(lastGcCount 和 lastGcTime),然后进行相除,就可以得到本次检测时的 GC 平均时间了。

总结

在 Spillable StateBackend 的设计中,HeapStatusMonitor 属于非常简单但是也非常核心的功能类,相当于整个系统决策的信号标。

但时,我们也注意到,目前它的作用只是定时的检测堆内存用量、平均 GC 时间,并没有涉及到更多的指标,例如方差、增量比率、复合指标等等。同时,由于 GC 算法的不同,这里得到的平均 GC 时间很可能没有意义,例如最新的 ZGC 宣称可以将停顿时间降低到 2ms 以下。此时,单纯检测 GC 时间,并不能很好地得出系统繁忙与否的结论。

因此,目前这个预览版仍然不适合作为线上生产环境使用。我们近期会持续追踪,并补充生产环境的一些实践经验和改进项,这些都会在接下来的系列文章中得到阐述。

0 人点赞