Spillable StateBackend 之 SpillAndLoadManager 源码注解

2021-09-29 20:46:36 浏览数 (1)

背景概要

在前文中,我们介绍了 Spillable Backend 及其 HeapStatusMonitor 的工作原理和不足。今天我们来看一下 Spillable Backend 的另一个核心组件:SpillAndLoadManager。如果说 HeapStatusMonitor 是测量系统负载的信号灯,那么 SpillAndLoadManager 就是具体的决策者,根据信号来决定当前系统的行为(将状态从内存 Spill 到磁盘,还是从磁盘 Load 到内存)。

SpillAndLoadManager 接口

SpillAndLoadManager 接口的定义非常简单,只有一个 checkResource() 方法,表示检查当前的资源,然后决定下一步的行动,其代码如下:

代码语言:java复制
public interface SpillAndLoadManager {

	/**
	 * Check resource to decide whether to spill or load state.
	 */
	void checkResource();
}

目前的代码里,只有 SpillAndLoadManagerImpl 这一个实现类,因此本文以这个类为基准,通过逐行注解关键方法的方式,介绍它的工作原理,并对当前的实现方式提出一些建议。

SpillAndLoadManagerImpl 实现类初探

这个类目前有接近 500 行代码,处于中低复杂度。在分析它的工作原理之前,我们来首先先看一下他的整体结构:

SpillAndLoadManagerImpl 类的整体结构SpillAndLoadManagerImpl 类的整体结构

其中上图的 Action 是一个枚举类型,有 SPILL、LOAD、NONE 三种取值,分别表示三种动作(NONE 表示”无动作“这种动作),因此很容易猜到,它是本方法的决策信号,决定了相关状态的去向。

ActionResult 则是对 Action 及比例参数(spillOrLoadRatio)的封装类,作为行动的指南:例如传入了 SPILL Action 和 0.2 的 spillOrLoadRatio 作为 ActionResult, 表明需要把 20% 的冷状态写入磁盘。

既然它的父类只定义了 checkResource() 一个方法,那我们先从这个方法入手,逐步分析该类的运行原理(下面的代码片段,删减了一些日志代码)。

代码语言:java复制
public void checkResource() {
    long currentTime = System.currentTimeMillis();  // 获取当前 Unix 时间戳

    // 如果距离上次资源检查的时间小于阈值, 就不做检查. 配置项为 state.backend.spillable.resource-check.interval, 默认值为 10s
    if (currentTime - lastResourceCheckTime < resourceCheckInterval) { return; }

    lastResourceCheckTime = currentTime;    // 重置上次资源检查时间为当前值
    // getMonitorResult 访问的正是之前介绍的 heapStatusMonitor 里保存的资源负载。由于变量定义为了 volatile, 不能缓存, 因而访问开销较大
    HeapStatusMonitor.MonitorResult monitorResult = heapStatusMonitor.getMonitorResult();

    // 如果和上次获取的 monitorResult 相同, 则不做处理, 直接返回
    if (lastMonitorResult != null && lastMonitorResult.getId() == monitorResult.getId()) { return; }
    lastMonitorResult = monitorResult;

    ActionResult checkResult = decideAction(monitorResult);   // 根据 monitorResult 的值, 决定下一步的动作是 SPILL, LOAD 还是 NONE
    if (checkResult.action == Action.NONE) { return; }  // 如果无需动作, 则直接返回

    // 如果走到这里, 说明需要进行 SPILL 或 LOAD 动作. 首先确保动作不能太频繁, 如果小于阈值则直接返回, 不做动作
    // triggerInterval 的配置项为 state.backend.spillable.trigger-interval, 默认值为 1 分钟
    if (monitorResult.getTimestamp() - lastTriggerTime < triggerInterval) { return; }

    if (checkResult.action == Action.SPILL) {
        doSpill(checkResult);   // 调用 doSpill 方法将状态从内存移动到磁盘
    } else {
        doLoad(checkResult);    // 调用 doLoad 方法将状态从磁盘载入回内存
    }

    // spill 和 load 比较耗时, 所有事项做完后再更新 lastTriggerTime 时间
    lastTriggerTime = System.currentTimeMillis();
}

上述代码经过逐行注释后,逻辑非常清晰易懂。我们下面着重以三个核心方法 decideActiondoSpilldoLoad及其附属方法为例,介绍它们的工作流程。

