【Flink源码实战(一)】给Flink增加一个REST API

2021-06-10 13:26:20 浏览数 (1)

本文参考了flink committer tison的文章,基于flink 1.13版本源码改动实现。

一、概述

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/

Flink官方实现了大量的REST API接口,有用于Flink UI展示数据、也用于各自监控面板。这些REST API的webserver作为JobManager的一部分在运行。默认端口是8081,可以通过flink-conf.yamlrest.port参数进行配置。

在有多个JobManager的情况下(HA场景下),每个JobManager将运行自己的REST API实例,而由被选为leader的JobManager实例提供有关已完成和正在运行的作业的信息。

二、开发指南

REST API 位于flink-runtime项目下,核心实现org.apache.flink.runtime.webmonitor.WebMonitorEndpoint (因为Flink早期REST API都是用于监控,所以命名是WebMonitorEndpoint。现在其工作职能还包含一些任务启停等非监控场景),其主要是负责server实现和请求路由。

image.pngimage.png

(主要:2个pierre package是笔者下面自定义REST API的地方)

当然Flink REST API实现是基于NettyNetty Router ,因为实现比较轻量,所以性能还是比较好的。

而完整的REST API则需要这四大模块:

image.pngimage.png

三、开发自己的REST API!

0、设计与规划

1)需求

向http链接 http://${jobmaster-host}:8081/pierre/foo 发起get请求,返回一个json串{"response":"bar"}

2)实现规划

当我们要新增加一个REST API的时候,我们至少需要:

  • 实现一个MessageHeaders,作为新请求的接口
  • 实现一个ResponseBody,作为返回结果的Body
  • 实现一个AbstractRestHandler,根据添加的MessageHeaders类处理请求
  • 将handler注册到org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()

1、实现MessageHeaders

代码语言:txt复制
package org.apache.flink.runtime.rest.messages.pierre;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

public class FooHeaders
        implements MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {

    // 单实例模式
    private static final FooHeaders INSTANCE = new FooHeaders();

    public static FooHeaders getInstance() {
        return INSTANCE;
    }

    @Override
    public Class<BarResponseBody> getResponseClass() {
        return BarResponseBody.class;
    }

    @Override
    public HttpResponseStatus HttpResponseStatus() {
        return HttpResponseStatus.OK;
    }

    @Override
    public String getDescription() {
        return "pierre foobar service";
    }

    @Override
    public Class<EmptyRequestBody> getRequestClass() {
        return EmptyRequestBody.class;
    }

    // 解析url里面的参数
    @Override
    public EmptyMessageParameters getUnresolvedMessageParameters() {
        return EmptyMessageParameters.getInstance();
    }

    @Override
    public HttpMethodWrapper getHttpMethod() {
        return HttpMethodWrapper.GET;
    }

	// URL路由信息
    @Override
    public String getTargetRestEndpointURL() {
        return "/pierre/foo";
    }
}

这里注意:

  • 必须是单实例模式
  • HttpResponseStatusgetResponseClass等均不能return null,否则会有NullPointerException image.pngimage.png

2、实现ResponseBody

代码语言:txt复制
package org.apache.flink.runtime.rest.messages.pierre;

import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

public class BarResponseBody implements ResponseBody {
    private static final String FIELD_BAR = "response";

    @JsonProperty(FIELD_BAR)
    public final String response = "bar";

    private static final BarResponseBody INSTANCE = new BarResponseBody();

    public static BarResponseBody getInstance() {
        return INSTANCE;
    }
}

这里使用到了 jackson注解,需要import FLINK shaded的版本,避免冲突。

3、实现`AbstractRestHandler

代码语言:txt复制
package org.apache.flink.runtime.rest.handler.pierre;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.pierre.BarResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FooHandler
        extends AbstractRestHandler<
                RestfulGateway, EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {

    public FooHandler(
            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            Time timeout,
            Map<String, String> responseHeaders,
            MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters>
                    messageHeaders) {
        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
    }

    @Override
    protected CompletableFuture<BarResponseBody> handleRequest(
            @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
            @Nonnull RestfulGateway gateway)
            throws RestHandlerException {
        return CompletableFuture.completedFuture(BarResponseBody.getInstance());
    }
}

4、注册handler

代码语言:txt复制
    // 自己的handler
    final FooHandler fooHandler =
                new FooHandler(leaderRetriever, timeout, responseHeaders, FooHeaders.getInstance());
    ……
	handlers.add(Tuple2.of(fooHandler.getMessageHeaders(), fooHandler));

5、编译打包

初次改造Flink代码,不是特别熟悉,列了一下步骤供大家参考:

  • maven-checkstyle-pluginfailOnViolation设置为false,因为我们的一些小改动不完全符合flink的代码工程规范。当然如果是要给Flink正式贡献代码,肯定还是要符合规范的。
  • mvn spotless:apply 会自动进行代码格式化的工作
  • mvn clean package -DskipTests 进入漫长的package中

预计十分钟:flink-dist/target 目录下即可生成最新的可执行文件

image.pngimage.png

六、效果

  • 启动一个本地的loacl集群 ./bin/start-cluster.sh
  • 请求http://${jobmaster-host}:8081/pierre/foo
image.pngimage.png

大功告成,完美第一步!

更多精彩:https://github.com/pierre94/flink-notes

0 人点赞