Flink REST API 介绍
Flink REST API 是 JobManager 提供的 HTTP 接口,用户可以通过 GET、POST 等 REST 定义的方法,请求获取作业、JobManager、TaskManager 的运行状态、监控信息、各项配置等等。
作为平台方,我们会给 Flink 增加各项新功能,例如提交 SQL 代码、动态调整作业配置、实时开启或关闭某些特性、下发调试指令等等,都可以通过扩展 REST API 来实现。
但是,由于这套系统的调用是阻塞性的,如果某个 API 长期不响应,就会持续阻塞调用方,甚至会造成 JobManager 长期卡顿,严重影响其他接口的正常请求。
因此,我们在新增接口时,一定要遵循一定的法则,以确保整体的可用和可靠性。
非阻塞的 Flink REST API 设计要点
关于拓展 Flink REST API 的方法,我们可以在 Flink 官网文档、各类技术社区文章中得到详细的指引,因而这里不再赘述基础的细节,而是更侧重于讲解遇到的一些常见的问题和解决方案。
从设计流程上来看,如文章所述,我们可以先定义这个接口所需的请求体结构(RequestBody)、返回体结构(ResponseBody) 、参数列表(MessageParameters),随后实现一个 Handler(AbstractRestHandler),即可在 flink-runtime 模块的 WebMonitorEndpoint 类中,注册这个新的 Handler。
从请求链路上来看,一个请求主要流向是:
用户请求 → Netty Server → 用户定义的 Handler → ResourceManagerGateway → ResourceManager → TaskExecutorGateway → TaskManager → 用户定义的 Task
请求体、返回体设计
通常对于接受 GET 方法的 REST API 而言,可以直接使用 EmptyRequestBody 类作为请求体的结构,方便快捷。
但对于 POST 方法的 API,我们通常需要实现 RequestBody 接口,来定义该 REST 接口的请求体。
我们还需要实现 ResponseBody 接口,来定义该 REST 接口的返回体结构。
注意 ⚠:Flink 使用 Shade 操作后的 Jackson 注解来描述每个字段,例如
代码语言:javascript复制org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty
因此我们不要直接引入未 shade 的 Jackson 类,以免与用户自己的 Jackson 类发生冲突。
参数列表设计
参数列表(MessageParameters)指的是 URL 请求里问号后面的参数,例如 /info/config?limit=5&order=desc 的加粗部分。
注意 ⚠:Flink 现有的参数列表的字段很多都采用 public final 或者 protected final 修饰,目的是为了便于编写测试用例。如果没有特殊需求,则建议使用 private final 来修饰。
REST Handler 设计
handler 是一个 REST API 接口的执行者,我们可以通过实现 handleRequest 方法来定义请求的处理逻辑。
注意 ⚠:很多接口 Handler 在构造方法里,有一个名为 executor 的参数。这个线程池的名字是 DispatcherRestEndpoint,用来异步执行一些耗时的操作。如果 Handler 里需要执行的操作很重,则一定要把操作交给这个 executor 来执行(CompletableFuture.supplyAsync 的第二个参数指定它),避免阻塞整个 Netty Server,造成 Flink UI 不响应的严重后果。
至此,我们可以让用户请求顺利到达 JobManager 的 JVM。对于需要调用 TaskManager 的功能,我们还需要了解一下 JobManager 与 TaskManager 的通讯机制。
JobManager 和 TaskManager 的通讯机制与超时处理
Flink 使用 Akka 的 Actor 模型来实现 JobManager 与 TaskManager 的命令下发与执行。我们定义了 RPC 接口后,Flink 与 Akka 会通过动态代理的方式,为我们自动生成 RPC 远程调用所需的对象;因此我们只需要把他当作本地方法来实现即可,无需关心被调用方的位置。
在 REST Handler 的具体实现上,我们在 handleRequest 方法的传参里,可以看到有一个 ResourceManagerGateway 类型的 gateway 的参数,它就是 REST Handler 与 ResourceManager(同属于 JobManager)通信的桥梁。
ResourceManager 网关和异步执行
ResourceManagerGateway 接口里定义了所有 JobManager(ResourceManager)可处理的请求列表,因此如果我们希望增加一个新的请求类型,就在这里新增一个相应的方法。
新增 Gateway 方法后,我们还需要在 ResourceManager 类里给出具体实现。在实现时,我们可以通过
代码语言:javascript复制taskExecutors.get(taskManagerId)
来获取某个特定的 TaskManager 的通讯接口(TaskExecutorGateway),它调用后返回一个 CompletableFuture 对象。
注意 ⚠:如果需要批量调用多个 TaskManager 的网关接口,为了保证所有的 TaskManager 都响应后才得到最终返回值,我们可以用
代码语言:javascript复制FutureUtils.combineAll(responseFutures)
来合并所有返回的 CompletableFuture 对象。
注意 ⚠: Gateway 是通讯的核心组件,因此请务必确保所有的方法都可以迅速返回(用 CompletableFuture 封装并异步执行),否则可能造成作业崩溃重启等严重后果。
TaskExecutor 网关以及处理异步超时
TaskExecutorGateway 是 JobManager(ResourceManager)与 TaskManager(TaskExecutor)之间通讯的桥梁。通过为 TaskExecutorGateway 接口中新增方法,并在 TaskExecutor 类中实现该方法,我们可以实现对 TaskManager 的功能调用。
注意 ⚠: 在 TaskExecutor 具体执行任务时,可能必须包含阻塞操作(例如下载日志、执行外部调用、触发 GC 等),但客观上又必须在规定的 timeout 范围内向 ResourceManager 返回结果,因此可以配合 FutureUtils.orTimeout 来实现超时就报错的效果。例如:
代码语言:javascript复制@Override
public CompletableFuture<SomeResponse> doSomething(int param, @RpcTimeout Time timeout) {
return FutureUtils.orTimeout(CompletableFuture.supplyAsync(
() {
// 一些耗时的工作
return new SomeResponse("success");
}, ioExecutor), timeout.getSize(), timeout.getUnit());
}
总结
新增一个 Flink REST API 很简单;但是如果设计不当,阻塞了 Flink 的核心流程,会造成作业不稳定甚至多组件超时退出的后果。
因此,用好异步逻辑,尽可能减少阻塞,防止超时,是我们必须关注的细节,也是开发完成后的重点测试项。