问题概要
流计算 Oceanus 平台支持以 SQL 的方式提交作业,独享集群支持最新的 Flink 1.10 提供的新版 Blink Planner 语法。有一位客户写了一段代码,用到了 SQL 的 TopN 功能,语句类似于:
代码语言:javascript复制INSERT INTO `MySink`
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
作业提交后,程序运行时一切正常;但是一旦把作业暂停(做快照),然后恢复时,就会持续报错:
代码语言:javascript复制java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 13 more
从报错信息可以看出,作业的崩溃是由于 State Serializer 不兼容导致的。那么问题来了,跑的好好的程序,用的都是官方提供的 API,没有任何自定义的代码,为什么会不兼容呢?
报错初探
既然看到了报错时的线程栈,定位问题就可以以此入手。我们首先看下报错的直接根源:updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
从代码里了解到,是 updateStateSerializer() 方法返回了“不兼容”的结果,导致报错的。我们继续按图索骥,追踪到了 StateSerializerProvider 类的 registerNewSerializerForRestoredState() 方法,并确认了不兼容的信号来源于 previousSerializerSnapshot 对象的 resolveSchemaCompatibility() 方法返回的结果。
追踪到这里,我们发现 previousSerializerSnapshot 是一个接口的引用,下面有几十种不同实现。怎么办呢?我们在测试环境复现,并进行远程调试来查看运行时到底发生了什么。
运行时调试
Java 的远程调试方法很简单,只需要在 java 命令的启动参数上加入
代码语言:javascript复制-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=[调试端口]
的参数即可。
对于 Flink 而言,可以修改 flink-conf.yaml 里面的 env.java.opts.taskmanager 和 env.java.opts.jobmanager 两个配置项,分别对应着 TaskManager 和 JobManager 的运行参数。
因为报错发生在任务运行时,我们首先对 TaskManager 进行调试。果不其然,作业快照并恢复后,又开始报错了。我们也通过对上述代码设置断点,找到了现场:
原来是出问题的 previousSerializerSnapshot 对象的类型是 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot 这个类。我们仔细阅读它的 resolveSchemaCompatibility() 方法,可以看到它的核心逻辑,是通过使用 equals 方法,对比它自己的 comparator 和 传入的 newSortedMapSerializer 的 comparator,如果 equals 方法返回 true 就是兼容,反之就不一定兼容(也可能通过 state migration 就兼容了,此文不涉及)。
同时在运行时,我们看到这里的 Comparator 对于 TopN 查询来说,实际上是 RetractableTopNFunction$ComparatorWrapper 内部类。
那么问题就清楚了,大概率是这个类的 equals 方法实现不规范(测试不充分),导致出现了此问题。
问题确认
我们看下 ComparatorWrapper 类的 equals 方法的实现:
从代码里可以看到,它是对比本方法和传入对象的类名、生成的代码、References(附加参数)来判断两个 Comparator 是不是相等(兼容)。
但是,从调试中我们可以看到,类名、生成的代码其实有微小差别(后缀数字不一样)。
这里的后缀其实没有特别含义,是 Flink 在生成 Java 代码时,为了避免类、变量冲突而维护的一个自增变量,只与生成顺序有关,与代码逻辑无关。
因此问题就很清楚了:Flink 在判断 TopN 状态的序列化器是否兼容的时候,采用了不合适的对比方法,造成逻辑相同但是生成顺序略有差异的两个 Comparator 被误判为不等(不兼容)。
那么问题解决方法很简单:将这两个逻辑相同的类实例判断为相同即可。
可是问题来了:这两个类实例各自是如何来的呢?又是做什么的呢?为什么需要判断它们两个是否相同呢?
Flink 快照分析
为了回答这些问题,我们先来梳理一下 Flink 快照的流程:
1. 算子收到 Checkpoint Barrier(非数据源算子从上游算子传递得到,数据源算子则是被 Checkpoint Coordinator 推送),处理并准备进行快照(org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#processBarrier 方法)。
2. 通过 org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#notifyCheckpoint 方法,对收到的 Barrier 进行提取元数据和统计,然后触发 Checkpoint 快照(org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#triggerCheckpointOnBarrier 方法)。
3. 调用 org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint 方法,对该算子的状态进行完整快照。它还需要调用 checkpointState、checkpointStreamOperator、snapshotState、snapshot、doSnapshot、computeSnapshot 等方法,对状态进行快照。
4. 在 computeSnapshot 方法里,会对 namespaceSerializer 和 stateSerializer 调用 snapshotConfiguration 方法,对当前的配置做快照,如下图:
5. 之后对于这个问题,它调用的是前面介绍过的 SortedMapSerializerSnapshot 类的 writeSnapshot 方法进行快照,本质是将 comparator 序列化并写入快照文件里。
从 readSnapshot 方法也可以看到,前面介绍过的 SortedMapSerializerSnapshot 类的 comparator 对象就是反序列化状态文件得到的,而这个 comparator 对象就被用作前述 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot#resolveSchemaCompatibility 方法的 equals 左边的 comparator(下图红框)。
那前面的疑问就解决了一半。下面我们继续来查看右边那个 comparator 是怎么来的,又是做什么的。
SQL 作业提交流程
为了回答右边的 comparator 是怎么来的,我们需要看下客户端(Client)的 Flink SQL 作业的提交流程:
1. 当用户写了一个 “INSERT INTO” 语句后,需要调用 org.apache.flink.table.api.TableEnvironment#sqlUpdate 方法,令 Flink 将其解析为语法树(org.apache.flink.table.planner.StreamPlanner#translate)。
2. 在解析时,还会调用 ExecNode#translateToPlan、StreamExecCalc#translateToPlanInternal 等一系列方法,进一步解析语法树的每个节点,将其翻译成逻辑计划。
3. 在生成逻辑计划时,还涉及到一个名为“代码生成”的步骤,即将 SQL -> 语法树时,要把逻辑用 Java 代码表达出来,然后通过内置的 Janino 轻量级编译器,在内存中编译为 class 实例并序列化以作为计划的一部分。在代码生成过程中,类名和变量名都是自增生成的,这也是为什么之前我们截图里两个 comparator 的逻辑一样,类名和代码中的变量名类似但不一致的原因。
4. 对于这个场景,Comparator 的代码是 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 生成的,生成以后就会返回一个 GeneratedRecordComparator 对象。
5. GeneratedRecordComparator 对象会被传入 RetractableTopNFunction 的构造方法,并被前述的 ComparatorWrapper 进行包装,以支持被序列化。
此时,Flink 作业运行图生成就告一段落。当运行图提交到 Flink 集群进行运行时,RetractableTopNFunction 类的 open 方法中会对状态进行初始化,其中 ValueStateDescriptor 就是访问状态的“钥匙”,它的构造需要传入上述生成的 GeneratedRecordComparator 对象:
报错回溯
让我们再返回文章开头的报错信息,一切明了:
1. 当恢复后的新 Flink 作业希望读取状态时,通过 getState 方法尝试从这个 ValueStateDescriptor 获取状态。
2. 首次访问时,由于这里用了延迟初始化(Lazy Initialization)机制,会检查这个 ValueStateDescriptor 里面封装的 comparator(新作业代码生成的)与快照恢复时里面记录的 comparator 是否兼容,如果兼容才会进行状态读取。
3. 但是很不巧,由于 equals 方法写的有问题,导致对比两个 comparator 时,因为生成的类名不一样,代码里变量也不一样,直接返回了 false,让 Flink 误认为不兼容,所以拒绝继续,作业报错。
解决思路
既然我们知道了问题的根本原因,可以从多方面入手解决这个问题:
1. 最简单的方法是通过正则的方式,把随机变量名去掉,然后对比类名和代码里剩下的字符串。但是这种方法不稳定,跨版本时如果 Generator 的代码格式稍有修改,就会出问题。
2. 更好的方法,是对比代码生成时,传入的元数据(即 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 方法的参数)。如果不需要考虑历史作业的兼容性,可以直接修改 GeneratedRecordComparator 类,加入这些元数据,并在后续的 equals 对比时直接对比这些元数据即可。
另外,这个问题已经反馈给社区并记录为 JIRA 单。由于社区代码提交需要做完整的考量,这里我们还在讨论阶段。初步想法是采用方法 2,但是对于无法处理的历史类,则回退到类似方法 1 的途径。
总而言之,这个问题解决起来不难,但是需要充分注意兼容性和正确性,以避免社区前段时间的一个小修改造成的不兼容的问题。