Problem Background
Recently a customer reported that jobs in one region would sporadically hit JobManager crashes and restarts. The symptom is as follows: while the JobManager is running normally, it suddenly reports a too old resource version error without any warning, and the container exits right after:
2020-10-17 14:51:36.289 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 37 (type=CHECKPOINT) @ 1602917496275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:51:36.442 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 37 for job 7848c696590291978aa8279b3c09e5d7 (6958828 bytes in 167 ms).
2020-10-17 14:56:36.287 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 38 (type=CHECKPOINT) @ 1602917796275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:56:36.434 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 38 for job 7848c696590291978aa8279b3c09e5d7 (6943747 bytes in 159 ms).
2020-10-17 15:01:36.286 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39 (type=CHECKPOINT) @ 1602918096275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:01:36.461 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39 for job 7848c696590291978aa8279b3c09e5d7 (6929939 bytes in 185 ms).
2020-10-17 15:06:36.289 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 40 (type=CHECKPOINT) @ 1602918396275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:06:36.434 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 40 for job 7848c696590291978aa8279b3c09e5d7 (6916258 bytes in 159 ms).
2020-10-17 15:11:36.290 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 41 (type=CHECKPOINT) @ 1602918696275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:11:36.434 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 41 for job 7848c696590291978aa8279b3c09e5d7 (6960383 bytes in 159 ms).
2020-10-17 15:16:36.288 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 42 (type=CHECKPOINT) @ 1602918996275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:16:36.469 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 42 for job 7848c696590291978aa8279b3c09e5d7 (6969143 bytes in 194 ms).
2020-10-17 15:21:36.290 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 43 (type=CHECKPOINT) @ 1602919296275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:21:36.438 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 43 for job 7848c696590291978aa8279b3c09e5d7 (6982908 bytes in 162 ms).
2020-10-17 15:25:20.649 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.kubernetes.KubernetesResourceManager [] - Fatal error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.650 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.661 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
Meanwhile, the logs of all TaskManagers show nothing unusual.
This problem triggers the ResourceManager to re-initialize the JobManager, and the job then recovers from its most recent Checkpoint. However, if HA (High Availability) is not configured, Flink cannot recover the job and critical running state is lost, which is unacceptable for production workloads.
Root Cause Analysis
Since the logs contain little useful information, we decided to start from the too old resource version error reported by the Kubernetes client and work out why the exception occurs.
Because Flink's Kubernetes client is based on Fabric8, we looked up a reply from a Fabric8 team member on this issue, which can be summarized as follows:
Every Kubernetes resource has its own version number. When a client watches Pods, it may occasionally receive an HTTP 410 Gone status code. This status means that the resource version (resourceVersion) recorded by the client is too old and the server no longer accepts its requests. For a plain watch, the caller has to handle this case itself; the client library does not handle it.
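To make this concrete, here is a minimal sketch of a plain Fabric8 watch, written against the 4.x client API bundled with Flink 1.11; the namespace and the handling logic are illustrative assumptions rather than Flink code. When the server rejects a stale resourceVersion with HTTP 410, the failure is delivered to the onClose callback, and it is up to the caller to re-list and re-watch:
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;

public class PodWatchSketch {
    public static void main(String[] args) throws Exception {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            client.pods().inNamespace("default").watch(new Watcher<Pod>() {
                @Override
                public void eventReceived(Action action, Pod pod) {
                    // Normal watch events arrive here while the resourceVersion is still accepted.
                    System.out.println(action + ": " + pod.getMetadata().getName());
                }

                @Override
                public void onClose(KubernetesClientException cause) {
                    // An HTTP 410 (Gone) surfaces here as the close cause; the client library
                    // does not re-establish the watch, so the caller must re-list and re-watch.
                    if (cause != null && cause.getCode() == 410) {
                        System.err.println("too old resource version, need to re-watch: " + cause.getMessage());
                    }
                }
            });
            Thread.sleep(60_000); // keep the process alive so watch events can arrive
        }
    }
}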
Flink, however, does not handle this case gracefully: it simply shuts down the JobManager (a new instance is started afterwards) in response to any KubernetesClientException (see FLINK-15836).
This design does have its merits: failing fast on an exception exposes problems as early as possible and keeps small issues from snowballing.
However, in our view an outdated resource version is not a real failure, so it does not call for an operation as heavy as restarting the JobManager; it is enough to re-initialize the watcher once so that its resource version is refreshed to the latest.
After all, this kind of recoverable exception may occur many times over the lifetime of a long-running job, and the platform needs to provide fine-grained fault tolerance so that customers' jobs can run smoothly for the long term.
Still, at this point we were curious: what exactly causes the resource version mismatch between the client and the server?
Trigger Conditions
For this Too old resource version (HTTP 410 Gone) problem, we now knew its meaning and the potential remedies, but we still needed a trigger condition in order to verify the fix. We tried quite a few approaches, such as deliberately pausing the JobManager's JVM for an extended period, but could not reproduce the phenomenon.
Later we discovered by chance that restarting the API Server reproduces the problem: the newly started API Server fetches the current latest resourceVersion from etcd, so if the client then issues requests with its saved old value, the error occurs reliably. This made our fix and verification work much easier.
Solution
Given the analysis above, we can check the exception cause in the onClose method mentioned earlier: if its error code (code) equals 410 (HTTP Gone), the KubernetesResourceManager goes through the watch re-initialization flow:
/**
 * When "Fatal error occurred in ResourceManager.
 * io.fabric8.kubernetes.client.KubernetesClientException:
 * too old resource version"
 * happens, re-watch the pods.
 */
public void reWatchPods() {
    podsWatch = kubeClient.watchPodsAndDoCallback(
        KubernetesUtils.getTaskManagerLabels(clusterId),
        this);
}
This method makes Flink's kubeClient fetch the current latest Pod information from the API Server and then re-register the watch callback, starting a new round of watching.
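For reference, the following is a minimal sketch of what the corresponding onClose handling could look like. It is an illustration rather than the exact Flink source; the enclosing class, the log field, and the onFatalError call are assumptions about the surrounding KubernetesResourceManager code:
@Override
public void onClose(KubernetesClientException cause) {
    if (cause != null && cause.getCode() == 410) {
        // HTTP 410 Gone ("too old resource version"): recoverable, so re-establish the
        // watch against the latest resourceVersion instead of treating it as fatal.
        log.warn("Pod watcher closed with HTTP 410 (Gone), re-watching pods.", cause);
        reWatchPods();
    } else {
        // Any other cause keeps the original fail-fast behavior.
        onFatalError(cause);
    }
}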
Fault Tolerance Results
After applying the fix above, we rebuilt the Flink image and ran the verification. The Too old resource version problem was reproduced, the exception was caught, and the pod watcher was re-initialized; the JobManager kept running normally without any crash.
We also restarted the API Server several times while the job was running; each restart was handled correctly, and Checkpoints and Savepoints continued to work. We additionally simulated crashes of single and multiple TaskManager Pods during the run; new Pods were allocated and the job recovered automatically, showing that subsequent communication between the Kubernetes client and the server remained healthy.
Summary and Reflections
Looking back at how this problem was pinned down, the process was not particularly smooth; most of the time went into thinking about and trying out ways to reproduce it. Once a reliable reproduction is found, an intermittent problem becomes a deterministic one, which makes it possible to analyze its cause, consequences, and solution in detail along the whole call chain.
This problem also shows that Flink's Kubernetes module is far from perfect. It still needs everyone to actively discover, diagnose, and fix all kinds of runtime issues and contribute to the community's growth.