decideAction 方法

这个方法根据 HeapStatusMonitor 返回的资源观测结果,做出实际的决策。

但是从下面的源码注释可以看到,目前决策的依据非常单一且不可靠,例如 Spill 只判断 GC 时间(这里计算的是近几次的平均时间,经常无法应对突发情况),Load 只判断内存用量等,而且不可动态调整,局限性非常大,因此这里也是迫切需要改进的一个点。

代码语言:java复制
ActionResult decideAction(HeapStatusMonitor.MonitorResult monitorResult) {
    long gcTime = monitorResult.getGarbageCollectionTime();     // 获取近期 GC 平均时间
    long usedMemory = monitorResult.getTotalUsedMemory();       // 获取堆内存总用量

    // 1. 检查是否需要触发 SPILL
    if (gcTime > gcTimeThreshold) { // TODO: 目前只有 GC 时间是否超过阈值, 非常简陋
        return ActionResult.ofSpill(spillSizeRatio);    // TODO: 这个 ratio 需要改为动态的, 目前为配置项 state.backend.spillable.spill-size.ratio 设置, 默认值为 0.2, 表示每次有 20% 的状态被移动到磁盘上
    }

    // 2. 检查是否需要触发 LOAD
    if (usedMemory < loadStartSize) {   // TODO: 目前只有一个指标, loadStartSize = maxMemory * loadStartRatio, 而 loadStartRatio 由 state.backend.spillable.load-start.ratio 配置项决定, 默认是 0.1
        float loadRatio = (float) (loadEndSize - usedMemory) / usedMemory;  // loadEndSize = maxMemory * loadEndRatio, 而 loadEndRatio 由 state.backend.spillable.load-end.ratio 配置项决定, 默认为 0.3
        return ActionResult.ofLoad(loadRatio);  // 决定载入的内存比例
    }

    // 都不需要, 那么无动作
    return ActionResult.ofNone();
}

StateMapMeta

这个方法主要描述了在状态表中的一个 KeyGroup 所保存的所有状态的元数据信息,称为一个 StateMap。 在 Flink 的状态系统中,分配的最小单元是 KeyGroup,表示哈希值一致的一组 Key。若干个连续的 KeyGroup 组成一个 KeyGroupRange,分配给相关算子的某个并行实例中。

代码语言:java复制
public static class StateMapMeta {
    private final SpillableStateTable stateTable;    // KeyGroup 所属的状态表引用
    private final int keyGroupIndex;    // 该状态表中, 此 KeyGroup 的偏移量 (索引)
    private final boolean isOnHeap;     // 目前是否在堆内存里
    private final int size;             // 该 KeyGroup 的状态数
    private final long numRequests;     // 该 KeyGroup 的总请求数
    private long estimatedMemorySize;   // 估计的状态总大小, 如果是 -1 表示未初始化
}

可以看到,StateMapMeta 对象包括了这个 KeyGroup 所属对象的多种典型属性,Flink Spillable Backend 就是根据这些属性来计算权重、决定 Spill 还是 Load 等动作的影响范围。

getStateMapMetas 方法

这里我们简单介绍一下这个名为 getStateMapMetas 的辅助方法。它的作用是给定一个过滤器函数,对当前状态表里的所有 StateMap(即 KeyGroup 所属的状态集)进行筛选,然后返回符合条件的列表。

