背景概要
在前文中,我们介绍了 Spillable Backend 及其 HeapStatusMonitor 的工作原理和不足。今天我们来看一下 Spillable Backend 的另一个核心组件:SpillAndLoadManager。如果说 HeapStatusMonitor 是测量系统负载的信号灯,那么 SpillAndLoadManager 就是具体的决策者,根据信号来决定当前系统的行为(将状态从内存 Spill 到磁盘,还是从磁盘 Load 到内存)。
SpillAndLoadManager 接口
SpillAndLoadManager 接口的定义非常简单,只有一个 checkResource()
方法,表示检查当前的资源,然后决定下一步的行动,其代码如下:
public interface SpillAndLoadManager {
/**
* Check resource to decide whether to spill or load state.
*/
void checkResource();
}
目前的代码里,只有 SpillAndLoadManagerImpl 这一个实现类,因此本文以这个类为基准,通过逐行注解关键方法的方式,介绍它的工作原理,并对当前的实现方式提出一些建议。
SpillAndLoadManagerImpl 实现类初探
这个类目前有接近 500 行代码,处于中低复杂度。在分析它的工作原理之前,我们来首先先看一下他的整体结构:
其中上图的 Action
是一个枚举类型,有 SPILL、LOAD、NONE 三种取值,分别表示三种动作(NONE 表示”无动作“这种动作),因此很容易猜到,它是本方法的决策信号,决定了相关状态的去向。
ActionResult
则是对 Action
及比例参数(spillOrLoadRatio
)的封装类,作为行动的指南:例如传入了 SPILL Action 和 0.2 的 spillOrLoadRatio 作为 ActionResult, 表明需要把 20% 的冷状态写入磁盘。
既然它的父类只定义了 checkResource()
一个方法,那我们先从这个方法入手,逐步分析该类的运行原理(下面的代码片段,删减了一些日志代码)。
public void checkResource() {
long currentTime = System.currentTimeMillis(); // 获取当前 Unix 时间戳
// 如果距离上次资源检查的时间小于阈值, 就不做检查. 配置项为 state.backend.spillable.resource-check.interval, 默认值为 10s
if (currentTime - lastResourceCheckTime < resourceCheckInterval) { return; }
lastResourceCheckTime = currentTime; // 重置上次资源检查时间为当前值
// getMonitorResult 访问的正是之前介绍的 heapStatusMonitor 里保存的资源负载。由于变量定义为了 volatile, 不能缓存, 因而访问开销较大
HeapStatusMonitor.MonitorResult monitorResult = heapStatusMonitor.getMonitorResult();
// 如果和上次获取的 monitorResult 相同, 则不做处理, 直接返回
if (lastMonitorResult != null && lastMonitorResult.getId() == monitorResult.getId()) { return; }
lastMonitorResult = monitorResult;
ActionResult checkResult = decideAction(monitorResult); // 根据 monitorResult 的值, 决定下一步的动作是 SPILL, LOAD 还是 NONE
if (checkResult.action == Action.NONE) { return; } // 如果无需动作, 则直接返回
// 如果走到这里, 说明需要进行 SPILL 或 LOAD 动作. 首先确保动作不能太频繁, 如果小于阈值则直接返回, 不做动作
// triggerInterval 的配置项为 state.backend.spillable.trigger-interval, 默认值为 1 分钟
if (monitorResult.getTimestamp() - lastTriggerTime < triggerInterval) { return; }
if (checkResult.action == Action.SPILL) {
doSpill(checkResult); // 调用 doSpill 方法将状态从内存移动到磁盘
} else {
doLoad(checkResult); // 调用 doLoad 方法将状态从磁盘载入回内存
}
// spill 和 load 比较耗时, 所有事项做完后再更新 lastTriggerTime 时间
lastTriggerTime = System.currentTimeMillis();
}
上述代码经过逐行注释后,逻辑非常清晰易懂。我们下面着重以三个核心方法 decideAction
、doSpill
、doLoad
及其附属方法为例,介绍它们的工作流程。
decideAction 方法
这个方法根据 HeapStatusMonitor
返回的资源观测结果,做出实际的决策。
但是从下面的源码注释可以看到,目前决策的依据非常单一且不可靠,例如 Spill 只判断 GC 时间(这里计算的是近几次的平均时间,经常无法应对突发情况),Load 只判断内存用量等,而且不可动态调整,局限性非常大,因此这里也是迫切需要改进的一个点。
代码语言:java复制ActionResult decideAction(HeapStatusMonitor.MonitorResult monitorResult) {
long gcTime = monitorResult.getGarbageCollectionTime(); // 获取近期 GC 平均时间
long usedMemory = monitorResult.getTotalUsedMemory(); // 获取堆内存总用量
// 1. 检查是否需要触发 SPILL
if (gcTime > gcTimeThreshold) { // TODO: 目前只有 GC 时间是否超过阈值, 非常简陋
return ActionResult.ofSpill(spillSizeRatio); // TODO: 这个 ratio 需要改为动态的, 目前为配置项 state.backend.spillable.spill-size.ratio 设置, 默认值为 0.2, 表示每次有 20% 的状态被移动到磁盘上
}
// 2. 检查是否需要触发 LOAD
if (usedMemory < loadStartSize) { // TODO: 目前只有一个指标, loadStartSize = maxMemory * loadStartRatio, 而 loadStartRatio 由 state.backend.spillable.load-start.ratio 配置项决定, 默认是 0.1
float loadRatio = (float) (loadEndSize - usedMemory) / usedMemory; // loadEndSize = maxMemory * loadEndRatio, 而 loadEndRatio 由 state.backend.spillable.load-end.ratio 配置项决定, 默认为 0.3
return ActionResult.ofLoad(loadRatio); // 决定载入的内存比例
}
// 都不需要, 那么无动作
return ActionResult.ofNone();
}
StateMapMeta
这个方法主要描述了在状态表中的一个 KeyGroup 所保存的所有状态的元数据信息,称为一个 StateMap
。 在 Flink 的状态系统中,分配的最小单元是 KeyGroup,表示哈希值一致的一组 Key。若干个连续的 KeyGroup 组成一个 KeyGroupRange
,分配给相关算子的某个并行实例中。
public static class StateMapMeta {
private final SpillableStateTable stateTable; // KeyGroup 所属的状态表引用
private final int keyGroupIndex; // 该状态表中, 此 KeyGroup 的偏移量 (索引)
private final boolean isOnHeap; // 目前是否在堆内存里
private final int size; // 该 KeyGroup 的状态数
private final long numRequests; // 该 KeyGroup 的总请求数
private long estimatedMemorySize; // 估计的状态总大小, 如果是 -1 表示未初始化
}
可以看到,StateMapMeta 对象包括了这个 KeyGroup 所属对象的多种典型属性,Flink Spillable Backend 就是根据这些属性来计算权重、决定 Spill 还是 Load 等动作的影响范围。
getStateMapMetas 方法
这里我们简单介绍一下这个名为 getStateMapMetas 的辅助方法。它的作用是给定一个过滤器函数,对当前状态表里的所有 StateMap(即 KeyGroup 所属的状态集)进行筛选,然后返回符合条件的列表。
代码语言:java复制private List<SpillableStateTable.StateMapMeta> getStateMapMetas(
Function<SpillableStateTable.StateMapMeta, Boolean> stateMapFilter) { // 传入筛选策略函数, 按条件筛选状态表中符合条件的 KeyGroup 列表
List<SpillableStateTable.StateMapMeta> stateMapMetas = new ArrayList<>(); // 创建一个列表, 以作为本函数返回值
for (Tuple2<String, SpillableStateTable> tuple : stateTableContainer) { // stateTableContainer 是即当前实例中, 所有已注册的状态名和状态表的 Tuple2 映射
int len = stateMapMetas.size();
SpillableStateTable spillableStateTable = tuple.f1; // tuple.f1 是某个状态的状态表, 而 tuple.f0 是状态名, 这里用不到
Iterator<SpillableStateTable.StateMapMeta> iterator = spillableStateTable.stateMapIterator(); // 准备按状态表中 KeyGroup 的顺序, 依次遍历检查该状态表里的所有的状态
while (iterator.hasNext()) { // 对该状态表中的每个 KeyGroup 下的状态映射进行遍历
SpillableStateTable.StateMapMeta meta = iterator.next();
if (stateMapFilter.apply(meta)) { stateMapMetas.add(meta); } // 如果这个状态表的 KeyGroup 的 Meta 信息符合传入函数的筛选条件, 就加入返回列表
}
if (len < stateMapMetas.size()) { // 如果发现上述循环中, 新增了符合条件的 KeyGroup, 需要估计表中每个 KeyGroup 的状态大小, 并写入元数据 (Meta) 中
long estimatedSize = spillableStateTable.getStateEstimatedSize(true); // 更新对象的平均大小, 注意是平均大小
// 逐个更新每个新增的 KeyGroup 元数据中堆内存的估计大小, 计算公式是: 状态总数 * 估计的状态平均大小
for (int i = len; i < stateMapMetas.size(); i ) {
SpillableStateTable.StateMapMeta stateMapMeta = stateMapMetas.get(i);
stateMapMeta.setEstimatedMemorySize(stateMapMeta.getSize() * estimatedSize); // 状态总数 * 估计的状态平均大小
}
}
}
return stateMapMetas; // 返回含有最新状态估计大小的状态表元数据
}
特别需要注意的是,它的作用不仅仅在于筛选,而且还在筛选之后,对本 StateMap 里所有状态的大小进行估计,并保存在前面所述的 StateMapMeta 对象中。这样,后面的权重计算和排序,才有了数据支持。
doSpill 方法
下面我们来看一下,Spillable Backend 是如何将内存里的状态对象,Spill 到磁盘上的。这个 doSpill 方法由前述的 decideAction 方法调用,执行具体的 Spill 操作(优先选择访问不频繁、尺寸较大的 KeyGroup 进行 spill)。
它包括了筛选和大小估计、权重排序、执行 Spill 等多个步骤,最终达到阈值而完成整个流程。
代码语言:java复制void doSpill(ActionResult actionResult) {
List<SpillableStateTable.StateMapMeta> onHeapStateMapMetas = // 筛选当前 KeyGroup 并得到统计的元数据 (过滤条件是 onHeap 并且 size 大于 0)
getStateMapMetas((meta) -> meta.isOnHeap() && meta.getSize() > 0);
if (onHeapStateMapMetas.isEmpty()) { return; } // 如果没有筛选到, 说明不用 spill, 直接返回
sortStateMapMeta(actionResult.action, onHeapStateMapMetas); // 根据 KeyGroup 的状态大小和访问频次进行权重排序, 权重大的放在前面
long totalSize = onHeapStateMapMetas.stream() // 获取所有堆内 KeyGroup 状态大小的总和
.map(SpillableStateTable.StateMapMeta::getEstimatedMemorySize).reduce(0L, (a, b) -> a b); // 可以用 Long::sum 代替
long spillSize = (long) (totalSize * actionResult.spillOrLoadRatio); // 计算需要 spill 的比例
if (spillSize == 0) { return; }
if (cancelCheckpoint) { checkpointManager.cancelAllCheckpoints(); } // 配置项 state.backend.spillable.cancel.checkpoint 可以控制是否在 spill 时取消当前的所有进行中的快照(默认为 true)。取消快照可以加快 GC 和 Spill 过程
for (SpillableStateTable.StateMapMeta meta : onHeapStateMapMetas) { // 根据排序得到的 KeyGroup 列表, 从权重最大(访问频次最小、大小最大)的开始, 逐个进行 Spill 操作, 直到达到阈值
meta.getStateTable().spillState(meta.getKeyGroupIndex());
spillSize -= meta.getEstimatedMemorySize();
if (spillSize <= 0) { break; } // 如果 Spill 的大小已经达到了阈值, 就不再继续, 本次 Spill 操作结束
}
}
doLoad 方法
这个 doLoad 方法同样由 decideAction 方法调用,是 doSpill 方法的“对偶函数”,即将状态从磁盘等外存,载入到堆内存中(优先选择访问频繁、尺寸较小的 KeyGroup 进行 load)。
整体的函数逻辑与 doSpill 相同,只是更新阈值的代码放在了操作之前,以避免多载入对象到内存中,造成较大压力。
代码语言:java复制void doLoad(ActionResult actionResult) { // 将状态从磁盘载入回堆内存
List<SpillableStateTable.StateMapMeta> onDiskStateMapMetas = // 筛选出所有不在堆内存且状态不为 0 的 KeyGroup 状态列表
getStateMapMetas((meta) -> !meta.isOnHeap() && meta.getSize() > 0);
if (onDiskStateMapMetas.isEmpty()) { return; }
sortStateMapMeta(actionResult.action, onDiskStateMapMetas); // 对所有的 KeyGroup 状态列表进行权重排序, 最大的 (访问次数最多、状态最小的)放在前面, 优先进行 Load
long totalSize = onDiskStateMapMetas.stream() // 计算符合条件的状态总大小
.map(SpillableStateTable.StateMapMeta::getEstimatedMemorySize)
.reduce(0L, Long::sum);
long loadSize = (long) (totalSize * actionResult.spillOrLoadRatio); // 计算出需要载入的最大比例
if (loadSize == 0) { return; }
for (SpillableStateTable.StateMapMeta meta : onDiskStateMapMetas) { // 开始载入到内存, 直到满足阈值
loadSize -= meta.getEstimatedMemorySize();
// Load 时先减去状态大小, 避免多载入了一些状态, 导致内存压力比预期的更大
if (loadSize < 0) { break; }
meta.getStateTable().loadState(meta.getKeyGroupIndex()); // 按照 KeyGroup 元数据里面记录的 KeyGroupIndex 来载入状态到当前状态表
}
}
sortStateMapMeta 方法
这个方法也属于比较关键的一个方法。它会根据传入的 StateMapMeta 列表,将其归一化以后,按照既定的规则进行权重排序。
需要注意的是,权重计算的方法在 computeWeight
方法中,根据 Spill(优先选择访问频率低、尺寸大的状态进行 Spill)还是 Load(优先选择访问频率高、尺寸小的状态进行 Load)有完全相反的权重计算方法。
private void sortStateMapMeta(Action action, List<SpillableStateTable.StateMapMeta> stateMapMetas) {
if (stateMapMetas.isEmpty()) { return; }
// 使用 (X - Xmin)/(Xmax - Xmin) 公式来进行归一化, 以确保归一化后的值域位于 [0,1]
long sizeMax = 0L, sizeMin = Long.MAX_VALUE, requestMax = 0L, requestMin = Long.MAX_VALUE;
for (SpillableStateTable.StateMapMeta meta : stateMapMetas) { // 统计最大、最小的的 KeyGroup 状态对象总大小
long estimatedMemorySize = meta.getEstimatedMemorySize();
sizeMax = Math.max(sizeMax, estimatedMemorySize);
sizeMin = Math.min(sizeMin, estimatedMemorySize);
long numRequests = meta.getNumRequests(); // 获取该 KeyGroup 状态的总请求数
requestMax = Math.max(requestMax, numRequests);
requestMin = Math.min(requestMin, numRequests);
}
final long sizeDenominator = sizeMax - sizeMin; // 根据上述公式进行归一化
final long requestDenominator = requestMax - requestMin;
final long sizeMinForCompare = sizeMin;
final long requestMinForCompare = requestMin;
final Map<SpillableStateTable.StateMapMeta, Double> computedWeights = new IdentityHashMap<>(); // 准备一个 Map 来表示各个 KeyGroup 的相对权重
Comparator<SpillableStateTable.StateMapMeta> comparator = (o1, o2) -> { // 创建一个 KeyGroup 权重的 Comparator 用来排序
if (o1 == o2) { return 0; }
if (o1 == null) { return -1; }
if (o2 == null) { return 1; }
double weight1 = computedWeights.computeIfAbsent(o1, // 计算第一个 KeyGroup 状态的权重, 并放入 computedWeights Map 中准备排序
k -> computeWeight(k, action, sizeMinForCompare, requestMinForCompare, sizeDenominator, requestDenominator)); // computeWeight 的公式是 (weightRetainedSize * normalizedSize weightRequestRate * normalizedRequest) / weightSum
double weight2 = computedWeights.computeIfAbsent(o2, // 计算第二个 KeyGroup 状态的权重, 并放入 computedWeights Map 中准备排序
k -> computeWeight(k, action, sizeMinForCompare, requestMinForCompare, sizeDenominator, requestDenominator));
// 对比权重, 然后对大的一方优先返回 -1, 这样在排序时会排到最前面, 优先进行 SPILL 或者 LOAD 等动作
return (weight1 > weight2) ? -1 : 1;
};
stateMapMetas.sort(comparator); // 进行排序, 令大的 KeyGroup 可以出现在最前面
}
这个方法利用了 Java 自带的排序机制,通过自定义 Comparator 的方式,让权重最大的对象排在前面,这样构造了一个优先队列,前面介绍的doSpill
和 doLoad
方法都只需要从首部开始处理即可。
computeWeight 方法
这个 computeWeight 方法决定了排序时的先后顺序。可以看到,对于 SPILL 和 LOAD,计算的公式可以说是相反的,但是最终目的一致:根据概率原理,留在内存里的相对都是访问频繁及占空间较小的对象,有利于保持性能。
代码语言:java复制private double computeWeight(
SpillableStateTable.StateMapMeta meta,
Action action,
long sizeMin, long requestMin,
long sizeDenominator, long requestDenominator) {
double normalizedSize = sizeDenominator == 0L ? 0.0 : (meta.getEstimatedMemorySize() - sizeMin) / (double) sizeDenominator; // 对本 KeyGroup 的状态大小进行归一化
double normalizedRequest = // 对本 KeyGroup 里状态的请求次数进行归一化
requestDenominator == 0L ? 0.0 : (meta.getNumRequests() - requestMin) / (double) requestDenominator;
double weightRetainedSize, weightRequestRate, weightSum;
switch (action) {
case SPILL: // 如果是 SPILL 操作, 倾向于选择访问不频繁的、较大的 Bucket 来进行
weightRetainedSize = WEIGHT_SPILL_RETAINED_SIZE; // 固定为 0.7, 正权重
weightRequestRate = WEIGHT_SPILL_REQUEST_RATE; // 固定为 -0.3, 负权重
weightSum = WEIGHT_SPILL_SUM; // 固定为前两者之和, 即 0.4
break;
case LOAD: // 如果是 LOAD 操作, 倾向于选择请求频繁的、较小的 Bucket 来进行
weightRetainedSize = WEIGHT_LOAD_RETAINED_SIZE; // 固定为 -0.3, 正权重
weightRequestRate = WEIGHT_LOAD_REQUEST_RATE; // 固定为 0.7, 负权重
weightSum = WEIGHT_LOAD_SUM; // 固定为前两者之和, 即 0.4
break;
default:
throw new RuntimeException("Unsupported action: " action);
}
// 如果是 Spill, 公式目前为 (0.7*normalizedSize-0.3*normalizedRequest)/0.4; 如果是 Load, 公式目前是 (0.7*normalizedRequest-0.3*normalizedSize)/0.4
return (weightRetainedSize * normalizedSize weightRequestRate * normalizedRequest) / weightSum;
}
目前来看,公式里采用的参数都是固定的,对于调优来说并不是很灵活。
总结和展望
从目前的代码来看,SpillAndLoadManagerImpl 类的实现只能说完成了雏形,但是还有很多硬伤,例如公式参数固定不可调、决策条件过于简单、资源检测手段较为单一等等。
但需要看到,正是有了这些基石,该功能才有进一步完善的可能。因此这里非常感谢 Flink 社区的 Pengfei Li 和 Yu Li 两位作者的辛勤劳动。
我们也期待在接下来的版本中,Flink 的 Spillable Backend 可以更加成熟和完善;同时,后续我们也会将线上的一些经验和结论,融入到资源检测和动作判断逻辑中,让这个模块更加成熟、稳定,真正在生产环境下可用、可靠,最大程度上替代现有的 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend。