修复 Flink Kubernetes 资源分配慢 兼谈如何贡献开源社区

2021-09-29 20:45:23 浏览数 (1)

问题现象

近期我们发现 Kubernetes 环境下的 Flink 集群有个奇怪的现象:在算子并行度较大(例如超过 50)时,Flink 的 TaskManager 注册异常缓慢(具体表现为 TaskManager 容器注册后过段时间就超时退出了,随后反复循环,导致作业迟迟分配不到所需的资源),且 Web UI 长期处于如下的加载界面,无法正常显示作业列表:

Flink UI 不响应Flink UI 不响应

通过查看 JobManager 的日志,发现有大量的 DNS 反向解析报错信息:

No hostname could be resolved for the IP address xxx.xxx.xxx.xxx, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.

而 TaskManager 的日志则没有异常,均为向 ResourceManager 注册成功,但是向新作业的 JobManager 注册时发生超时造成的被迫退出,日志日下:

2020-10-11 21:21:40.346 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{cpuCore-=1.0000000000000000, taskHeapMemory=1.425gb (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)}, allocationId: 60d5277e138a94fb73fc6691557001e0, jobId: 493cd86e389ccc8f2887e1222903b5ce).

java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed out.

此时我们感觉非常奇怪,为什么 TaskManager 向 ResourceManager 注册是正常的,但是却向 JobManager 注册超时呢?这些报错消息是否代表系统工作不正常呢?

我们对 DNS 地址反向解析做了验证,发现平均解析时间并不长,那是不是说明问题不一定在这里呢?

分析定位

Flink 作业在 Kubernetes 环境下的提交流程

首先我们来看一下 Flink 原生 Kubernetes 模块的架构图,其中我们关心的是 K8s Deployment(代表 Flink 的 ResourceManager 和 JobManager / JobMaster),以及 K8s Pod(代表 Flink 的 TaskManager),以及两者的交互过程。

Flink 原生 Kubernetes 架构图(来自 Flink 官网文档)Flink 原生 Kubernetes 架构图(来自 Flink 官网文档)

我们可以根据上图,复现一下本作业的提交过程:

  1. Flink-Client 模块负责用户作业的提交,它内嵌了 Fabric8 的 Kubernetes Client SDK,向 Kubernetes 的 API Server 发起 Flink 作业 Deployment 的创建请求和必要参数。
  2. Kubernetes 的控制平面会根据请求参数,筛选合适的节点进行资源分配和初始化操作,并生成配置(ConfigMap)。
  3. Kubernetes Deployment 创建好后,包含了 FLIP-6 新架构里指定的多项组件,例如 Dispatcher、ResourceManager、JobMaster(也叫做 JobManager)等。
  4. 当作业提交后,根据作业的资源信息,ResourceManager 会申请 Pod 来部署 TaskManager,每个 TaskManager 的资源信息用 WorkerResourceSpec 类描述,包含了 CPU 核数、堆内存大小、堆外内存大小等信息,并配置监听器(Listener),这样新的 TaskManager Pod 从出生到就绪的全流程都可以被它掌握。
  5. 当 TaskManager Pod 就绪后,会自动向 ResourceManager 注册并周期的发送 Slot Report,TaskManager 的 Slot 资源信息会放入 SlotManager 中保存。
  6. ResourceManager 开始向各个 TaskManager 询问资源(Slot Request),每个 TaskManager 会一直尝试向给定的 JobManager 注册自己。注册重试逻辑采用倍增的方式(因为 JobManager 的注册线程是异步运行的,不一定能实时响应),例如第一次 100ms,第二次 200ms,第三次 400ms,第四次 800ms 等,直到达到最大的超时阈值,此时注册失败,就出现了上面所说的The slot has timed out报错日志。
  7. 多次注册失败后,TaskManager 的 slot 会被释放。如果长期得不到作业分配,TaskManager 自己也会退出。
  8. 正常情况下,当 TaskManager 向 JobManager 注册成功后,TaskManager 才会进入后续的 Offer slot、Activate slot 并开始接收具体的作业逻辑,而我们的作业并没有到这里。

原因推测