代码语言:java复制
private List<SpillableStateTable.StateMapMeta> getStateMapMetas(
    Function<SpillableStateTable.StateMapMeta, Boolean> stateMapFilter) {   // 传入筛选策略函数, 按条件筛选状态表中符合条件的 KeyGroup 列表
    List<SpillableStateTable.StateMapMeta> stateMapMetas = new ArrayList<>();   // 创建一个列表, 以作为本函数返回值
    for (Tuple2<String, SpillableStateTable> tuple : stateTableContainer) {     // stateTableContainer 是即当前实例中, 所有已注册的状态名和状态表的 Tuple2 映射
        int len = stateMapMetas.size();
        SpillableStateTable spillableStateTable = tuple.f1;     // tuple.f1 是某个状态的状态表, 而 tuple.f0 是状态名, 这里用不到
        Iterator<SpillableStateTable.StateMapMeta> iterator = spillableStateTable.stateMapIterator();   // 准备按状态表中 KeyGroup 的顺序, 依次遍历检查该状态表里的所有的状态
        while (iterator.hasNext()) {    // 对该状态表中的每个 KeyGroup 下的状态映射进行遍历
            SpillableStateTable.StateMapMeta meta = iterator.next();
            if (stateMapFilter.apply(meta)) { stateMapMetas.add(meta); }    // 如果这个状态表的 KeyGroup 的 Meta 信息符合传入函数的筛选条件, 就加入返回列表
        }

        if (len < stateMapMetas.size()) {   // 如果发现上述循环中, 新增了符合条件的 KeyGroup, 需要估计表中每个 KeyGroup 的状态大小, 并写入元数据 (Meta) 中
            long estimatedSize = spillableStateTable.getStateEstimatedSize(true);   // 更新对象的平均大小, 注意是平均大小

            // 逐个更新每个新增的 KeyGroup 元数据中堆内存的估计大小, 计算公式是: 状态总数 * 估计的状态平均大小
            for (int i = len; i < stateMapMetas.size(); i  ) {
                SpillableStateTable.StateMapMeta stateMapMeta = stateMapMetas.get(i);
                stateMapMeta.setEstimatedMemorySize(stateMapMeta.getSize() * estimatedSize);    // 状态总数 * 估计的状态平均大小
            }
        }
    }
    return stateMapMetas; // 返回含有最新状态估计大小的状态表元数据
}

特别需要注意的是,它的作用不仅仅在于筛选,而且还在筛选之后,对本 StateMap 里所有状态的大小进行估计,并保存在前面所述的 StateMapMeta 对象中。这样,后面的权重计算和排序,才有了数据支持。

doSpill 方法

下面我们来看一下,Spillable Backend 是如何将内存里的状态对象,Spill 到磁盘上的。这个 doSpill 方法由前述的 decideAction 方法调用,执行具体的 Spill 操作(优先选择访问不频繁、尺寸较大的 KeyGroup 进行 spill)。

它包括了筛选和大小估计、权重排序、执行 Spill 等多个步骤,最终达到阈值而完成整个流程。

代码语言:java复制
void doSpill(ActionResult actionResult) {
    List<SpillableStateTable.StateMapMeta> onHeapStateMapMetas =    // 筛选当前 KeyGroup 并得到统计的元数据 (过滤条件是 onHeap 并且 size 大于 0)
        getStateMapMetas((meta) -> meta.isOnHeap() && meta.getSize() > 0);
    if (onHeapStateMapMetas.isEmpty()) { return; }  // 如果没有筛选到, 说明不用 spill, 直接返回

    sortStateMapMeta(actionResult.action, onHeapStateMapMetas);     // 根据 KeyGroup 的状态大小和访问频次进行权重排序, 权重大的放在前面

    long totalSize = onHeapStateMapMetas.stream()   // 获取所有堆内 KeyGroup 状态大小的总和
        .map(SpillableStateTable.StateMapMeta::getEstimatedMemorySize).reduce(0L, (a, b) -> a   b);     // 可以用 Long::sum 代替
    long spillSize = (long) (totalSize * actionResult.spillOrLoadRatio);    // 计算需要 spill 的比例

    if (spillSize == 0) { return; }
    if (cancelCheckpoint) { checkpointManager.cancelAllCheckpoints(); }     // 配置项 state.backend.spillable.cancel.checkpoint 可以控制是否在 spill 时取消当前的所有进行中的快照(默认为 true)。取消快照可以加快 GC 和 Spill 过程

    for (SpillableStateTable.StateMapMeta meta : onHeapStateMapMetas) {     // 根据排序得到的 KeyGroup 列表, 从权重最大(访问频次最小、大小最大)的开始, 逐个进行 Spill 操作, 直到达到阈值
        meta.getStateTable().spillState(meta.getKeyGroupIndex());
        spillSize -= meta.getEstimatedMemorySize();
        if (spillSize <= 0) { break; }  // 如果 Spill 的大小已经达到了阈值, 就不再继续, 本次 Spill 操作结束
    }
}

doLoad 方法

这个 doLoad 方法同样由 decideAction 方法调用,是 doSpill 方法的“对偶函数”,即将状态从磁盘等外存,载入到堆内存中(优先选择访问频繁、尺寸较小的 KeyGroup 进行 load)。

