Flink job cluster on Kubernetes

2020-08-06 09:59:47 浏览数 (1)

1 Overview

之前文章介绍了 Flink session cluster on Kubernetes,需要注意,这种部署方式,可以在同一个 Cluster 上多次提交 Flink Job,而本文介绍的,是一种将任务和镜像绑定的部署方式,即 Flink 集群是不共享的,其组件是单独属于一个 Job。

2 Creating the job-specific image

可以参考之前另一篇文章。上一篇文章打出来的镜像是一个用了官方提供的 WordCount 例子,在部署 Flink job cluster 的重点在于把用户要运行的 Job 代码放入镜像,此处不赘述了。

3 Deploy Flink job cluster on Kubernetes

官方提供了 Service 和 Job 的模板用于在 K8S 集群上运行 Flink 任务。这里需要注意,模板里面有两个变量,需要用户自行填写。

  • ${FLINK_IMAGE_NAME}: Flink 镜像文件的名字
  • ${FLINK_JOB_PARALLELISM}: Flink Job 的并行度,可以理解成需要的 TaskManager 的个数(Pod 数量)

然后官网也提供了一个命令。

代码语言:javascript复制
FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < job-cluster-job.yaml.template | kubectl create -f -

当然,直接运行会报错的,除了填充变量以外,envsubst 也是个坑爹的工具,他其实就是用来替换文件中的占位变量的工具,Mac 用户可以通过 brew install gettext,安装完部分用户会发现 envsubst 命令还是用不了,如果不行的用户可以尝试运行下面的命令,直接去找到 envsusbt 运行文件的路径。配置好之后,这个命令就可以运行了。

代码语言:javascript复制
alias envsubst=/usr/local/Cellar/gettext/0.19.8.1/bin/envsubst

开始在 K8S 中部署 Flink 的组件。

首先部署 Service,基本上就是暴露各服务组件的端口。

代码语言:javascript复制
# 省略 License
apiVersion: v1
kind: Service
metadata:
  name: flink-job-cluster
  labels:
    app: flink
    component: job-cluster
spec:
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
    nodePort: 30025
  - name: ui
    port: 8081
    nodePort: 30081
  type: NodePort
  selector:
    app: flink
    component: job-cluster

然后部署 Job Cluster,通过以下 YAML 文件。

代码语言:javascript复制
FLINK_IMAGE_NAME=flink-job:latest FLINK_JOB_PARALLELISM=1 envsubst < job-cluster-job.yaml.template | kubectl create -f -
代码语言:javascript复制
# 省略了 License
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-job-cluster
spec:
  template:
    metadata:
      labels:
        app: flink
        component: job-cluster
    spec:
      restartPolicy: OnFailure
      containers:
      - name: flink-job-cluster
        image: ${FLINK_IMAGE_NAME}
        # 这是笔者加的,因为需要拉本地的 Docker 镜像
        imagePullPolicy: IfNotPresent
        # job-cluster 进程启动的命令
        args: ["job-cluster", "-Djobmanager.rpc.address=flink-job-cluster",
               "-Dparallelism.default=${FLINK_JOB_PARALLELISM}", "-Dblob.server.port=6124", "-Dqueryable-state.server.ports=6125"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui

然后部署 TaskManager。

代码语言:javascript复制
FLINK_IMAGE_NAME=flink-job:latest FLINK_JOB_PARALLELISM=1 envsubst < task-manager-deployment.yaml.template | kubectl create -f -
代码语言:javascript复制
# 省略 License
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-task-manager
spec:
  replicas: ${FLINK_JOB_PARALLELISM}
  template:
    metadata:
      labels:
        app: flink
        component: task-manager
    spec:
      containers:
      - name: flink-task-manager
        image: ${FLINK_IMAGE_NAME}
        # 先找本地的镜像
        imagePullPolicy: IfNotPresent
        # task-manager 启动命令
        args: ["task-manager", "-Djobmanager.rpc.address=flink-job-cluster"]

上图可以看到 Job Cluster 作为 K8S Job 的资源类型,已经运行结束了。

4 Summary

在 K8S 上运行 Flink,目前看来还没有 Spark on K8S 这么成熟,原因是 K8S 并没有作为原生的 Flink 资源调度器(Spark 已经开发了),当然了,我看社区也有人提 PR 了,但是社区似乎还没有十分迫切要把 Flink 和 K8S 结合起来,所以进度上,稍微比 Spark on K8S 要慢一些。

0 人点赞