Flink 1.13 在Native k8s的部署实践

2021-06-25 20:48:15 浏览数 (1)

  • 前言
  • 创建一个名字为flink-cluster的namespace
  • 创建一个账户
  • service account和角色的绑定
  • k8s session 集群
    • 启动session集群
    • 往session集群提交jar任务
    • 往session集群提交sql任务
  • 启动k8s application任务
    • 编写Dockerfile文件:
    • 打镜像
    • 启动application mode的任务
    • 查看正在运行的任务
    • 停止任务
  • 踩坑

前言

flink支持多种部署模式,比如standalone、sesson、per job、application,一般在生产环境我们都是将flink程序部署到k8s或者yarn等资源管理器上。目前k8s部署模式暂时不支持per job模式。不过由于k8s部署flink集群相对yarn要落后一些,是在最近几个版本才慢慢完善的,所以我猜测市面上很多公司还是以yarn为主,逐渐尝试k8s。

flink的程序写完之后,提交到git,之后我们会进行拉代码、打包编译、打镜像,push到仓库,最后以application 模式构建k8s集群。

此外我们还在k8s上启动一个session集群,对于平时一些flink sql的调试任务,可以在这个集群完成。

通过踩坑实践我觉得首先你需要对flink和k8s的一些概念、使用方式等等都非常了解,这样你才能对flink在k8s的部署的时候遇到问题很好的定位和解决问题。

创建一个名字为flink-cluster的namespace

代码语言:javascript复制
kubectl create namespace flink-cluster

创建一个账户

代码语言:javascript复制
kubectl create serviceaccount flink -n flink-cluster

service account和角色的绑定

代码语言:javascript复制
kubectl create clusterrolebinding flink-role-binding-flink 
  --clusterrole=edit 
  --serviceaccount=flink-cluster:flink

k8s session 集群

启动session集群

代码语言:javascript复制
./bin/kubernetes-session.sh 
  -Dkubernetes.namespace=flink-cluster 
  -Dkubernetes.jobmanager.service-account=flink 
  -Dkubernetes.cluster-id=my-session 
  -Dtaskmanager.memory.process.size=4096m 
  -Dkubernetes.taskmanager.cpu=1 
  -Dtaskmanager.numberOfTaskSlots=4 
  -Dresourcemanager.taskmanager-timeout=3600000

往session集群提交jar任务

代码语言:javascript复制
./bin/flink run -d 
  -e kubernetes-session 
  -Dkubernetes.namespace=flink-cluster 
  -Dkubernetes.cluster-id=my-session 
    examples/streaming/WindowJoin.jar

往session集群提交sql任务

在sql客户端执行以下的命令之后,可以把sql任务提交到刚才新建的flink session集群。

代码语言:javascript复制
set kubernetes.cluster-id=my-session;
set kubernetes.namespace=flink-cluster;
set execution.target = kubernetes-session;

启动k8s application任务

编写Dockerfile文件:

代码语言:javascript复制
# base image
FROM apache/flink:1.13.1-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
RUN mkdir -p $FLINK_HOME/hadoopconf
COPY flink-1.13.1/examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
COPY core-site.xml $FLINK_HOME/hadoopconf
COPY hdfs-site.xml $FLINK_HOME/hadoopconf
# 添加hdfs的相关的jar,为了读取hdfs  
COPY flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar $FLINK_HOME/lib
# 配置HADOOP_CONF_DIR为了获取hadoop的core-site.xml and hdfs-site.xml ,因为checkpoint是存在hdfs的,需要读写hdfs 
ENV  HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf:$HADOOP_CONF_DIR

打镜像

代码语言:javascript复制
docker build -t apache/flink:v0.1 .

打镜像的目录下存在的文件

代码语言:javascript复制
-rw-r--r--  1 root root      1204 May 25 14:00 core-site.xml
-rw-r--r--  1 root root       412 Jun  2 10:45 Dockerfile
drwxrwxr-x 10 work work      4096 May 25 20:17 flink-1.13.1
-rw-r--r--  1 root root  43433774 May 18 17:54 flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar
-rw-r--r--  1 root root      3750 Jun  1 18:49 hdfs-site.xml

启动application mode的任务

代码语言:javascript复制
    ./flink-1.13.1/bin/flink run-application 
    --target kubernetes-application 
    -Dkubernetes.namespace=flink-cluster 
    -Dkubernetes.jobmanager.service-account=flink 
    -Dkubernetes.cluster-id=my-first-application-cluster 
    -Dkubernetes.high-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
    -Dkubernetes.cluster-id=my-first-application-cluster 
    -Dhigh-availability.storageDir=hdfs://hadoopcluster/flink/recovery 
    -Dkubernetes.container.image=apache/flink:v0.2 
    -Dkubernetes.rest-service.exposed.type=NodePort 
    -Dstate.backend=rocksdb 
    -Dstate.checkpoints.dir=hdfs://hadoopcluster/flink/flink-checkpoints 
    -Dstate.backend.incremental=true 
    local:///opt/flink/usrlib/TopSpeedWindowing.jar

启动之后会在最后出现如下的日志,最后的地址就是该任务的web ui地址

代码语言:javascript复制
2021-06-01 15:03:00,445 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor      [] - Create flink application cluster my-first-application-cluster successfully, JobManager Web Interface: http://ip:port

启动之后我们会看到k8s启动了以下的flink的组件

代码语言:javascript复制

[root@master1 ~]# kubectl get svc -n flink-cluster
NAME                                TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)             AGE
my-first-application-cluster        ClusterIP   None              <none>        6123/TCP,6124/TCP   46h
my-first-application-cluster-rest   ClusterIP   ip xxxx   <none>        8082/TCP            46h
[root@master1 ~]# kubectl get deployments -n flink-cluster
NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
my-first-application-cluster   1/1     1            1           46h
[root@master1 ~]# kubectl get pods -n flink-cluster
NAME                                            READY   STATUS    RESTARTS   AGE
my-first-application-cluster-7c4d9d7994-6vwjr   1/1     Running   0          46h
my-first-application-cluster-taskmanager-1-1    1/1     Running   0          46h

查看正在运行的任务

代码语言:javascript复制
./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.namespace=flink-cluster

停止任务

代码语言:javascript复制
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster -Dkubernetes.namespace=flink-cluster <jobId>

或者在flink的web ui放弃任务

踩坑

在application模式启动flink程序之后,不管服务是NodePort还是LoadBalancer,对外暴露的端口都是30000多,而我们的环境下,30000多的端口访问不了,只能访问8000多的(这点我没有去找相关的说明文档,不知道是哪里的问题)。

要解决这个问题,可以有两个方案,第一个,就是添加参数控制LoadBalancer的暴露端口范围(暂时flink提供的参数里没这个配置,我觉得应该可以自己通过改源码实现),还有一个方案,我是咨询了其他公司的朋友,就是将flink生产的rest服务类型改成ClusterIP,然后添加一个ingress的规则,我觉得这个方案好处就是不用每个flink程序都去启动一个LoadBalancer,每个application都暴露一个端口。

后续的计划是采用公司的发布平台把这套部署环境自动化,从git仓库拉代码,然后build,打镜像、push到仓库,发布flink程序,然后修改ingress规则,开源的产品可以用jenkins来实现这一系列工作。

0 人点赞