根据上述分析,我们可以看到,问题的直接原因是 TaskManager 出生后迟迟无法向 JobManager 注册自己(一直超时),那么问题有两种可能:

  1. TaskManager 的问题,在上述注册过程中卡住了(由于 GC 或者网络原因等)。
  2. JobManager 的问题,在上述注册过程中卡住了(同样由于 GC 或者网络、调度原因)。

但通过查看 TaskManager 和 JobManager 的 GC 日志,并未见到异常的 STW 停顿,基本可以排除是 GC 原因导致的。而通过查看网络的 TCP 连接信息,发现连接数正常,并未出现大量异常连接,且网络带宽充足,丢包率可以忽略不计。磁盘访问也很少,因此也排除网络和磁盘原因。

对症治疗(临时方案)

通过把 Flink 的日志级别调整为 DEBUG,可以发现 TaskManager 向 JobManager 发送了 10 次注册请求,但是 JobManager 一直没有回应,尤其是第 10 次,超时时间为 52s 都没有注册上去,可见问题之严重:

TaskManager 注册失败的日志TaskManager 注册失败的日志

由于报错是因为超时导致的,为了缓解这个现象,让作业先跑起来,我们首先调大了各个超时相关的参数:

代码语言:javascript复制
# 避免"Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources. Stopping the JobMaster for job"
slot.request.timeout: 500000 

# 增加单次尝试的最大超时时间
cluster.registration.max-timeout: 300000

# 避免 "free slot (TaskSlot)"
akka.ask.timeout: 10 min

# 避免 "Heartbeat of TaskManager timed out."
heartbeat.timeout: 500000

加上这些参数后,TaskManager 终于可以注册成功了,但是整个初始化过程耗时 10 分钟以上,而且没有找到根因,只能算是权益之举,还需更细致的分析。

日志分析

通过仔细分析上述 DEBUG 级别的日志,我们可以进一步缩小范围到 TaskManager 向 JobManager 注册后的一小段时间。由于 Flink DEBUG 级别日志仍然无法显示出这段时间内 Flink 做了哪些事情,于是我们在关键的代码片段插入了一些日志标记,以进一步缩小范围。

经过逐步调整,终于确认了问题元凶就是 DNS 反向解析造成的 Akka Dispatcher 阻塞:

DNS 反向解析有概率卡顿几秒 造成 Dispatcher 阻塞DNS 反向解析有概率卡顿几秒 造成 Dispatcher 阻塞

而元凶的代码就是 TaskManagerLocation 类的构造方法中的一小段代码:

代码语言:javascript复制
inetAddress.getCanonicalHostName()

这个方法会查询 inetAddress IP 地址的主机名,如果 DNS 反应缓慢,就会卡在这里。把这段代码注释掉后,作业提交立刻变得正常了。

后来我们回顾了一下,之前对 IP 做反向解析时,只做了手动的几个 IP 的查询,发现响应速度很快,因此当时没有确认是这里的问题。

后面我们通过短时间快速查询多个 IP 的主机名时,确认 DNS 反应速度会变的异常缓慢(后续了解到是云 DNS 做了反查频率限制导致的),而正是服务器迟迟不返回造成 Flink Akka Dispatcher 处理流程阻塞,异步部分迟迟得不到执行,TaskManager 与 JobManager 之间的一问一答变成了只问不答(消息超过超时时间被丢弃)。

Profiler 画图

其实对于这个问题,我们也使用了 JFR 和 JProfiler 等工具对测试环境的 JVM 进行采样并绘制火焰图,但是效果不理想,看不出哪些方法被阻塞(异步流程的定位一直是相对困难的),因此从图中也没有明确找到问题原因:

社区版 JDK 火焰图,看不出重点社区版 JDK 火焰图,看不出重点

后来我们使用腾讯自研的 KonaJDK 里提供的优化过的 Flight Recorder 并配合图形化工具(目前小工具暂未对外开放,尽请期待)再次绘制火焰图时,可以很明显的发现 java.net.InetAddress#getCanonicalHostname 方法耗时过长:

KonaJDK 记录并绘制的火焰图,可以明显看出耗时过长的方法(元凶)KonaJDK 记录并绘制的火焰图,可以明显看出耗时过长的方法(元凶)

