Flink SQL 算子生成固定 ID 的方法总结

2023-06-14 17:48:28 浏览数 (2)

背景知识

Flink 的作业的算子拓扑结构,由一系列算子组成的运行图来描述,如下图所示:

含有多个算子的 Flink 运行图含有多个算子的 Flink 运行图

运行图中的每个节点有自己的 ID,也可以有自己的状态(State)。当 Flink 做快照时,会保存算子 ID 和状态的对应关系。因此,我们从快照恢复作业时,如果每个算子 ID 都和之前的算子一一对应,就可以精确还原之前快照时的运行状态。

如果用户没有显式指定算子的 ID,Flink 会根据拓扑结构,自动为算子生成自己的 ID

问题描述

我们通过 SQL 或者 Table API 的方式来编写 Flink 作业时,由于需要经过 Calcite 翻译、优化才可以得到最终的 Flink 算子,用户侧很难直接干预算子的生成逻辑。

例如,用户稍微修改了一下 SQL 代码,或者升级了 Flink 版本,都可能导致运行图发生变化,自动生成的算子 ID 不再与之前的保持一致,从而造成快照无法恢复的后果。

那么问题来了:如何能够固定算子的 ID,即后续无论做了什么修改,只要这个算子还是他自己,那么它的 ID 永远都不变呢?

原理介绍

在 DataStream API 编程模式下,Flink 确实提供了固定算子 ID 的方式:我们可以通过 uid() 方法,显式为算子设置一个字符串 ID,随后 Flink 就会把这个 uid 进行 hash 处理,最终映射为唯一的算子 ID。

例如我们可以在 Flink 的测试代码里找到如下的例子:

代码语言:javascript复制
env.addSource(new StatefulSource(false, finalCheckpointLatch))
        .uid(SOURCE_UID)
        .setParallelism(NUM_SOURCES)
        .sinkTo(sink)
        .setParallelism(NUM_SINKS)
        .uid(SINK_UID);

在这个示例中,用户为 Source 和 Sink 算子显式声明了 uid。这样后续无论再在中间加入多少其他算子,都不影响 Source 和 Sink 的状态匹配。

这个 uid() 方法底层是调用 Transformation#setUid() 方法来设置 uid 的,因此这里的突破口就是:如何找到 Flink SQL 生成的 Transformation 对象,并为它设置唯一的 uid。

我们已经可以从很多文章中了解到 Flink 的 SQL 代码转换为 Transformation 的步骤,例如这篇文章,因此本文不再复述其中的细节。

简而言之,Flink 的 SQL 作业要经过 SQL 代码 → SqlNode AST 语法树 → Operation 抽象层 → RelNode 逻辑树 → RelNode 物理树 → ExecNodeGraph 执行图 → Transformations → StreamGraph → JobGraph → ExecutionGraph,才能最终提交执行。

固定 UID 方法一: 链路追溯

既然我们知道只要给 Transformation 设置 uid 即可保证后续的算子 ID 固定化,那可以反向思考:只要在它的前体 ExecNode 中保存 uid,那么在 ExecNodeBase#translateToPlanInternal 方法里,我们就可以根据这个保存的 uid 来设置 Transformation 的 uid。

例如我们新建一个 StreamExecCalc 的子类,名为 EnhancedStreamExecCalc,覆盖 translateToPlanInternal 方法:

代码语言:javascript复制
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    Transformation<RowData> transformation = super.translateToPlanInternal(planner);
    transformation.setUid(uid);
    return transformation;
}

这样生成的 Transformation 就会一路把我们的 uid 传递下去。

按照这个思路,我们继续反向追溯,在 ExecNode 的前体,即 StreamPhysicalRel 中,也加入 uid 字段。这样它生成的 ExecNode 就可以带上 uid 信息。

例如,我们可以继续新建一个 StreamPhysicalCalc 的子类,覆盖 translateToExecNode 方法,在这里生成刚刚提到的有 uid 的 EnhancedStreamExecCalc 对象。

随后,我们新增 Planner Rule,根据算子的特征(例如名字、参数),编写匹配规则,将 RelNode 替换成我们增强后的版本。

总结:这个方法对每个类型的都需要做定制,较为繁琐,适合逻辑较为特化的场景。

固定 UID 方法二: 官方增强

由于上述提到的“单点式”增强方式通用性不够,Flink 社区在 FLIP-190: Support Version Upgrades for Table API & SQL Programs 提案里,对这类因 ID 变化而导致不兼容的问题有了一个系统化的方案(目前还不成熟)。

该提案的核心技术点仍然是根据一定规则来生成 Transformation 的 uid,从而保证运行图算子 ID 的固定化。这里新增了一个 TransformationMetadata 类,用于在刚刚提到的 translateToPlanInternal 方法里记录某个算子的名称、uid、描述等元数据。

主要思路是通过增加 COMPILE PLAN 语句,把给定的 SQL 查询逻辑变成一个 JSON 描述 Plan 文件(见 示例文件),随后用户可以通过 EXECUTE PLAN 语句,执行这个 JSON 格式的 Plan 文件。只要 Plan 文件的格式是兼容的,算子 ID 的生成规则固定,就可以保障最后的运行图算子的逻辑和 ID 的稳定性。

用户只需要在 Flink 参数里设置 table.exec.uid.generationPLAN_ONLY(默认值),即可开启该功能。对于所有用 COMPILE PLAN 语句包含的逻辑,Flink 都会为每个算子根据规则(由 table.exec.uid.format 参数控制)生成唯一的 ID。

总结

本文讲解了 Flink 算子 ID 的用途、生成逻辑,以及不匹配的后果,并从流程上分析了如何显式给 SQL 语句生成的各项结构设置固定的 uid,随后还介绍了 Flink 社区对此问题的应对思路,希望能给大家带来一些启发。

0 人点赞