整体的函数逻辑与 doSpill 相同,只是更新阈值的代码放在了操作之前,以避免多载入对象到内存中,造成较大压力。

代码语言:java复制
void doLoad(ActionResult actionResult) {    // 将状态从磁盘载入回堆内存
    List<SpillableStateTable.StateMapMeta> onDiskStateMapMetas =    // 筛选出所有不在堆内存且状态不为 0 的 KeyGroup 状态列表
        getStateMapMetas((meta) -> !meta.isOnHeap() && meta.getSize() > 0);
    if (onDiskStateMapMetas.isEmpty()) { return; }

    sortStateMapMeta(actionResult.action, onDiskStateMapMetas);     // 对所有的 KeyGroup 状态列表进行权重排序, 最大的 (访问次数最多、状态最小的)放在前面, 优先进行 Load

    long totalSize = onDiskStateMapMetas.stream()   // 计算符合条件的状态总大小
        .map(SpillableStateTable.StateMapMeta::getEstimatedMemorySize)
        .reduce(0L, Long::sum);
    long loadSize = (long) (totalSize * actionResult.spillOrLoadRatio); // 计算出需要载入的最大比例

    if (loadSize == 0) { return; }

    for (SpillableStateTable.StateMapMeta meta : onDiskStateMapMetas) { // 开始载入到内存, 直到满足阈值
        loadSize -= meta.getEstimatedMemorySize();
        // Load 时先减去状态大小, 避免多载入了一些状态, 导致内存压力比预期的更大
        if (loadSize < 0) { break; }

        meta.getStateTable().loadState(meta.getKeyGroupIndex());    // 按照 KeyGroup 元数据里面记录的 KeyGroupIndex 来载入状态到当前状态表
    }
}

sortStateMapMeta 方法

这个方法也属于比较关键的一个方法。它会根据传入的 StateMapMeta 列表,将其归一化以后,按照既定的规则进行权重排序。

需要注意的是,权重计算的方法在 computeWeight 方法中,根据 Spill(优先选择访问频率低、尺寸大的状态进行 Spill)还是 Load(优先选择访问频率高、尺寸小的状态进行 Load)有完全相反的权重计算方法。

代码语言:java复制
private void sortStateMapMeta(Action action, List<SpillableStateTable.StateMapMeta> stateMapMetas) {
    if (stateMapMetas.isEmpty()) { return; }

    // 使用 (X - Xmin)/(Xmax - Xmin) 公式来进行归一化, 以确保归一化后的值域位于 [0,1]
    long sizeMax = 0L, sizeMin = Long.MAX_VALUE, requestMax = 0L, requestMin = Long.MAX_VALUE;
    for (SpillableStateTable.StateMapMeta meta : stateMapMetas) {   // 统计最大、最小的的 KeyGroup 状态对象总大小
        long estimatedMemorySize = meta.getEstimatedMemorySize();
        sizeMax = Math.max(sizeMax, estimatedMemorySize);
        sizeMin = Math.min(sizeMin, estimatedMemorySize);
        long numRequests = meta.getNumRequests();   // 获取该 KeyGroup 状态的总请求数
        requestMax = Math.max(requestMax, numRequests);
        requestMin = Math.min(requestMin, numRequests);
    }
    final long sizeDenominator = sizeMax - sizeMin; // 根据上述公式进行归一化
    final long requestDenominator = requestMax - requestMin;
    final long sizeMinForCompare = sizeMin;
    final long requestMinForCompare = requestMin;
    final Map<SpillableStateTable.StateMapMeta, Double> computedWeights = new IdentityHashMap<>();  // 准备一个 Map 来表示各个 KeyGroup 的相对权重
    Comparator<SpillableStateTable.StateMapMeta> comparator = (o1, o2) -> {     // 创建一个 KeyGroup 权重的 Comparator 用来排序
        if (o1 == o2) { return 0; }
        if (o1 == null) { return -1; }
        if (o2 == null) { return 1; }
        double weight1 = computedWeights.computeIfAbsent(o1,    // 计算第一个 KeyGroup 状态的权重, 并放入 computedWeights Map 中准备排序
            k -> computeWeight(k, action, sizeMinForCompare, requestMinForCompare, sizeDenominator, requestDenominator));   // computeWeight 的公式是 (weightRetainedSize * normalizedSize   weightRequestRate * normalizedRequest) / weightSum
        double weight2 = computedWeights.computeIfAbsent(o2,    // 计算第二个 KeyGroup 状态的权重, 并放入 computedWeights Map 中准备排序
            k -> computeWeight(k, action, sizeMinForCompare, requestMinForCompare, sizeDenominator, requestDenominator));
        // 对比权重, 然后对大的一方优先返回 -1, 这样在排序时会排到最前面, 优先进行 SPILL 或者 LOAD 等动作
        return (weight1 > weight2) ? -1 : 1;
    };
    stateMapMetas.sort(comparator); // 进行排序, 令大的 KeyGroup 可以出现在最前面
}