通过与 KonaJDK 的专家们讨论,确认了社区版 JDK 8 的 JFR 仍然不成熟,而腾讯自研的 KonaJDK 在这方面明显更有优势。目前也在开源评估过程中,相信后续大家可以用到此工具来加速问题定位和性能调优。

根因解决

既然我们找到问题的原因是 DNS 反向解析在高并发的情况下较为缓慢,我们又进行了如下的思考:

  • 没必要在 Kubernetes 环境下做 DNS 反向解析,因为对于 Pod 而言,如果没有暴露为 Service,那么反向解析其 IP 是永远失败的。
  • 即使需要做反向解析,也没必要再构造方法里面做,可以改为用到时再获取(即 Lazy Initialization)。

针对上述的思考结论,以及与社区讨论后,形成了下面的修复方案:

  1. 提供一个参数(jobmanager.retrieve-taskmanager-hostname),允许用户彻底关闭这里的 DNS 反向解析功能。这对 Kubernetes 环境是很有必要的。
  2. 把 DNS 反向解析功能下放到 getter 方法中,在首次访问时进行主机名获取和保存。

经过验证,两种方法均可解决本文提到的资源分配缓慢的问题。

社区贡献

之前其他用户在邮件组中也反馈过同样的问题,只是没有得到解决,我们认为这个问题不止我们遇到了。另外 Kubernetes 等环境下,做 IP 地址的反向解析纯属多余,因此提供一个关闭选项也是一个加速部署的优化点,因此我们决定把改进回馈给社区。

点此查看 Flink 文档中关于如何参与贡献的说明。

邮件讨论

当遇到疑难问题时,建议订阅并向 Flink 的 User 组发邮件进行咨询。后来我们找到问题根源后,社区的 Till 也建议我来进行问题的修复。为了反馈问题,发现者可以在 Flink 的 JIRA 上提个单,提单前需要先注册账号。

JIRA 单

我们提了一个 FLINK-19677 的 JIRA 单,然后 at 任意 Flink Committer 进行分配,只有分配给自己后才可以后续进行代码提交

另外需要将 Flink 的 GitHub 仓库 Fork 一份到自己名下,然后新建一个分支进行开发。

代码检查

当写好代码后,还需要准备测试用例,以确保代码能够得到妥善验证。

另外还需要注意编码风格需要符合代码规范,如果涉及到新功能或者重大变更,还需要编写或更新相关文档。

最后还要运行mvn verify以及 CI 以确保代码可以完整构建。

Pull Request 提交

当上述流程准备妥当后,就可以在 Flink GitHub 仓库中新建一个 Pull Request 了。选择自己的开发分支,与 Flink 的当前分支进行对比,然后提交一个 PR。注意请务必按照模板里的 Checklist 做逐项检查和填写,否则会影响 Review。

Review 和修改

当一个 Pull Request 提交后,Bot 会介入进行自动化构建,并随后更新结果。如果构建失败,则需要仔细检查是不是代码风格未通过校验(例如 JavaDoc 编写不规范,每句话后没有加句号、有未使用的 import、换行不规范等问题),或者文档未更新(文档编辑后需要进入flink-docs模块按 README 重新生成)等问题。

随后可以邀请相关的 Flink Committer 进行 Review。如果不了解的话,可以从 Flink 源码、邮件组、其他相关的 Pull Request 里查看哪位 Committer 出现次数最多,那通常表明他最熟悉这个模块。

为了通过评审,需要至少两位 Committer 进行 Review 并给出同意的意见。

代码合并

当代码修改妥当,所有 Reviewer 都同意后,代码即可合并到 Flink 中,您也会成为一名 Flink 的贡献者。其实,不只是代码修改,文档、Wiki 以及参与邮件组讨论等,都是贡献的方式之一。

总结回顾

其实回顾来看,问题的定位并不复杂,关键在于对 Flink 的资源分配和提交过程要有足够的熟悉度,另外日志和适当的工具也可以大大加速我们的定位过程。当发现解决方案并验证通过后,可以尝试将改进反馈给社区,从而帮助更多的小伙伴,也可以避免后续 Flink 版本升级后需要再次修复的重复劳动,一举多得。

0 人点赞