从一个诡异的 Bug 来看 Flink 快照和状态读取的流程

2021-09-29 20:52:15 浏览数 (1)

问题概要

流计算 Oceanus 平台支持以 SQL 的方式提交作业,独享集群支持最新的 Flink 1.10 提供的新版 Blink Planner 语法。有一位客户写了一段代码,用到了 SQL 的 TopN 功能,语句类似于:

代码语言:javascript复制
INSERT INTO `MySink` 
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

作业提交后,程序运行时一切正常;但是一旦把作业暂停(做快照),然后恢复时,就会持续报错:

代码语言:javascript复制
java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 13 more

从报错信息可以看出,作业的崩溃是由于 State Serializer 不兼容导致的。那么问题来了,跑的好好的程序,用的都是官方提供的 API,没有任何自定义的代码,为什么会不兼容呢?

报错初探

既然看到了报错时的线程栈,定位问题就可以以此入手。我们首先看下报错的直接根源:updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)

updateRestoredStateMetaInfo 方法的兼容性判断逻辑updateRestoredStateMetaInfo 方法的兼容性判断逻辑

从代码里了解到,是 updateStateSerializer() 方法返回了“不兼容”的结果,导致报错的。我们继续按图索骥,追踪到了 StateSerializerProvider 类的 registerNewSerializerForRestoredState() 方法,并确认了不兼容的信号来源于 previousSerializerSnapshot 对象的 resolveSchemaCompatibility() 方法返回的结果。

registerNewSerializerForRestoredState 方法中兼容性的判断逻辑registerNewSerializerForRestoredState 方法中兼容性的判断逻辑

追踪到这里,我们发现 previousSerializerSnapshot 是一个接口的引用,下面有几十种不同实现。怎么办呢?我们在测试环境复现,并进行远程调试来查看运行时到底发生了什么。

运行时调试

Java 的远程调试方法很简单,只需要在 java 命令的启动参数上加入

代码语言:javascript复制
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=[调试端口]

的参数即可。

对于 Flink 而言,可以修改 flink-conf.yaml 里面的 env.java.opts.taskmanager 和 env.java.opts.jobmanager 两个配置项,分别对应着 TaskManager 和 JobManager 的运行参数。

因为报错发生在任务运行时,我们首先对 TaskManager 进行调试。果不其然,作业快照并恢复后,又开始报错了。我们也通过对上述代码设置断点,找到了现场:

远程调试 查看 resolveSchemaCompatibility 的具体类远程调试 查看 resolveSchemaCompatibility 的具体类

原来是出问题的 previousSerializerSnapshot 对象的类型是 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot 这个类。我们仔细阅读它的 resolveSchemaCompatibility() 方法,可以看到它的核心逻辑,是通过使用 equals 方法,对比它自己的 comparator 和 传入的 newSortedMapSerializer 的 comparator,如果 equals 方法返回 true 就是兼容,反之就不一定兼容(也可能通过 state migration 就兼容了,此文不涉及)。

初步找出问题原因初步找出问题原因

同时在运行时,我们看到这里的 Comparator 对于 TopN 查询来说,实际上是 RetractableTopNFunction$ComparatorWrapper 内部类。

远程调试找出 Comparator 的具体类远程调试找出 Comparator 的具体类

那么问题就清楚了,大概率是这个类的 equals 方法实现不规范(测试不充分),导致出现了此问题。

问题确认

我们看下 ComparatorWrapper 类的 equals 方法的实现:

ComparatorWrapper 类的 equals 方法的实现ComparatorWrapper 类的 equals 方法的实现

从代码里可以看到,它是对比本方法和传入对象的类名、生成的代码、References(附加参数)来判断两个 Comparator 是不是相等(兼容)。

但是,从调试中我们可以看到,类名、生成的代码其实有微小差别(后缀数字不一样)。

调试中发现的对比逻辑 Bug调试中发现的对比逻辑 Bug

这里的后缀其实没有特别含义,是 Flink 在生成 Java 代码时,为了避免类、变量冲突而维护的一个自增变量,只与生成顺序有关,与代码逻辑无关。

因此问题就很清楚了:Flink 在判断 TopN 状态的序列化器是否兼容的时候,采用了不合适的对比方法,造成逻辑相同但是生成顺序略有差异的两个 Comparator 被误判为不等(不兼容)。

那么问题解决方法很简单:将这两个逻辑相同的类实例判断为相同即可。

可是问题来了:这两个类实例各自是如何来的呢?又是做什么的呢?为什么需要判断它们两个是否相同呢?

Flink 快照分析

为了回答这些问题,我们先来梳理一下 Flink 快照的流程:

