本文参考了flink committer tison的文章,基于flink 1.13版本源码改动实现。
一、概述
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/
Flink官方实现了大量的REST API接口,有用于Flink UI展示数据、也用于各自监控面板。这些REST API的webserver作为JobManager
的一部分在运行。默认端口是8081,可以通过flink-conf.yaml
的rest.port
参数进行配置。
在有多个JobManager
的情况下(HA场景下),每个JobManager
将运行自己的REST API实例,而由被选为leader的JobManager
实例提供有关已完成和正在运行的作业的信息。
二、开发指南
REST API 位于flink-runtime
项目下,核心实现org.apache.flink.runtime.webmonitor.WebMonitorEndpoint
(因为Flink早期REST API都是用于监控,所以命名是WebMonitorEndpoint。现在其工作职能还包含一些任务启停等非监控场景),其主要是负责server实现和请求路由。
(主要:2个pierre package是笔者下面自定义REST API的地方)
当然Flink REST API实现是基于Netty
和Netty Router
,因为实现比较轻量,所以性能还是比较好的。
而完整的REST API则需要这四大模块:
三、开发自己的REST API!
0、设计与规划
1)需求
向http链接 http://${jobmaster-host}:8081/pierre/foo
发起get请求,返回一个json串{"response":"bar"}
2)实现规划
当我们要新增加一个REST API的时候,我们至少需要:
- 实现一个
MessageHeaders
,作为新请求的接口 - 实现一个
ResponseBody
,作为返回结果的Body - 实现一个
AbstractRestHandler
,根据添加的MessageHeaders类处理请求 - 将handler注册到
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()
1、实现MessageHeaders
代码语言:txt复制package org.apache.flink.runtime.rest.messages.pierre;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
public class FooHeaders
implements MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {
// 单实例模式
private static final FooHeaders INSTANCE = new FooHeaders();
public static FooHeaders getInstance() {
return INSTANCE;
}
@Override
public Class<BarResponseBody> getResponseClass() {
return BarResponseBody.class;
}
@Override
public HttpResponseStatus HttpResponseStatus() {
return HttpResponseStatus.OK;
}
@Override
public String getDescription() {
return "pierre foobar service";
}
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}
// 解析url里面的参数
@Override
public EmptyMessageParameters getUnresolvedMessageParameters() {
return EmptyMessageParameters.getInstance();
}
@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}
// URL路由信息
@Override
public String getTargetRestEndpointURL() {
return "/pierre/foo";
}
}
这里注意:
- 必须是单实例模式
HttpResponseStatus
、getResponseClass
等均不能return null,否则会有NullPointerException
image.png
2、实现ResponseBody
代码语言:txt复制package org.apache.flink.runtime.rest.messages.pierre;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
public class BarResponseBody implements ResponseBody {
private static final String FIELD_BAR = "response";
@JsonProperty(FIELD_BAR)
public final String response = "bar";
private static final BarResponseBody INSTANCE = new BarResponseBody();
public static BarResponseBody getInstance() {
return INSTANCE;
}
}
这里使用到了 jackson注解,需要import FLINK shaded的版本,避免冲突。
3、实现`AbstractRestHandler
代码语言:txt复制package org.apache.flink.runtime.rest.handler.pierre;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.pierre.BarResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class FooHandler
extends AbstractRestHandler<
RestfulGateway, EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {
public FooHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters>
messageHeaders) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
}
@Override
protected CompletableFuture<BarResponseBody> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
@Nonnull RestfulGateway gateway)
throws RestHandlerException {
return CompletableFuture.completedFuture(BarResponseBody.getInstance());
}
}
4、注册handler
代码语言:txt复制 // 自己的handler
final FooHandler fooHandler =
new FooHandler(leaderRetriever, timeout, responseHeaders, FooHeaders.getInstance());
……
handlers.add(Tuple2.of(fooHandler.getMessageHeaders(), fooHandler));
5、编译打包
初次改造Flink代码,不是特别熟悉,列了一下步骤供大家参考:
maven-checkstyle-plugin
的failOnViolation
设置为false
,因为我们的一些小改动不完全符合flink的代码工程规范。当然如果是要给Flink正式贡献代码,肯定还是要符合规范的。mvn spotless:apply
会自动进行代码格式化的工作mvn clean package -DskipTests
进入漫长的package中
预计十分钟:flink-dist/target
目录下即可生成最新的可执行文件
六、效果
- 启动一个本地的loacl集群
./bin/start-cluster.sh
- 请求
http://${jobmaster-host}:8081/pierre/foo
大功告成,完美第一步!
更多精彩:https://github.com/pierre94/flink-notes