这个方法利用了 Java 自带的排序机制,通过自定义 Comparator 的方式,让权重最大的对象排在前面,这样构造了一个优先队列,前面介绍的doSpilldoLoad 方法都只需要从首部开始处理即可。

computeWeight 方法

这个 computeWeight 方法决定了排序时的先后顺序。可以看到,对于 SPILL 和 LOAD,计算的公式可以说是相反的,但是最终目的一致:根据概率原理,留在内存里的相对都是访问频繁及占空间较小的对象,有利于保持性能。

代码语言:java复制
private double computeWeight(
    SpillableStateTable.StateMapMeta meta,
    Action action,
    long sizeMin, long requestMin,
    long sizeDenominator, long requestDenominator) {
    double normalizedSize = sizeDenominator == 0L ? 0.0 : (meta.getEstimatedMemorySize() - sizeMin) / (double) sizeDenominator; // 对本 KeyGroup 的状态大小进行归一化
    double normalizedRequest =  // 对本 KeyGroup 里状态的请求次数进行归一化
        requestDenominator == 0L ? 0.0 : (meta.getNumRequests() - requestMin) / (double) requestDenominator;
    double weightRetainedSize, weightRequestRate, weightSum;
    switch (action) {
        case SPILL: // 如果是 SPILL 操作, 倾向于选择访问不频繁的、较大的 Bucket 来进行
            weightRetainedSize = WEIGHT_SPILL_RETAINED_SIZE;    // 固定为 0.7, 正权重
            weightRequestRate = WEIGHT_SPILL_REQUEST_RATE;      // 固定为 -0.3, 负权重
            weightSum = WEIGHT_SPILL_SUM;   // 固定为前两者之和, 即 0.4
            break;
        case LOAD:  // 如果是 LOAD 操作, 倾向于选择请求频繁的、较小的 Bucket 来进行
            weightRetainedSize = WEIGHT_LOAD_RETAINED_SIZE; // 固定为 -0.3, 正权重
            weightRequestRate = WEIGHT_LOAD_REQUEST_RATE;   // 固定为 0.7, 负权重
            weightSum = WEIGHT_LOAD_SUM;    // 固定为前两者之和, 即 0.4
            break;
        default:
            throw new RuntimeException("Unsupported action: "   action);
    }
    // 如果是 Spill, 公式目前为 (0.7*normalizedSize-0.3*normalizedRequest)/0.4; 如果是 Load, 公式目前是 (0.7*normalizedRequest-0.3*normalizedSize)/0.4
    return (weightRetainedSize * normalizedSize   weightRequestRate * normalizedRequest) / weightSum; 
}

目前来看,公式里采用的参数都是固定的,对于调优来说并不是很灵活。

总结和展望

从目前的代码来看,SpillAndLoadManagerImpl 类的实现只能说完成了雏形,但是还有很多硬伤,例如公式参数固定不可调、决策条件过于简单、资源检测手段较为单一等等。

但需要看到,正是有了这些基石,该功能才有进一步完善的可能。因此这里非常感谢 Flink 社区的 Pengfei Li 和 Yu Li 两位作者的辛勤劳动。

我们也期待在接下来的版本中,Flink 的 Spillable Backend 可以更加成熟和完善;同时,后续我们也会将线上的一些经验和结论,融入到资源检测和动作判断逻辑中,让这个模块更加成熟、稳定,真正在生产环境下可用、可靠,最大程度上替代现有的 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend。

0 人点赞