1. 算子收到 Checkpoint Barrier(非数据源算子从上游算子传递得到,数据源算子则是被 Checkpoint Coordinator 推送),处理并准备进行快照(org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#processBarrier 方法)。

2. 通过 org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#notifyCheckpoint 方法,对收到的 Barrier 进行提取元数据和统计,然后触发 Checkpoint 快照(org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#triggerCheckpointOnBarrier 方法)。

3. 调用 org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint 方法,对该算子的状态进行完整快照。它还需要调用 checkpointState、checkpointStreamOperator、snapshotState、snapshot、doSnapshot、computeSnapshot 等方法,对状态进行快照。

4. 在 computeSnapshot 方法里,会对 namespaceSerializer 和 stateSerializer 调用 snapshotConfiguration 方法,对当前的配置做快照,如下图:

computeSnapshot 方法里保存 Serializer 的快照computeSnapshot 方法里保存 Serializer 的快照

5. 之后对于这个问题,它调用的是前面介绍过的 SortedMapSerializerSnapshot 类的 writeSnapshot 方法进行快照,本质是将 comparator 序列化并写入快照文件里。

序列化和反序列化 comparator序列化和反序列化 comparator

从 readSnapshot 方法也可以看到,前面介绍过的 SortedMapSerializerSnapshot 类的 comparator 对象就是反序列化状态文件得到的,而这个 comparator 对象就被用作前述 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot#resolveSchemaCompatibility 方法的 equals 左边的 comparator(下图红框)。

问题根源基本确认问题根源基本确认

那前面的疑问就解决了一半。下面我们继续来查看右边那个 comparator 是怎么来的,又是做什么的。

SQL 作业提交流程

为了回答右边的 comparator 是怎么来的,我们需要看下客户端(Client)的 Flink SQL 作业的提交流程:

1. 当用户写了一个 “INSERT INTO” 语句后,需要调用 org.apache.flink.table.api.TableEnvironment#sqlUpdate 方法,令 Flink 将其解析为语法树(org.apache.flink.table.planner.StreamPlanner#translate)。

2. 在解析时,还会调用 ExecNode#translateToPlan、StreamExecCalc#translateToPlanInternal 等一系列方法,进一步解析语法树的每个节点,将其翻译成逻辑计划。

3. 在生成逻辑计划时,还涉及到一个名为“代码生成”的步骤,即将 SQL -> 语法树时,要把逻辑用 Java 代码表达出来,然后通过内置的 Janino 轻量级编译器,在内存中编译为 class 实例并序列化以作为计划的一部分。在代码生成过程中,类名和变量名都是自增生成的,这也是为什么之前我们截图里两个 comparator 的逻辑一样,类名和代码中的变量名类似但不一致的原因。

4. 对于这个场景,Comparator 的代码是 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 生成的,生成以后就会返回一个 GeneratedRecordComparator 对象。

5. GeneratedRecordComparator 对象会被传入 RetractableTopNFunction 的构造方法,并被前述的 ComparatorWrapper 进行包装,以支持被序列化。

RetractableTopNFunction 的构造方法里需要传入生成的 GeneratedRecordComparator 对象RetractableTopNFunction 的构造方法里需要传入生成的 GeneratedRecordComparator 对象

此时,Flink 作业运行图生成就告一段落。当运行图提交到 Flink 集群进行运行时,RetractableTopNFunction 类的 open 方法中会对状态进行初始化,其中 ValueStateDescriptor 就是访问状态的“钥匙”,它的构造需要传入上述生成的 GeneratedRecordComparator 对象:

RetractableTopNFunction 的运行时初始化方法里要用到生成的 ComparatorRetractableTopNFunction 的运行时初始化方法里要用到生成的 Comparator

报错回溯

让我们再返回文章开头的报错信息,一切明了:

1. 当恢复后的新 Flink 作业希望读取状态时,通过 getState 方法尝试从这个 ValueStateDescriptor 获取状态。

2. 首次访问时,由于这里用了延迟初始化(Lazy Initialization)机制,会检查这个 ValueStateDescriptor 里面封装的 comparator(新作业代码生成的)与快照恢复时里面记录的 comparator 是否兼容,如果兼容才会进行状态读取。

3. 但是很不巧,由于 equals 方法写的有问题,导致对比两个 comparator 时,因为生成的类名不一样,代码里变量也不一样,直接返回了 false,让 Flink 误认为不兼容,所以拒绝继续,作业报错。

解决思路

既然我们知道了问题的根本原因,可以从多方面入手解决这个问题:

1. 最简单的方法是通过正则的方式,把随机变量名去掉,然后对比类名和代码里剩下的字符串。但是这种方法不稳定,跨版本时如果 Generator 的代码格式稍有修改,就会出问题。

2. 更好的方法,是对比代码生成时,传入的元数据(即 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 方法的参数)。如果不需要考虑历史作业的兼容性,可以直接修改 GeneratedRecordComparator 类,加入这些元数据,并在后续的 equals 对比时直接对比这些元数据即可。

另外,这个问题已经反馈给社区并记录为 JIRA 单。由于社区代码提交需要做完整的考量,这里我们还在讨论阶段。初步想法是采用方法 2,但是对于无法处理的历史类,则回退到类似方法 1 的途径。

总而言之,这个问题解决起来不难,但是需要充分注意兼容性和正确性,以避免社区前段时间的一个小修改造成的不兼容的问题。

0 人点赞