原生的在K8s上运行Flink

2022-03-18 15:50:22 浏览数 (1)

Kubernetes 简介

什么是 Kubernetes?

Kubernetes 相信大家都比较熟悉,近两年大家都在讨论云原生的话题,讨论 Kubernetes。那么什么是 Kubernetes 呢?

  • K8s 是一个资源管理系统。

如果大家对 Yarn、 Mesos 熟悉,假设给定一批裸的物理机,将资源管理系统部署上去之后,可以在此之上基于它的 API 或者 SDK 开发一些分布式软件或者应用程序。例如可以在 Yarn 上开发传统的 MapReduce,在 K8s 上可以开发一些分布式的 Web Server,或者是大数据计算任务等等。

  • K8s 是一个容器编排系统。

不同于传统的 Yarn,K8s 在所有的进程运行过程中,是全部基于容器化的,但这里的容器并不只是单纯的 Docker 容器,它也包括 Rocket 等其他相关的隔离措施。如果在生产环境中要求比较高的话,可能会有一些安全容器,比如 Kata Containers 等等。K8s 在 Slave 上部署的应用程序,都是用容器化的方式去做分发和管理,同时用容器化的技术做隔离。

  • K8s 是一个自动化运维系统。

它是一个声明式的 API,我们只需要告诉 K8s 集群需要创建一个 Deployment,设置的副本数量,需要达到一个什么样的状态,调度系统也就是 K8s 就会帮助我们维持状态,直到达到设置的状态为止。如果中间发生了一些 failover 或者发生了一些失败,它会自动地将任务迁移到其他的机器上,来满足当前的调度。

  • 云原生。

目前几乎所有的云厂商都已经提供了 K8s 服务支持,包括国内的阿里、国际上的 Amazon、Google 等等,包括传统的微软都已经提供了对于 K8s 的 Managed 服务或者是 Unmanaged 服务。随着目前 Lambda 表达式或者 Function 计算的应用, Serverless 方式也变得更加流行。除了传统的部署小集群以外,通过云产生一个 manager,构建一个大的 Serverless 集群,然后用户按需进行计算资源付费,这也是一种新的模式。

Kubernetes 的架构

上图是 K8s 基本的架构,这是一个非常典型的 Master-Slave 的架构。

  1. 在 Master 上,是由 Controller,API Server,Scheduler 以及包括做存储的 Etcd 等构成。Etcd 可以算成 Master,也可以作为独立于 Master 之外的存储来对待。Master 的 Controller、API Server、Scheduler 都是单独的进程模式。这和 Yarn 有一些不同,Yarn 的整个 Master 是一个单进程的模式。K8s 的 Master 还可以在多个 Master 之间完成自发的选举,然后由 active 状态的 Master 对外提供服务。
  2. 在 Slave 上,它主要是包括 Kube proxy、Kubelet,以及 Docker 等相关的组件,每个 Node 上部署的相关组件都是类似的,通过它来管理上面运行的多个 Pod。
  3. 根据不同用户的习惯,可以通过 UI 或者 CLI 的方式向 K8s 提交任务。用户可以通过 K8s 提供的 Dashboard Web UI 的方式将任务进行提交,也可以通过 Kubectl 命令行的方式进行提交。

Kubernetes 的一些概念

  • ConfigMap

ConfigMap 是一个 K-V 数据结构。通常的用法是将 ConfigMap 挂载到 Pod ,作为配置文件提供 Pod 里新的进程使用。在 Flink 中可以将 Log4j 文件或者是 flink-conf 文件写到 ConfigMap 里面,在 JobManager 或者 TaskManger 起来之前将它挂载到 Pod 里,然后 JobManager 去读取相应的 conf 文件,加载其配置,进而再正确地拉起 JobManager 一些相应的组件。

  • Service(简称 SVC )

一种对外暴露服务的方式。如果现在内部有一个服务,需要在 K8s 外部进行访问,此时可以通过 Service,然后用 LoadBalancer 或者 NodePort 的方式将其暴露出去。

如果有一个 Service,不希望或不需要将其对外暴露,可以把它设置为 Cluster IP 或者是 None 这种 Headless 的模式。这个时候,它可以用于服务之间相互连接,例如传统的前端去联后端服务,或者是在 Flink 中非 HA 的情况下,TaskManager 去连 JobManager 等等。

  • Pod

Pod 是 K8s 里最小的调度单元。K8s 都是以 Pod 进行调度的。每个 Pod 可以包含一个或者多个 Container。每个 Container 都会有自己的资源,相互之间资源也是已经隔离的,但是所有 Container 共享同一个网络,这就意味着所有的 Container 可以通过 localhost 直接进行通信。

同时,Container 之间可以通过 Volume 共享一些文件。比如 JobManager 或 TaskManager 的 Pod 里产生了一些日志,在同一个 Pod 里再去起另外一个进程收集不符合 K8s 的原生语义。可以通过 SideCar 的方式去起另外一个 Container,把 JobManager 产生的日志收走。这就是一个 Pod 多个 Container 的具体用途。

  • Deployment

因为 Pod 是可以随时被终止的,所以当 Pod 终止之后,就无法再拉起来去做 failover 等其他相关操作。Deployment 是在 Pod 之上提供了更高一层的抽象。Deployment 可以设置 Pod 的状态,比如需要起 5 个 TaskManager,Deployment 会维持当前状态。当有 TaskManager 挂了以后,它会起新的 TaskManager,来补上。这样可以避免自己汇报 Pod 的状态,可以去做一些更复杂的管理 failover 等等。这也是最基础的概念——运维自动化。

目前都有什么样的任务在 K8s 上运行?

除了传统的 Web 以及移动端一些无状态的如 MySQL、Kafka 等存储相关的任务外,有状态的服务也不断地在 K8s 上做适配和运行。除此之外,深度学习框架 Tensorflow 原生即可在 K8s 上运行,包括 Spark、Flink 等等,一些大数据相关的框架也在不断地去兼容,不断地去适配,以便让更多的大数据服务可以更好地在 K8s 上运行。

从这一点我们可以看出, K8s 相比于 Yarn 或传统的 Hadoop 具有更好的包容性,它可以把存储、深度学习、大数据包括 OLAP 分析等多种计算框架、引擎都运行在 K8s 之上。这样就会带来一个很大的好处,整个公司只需要去管理一个调度架构,就可以把所有的存储,实时计算,批量计算,包括深度学习,OLAP 分析等等,都在一个集群里面运行。除了管理更方便以外,也可以达到更好的集群利用率。

Flink On Kubernetes 的部署演进

Flink 在 K8s 上最简单的方式是以 Standalone 方式进行部署。这种方式部署的好处在于不需要对 Flink 做任何改动,同时 Flink 对 K8s 集群是无感知的,通过外部手段即可让 Flink 运行起来。

Standalone Session On K8s

Standalone方式在k8s运行步骤:

如图所示:

代码语言:javascript复制
步骤1, 使用 Kubectl 或者 K8s 的 Dashboard 提交请求到 K8s Master。
步骤2, K8s Master 将创建 Flink Master Deployment、TaskManager Deployment、ConfigMap、SVC 的请求分发给 Slave 去创建这四个角色,创建完成后,这时 Flink Master、TaskManager 启动了。
步骤3, TaskManager 注册到 JobManager。在非 HA 的情况下,是通过内部 Service 注册到 JobManager。
至此,Flink 的 Sesion Cluster 已经创建起来。此时就可以提交任务了。
步骤4,在 Flink Cluster 上提交 Flink run 的命令,通过指定 Flink Master 的地址,将相应任务提交上来,用户的 Jar 和 JobGrapth 会在 Flink Client 生成,通过 SVC 传给 Dispatcher。
步骤5,Dispatcher 会发现有一个新的 Job 提交上来,这时会起一个新的 JobMaster,去运行这个 Job。
步骤6,JobMaster 会向 ResourceManager 申请资源,因为 Standalone 方式并不具备主动申请资源的能力,所以这个时候会直接返回,而且我们已经提前把 TaskManager 起好,并且已经注册回来了。
步骤7-8,这时 JobMaster 会把 Task 部署到相应的 TaskManager 上,整个任务运行的过程就完成了。

Standalone perjob on K8s

现在我们看一下 Perjob 的部署,因为 Session Cluster 和 Perjob 分别都有不同的适用场景,一个 Session 里面可以跑多个任务,但是每个任务之间没有办法达到更好的隔离性。而 Perjob 的方式,每个job都会有一个自己独立的 Flink Cluster 去运行,它们之间相互独立。

■ Perjob 的特点:

  1. 用户的 Jar 和依赖都是在镜像里提前编译好,或者通过 Init Container 方式,在真正 Container 启动之前进行初始化。
  2. 每个 Job 都会启动一个新的 Cluster。
  3. 一步提交,不需要像 Session Cluster 一样先启动集群再提交任务。
  4. 用户的 main 方法是在 Cluster 里运行。在特殊网络环境情况下,main 方法需要在 Cluster 里运行的话,Session 方式是无法做到的,而 Perjob 方式是可以执行的。

执行步骤:

由 Standalone JobCluster EntryPoint 执行,从 classpath 找到用户 Jar,执行它的 main 方法得到 JobGrapth 。再提交到 Dispathcher,这时候走 Recover Job 的逻辑,提交到 JobMaster。JobMaster 向 ResourceManager 申请资源,请求 slot,执行 Job。

Helm Chart 方式

Helm 类似于 Linux 上的 Yum。

K8s 里的 Helm 是一个包管理工具,可以很方便的安装一个包。部署一个 Flink 集群等操作,只需要 helm install 就可以将之前很多步的安装操作,一步去完成。本质上没有什么差别,只是它用 Helm 重新组织,包括一些模板等等,用起来会更加方便。

Flink Kubernetes Operator

  • 任务生命周期管理

使用 Operator 的方式来管理 Flink,主要是来管理多个 Cluster 的情况,可起到任务生命周期管理的作用。它和 Standalone、Native 的方式,本质上不是在一个层次上,它类似于一个更上层的做任务管理的工具。

  • 基于 K8s Operator,方便创建 Flink Cluster。

之前去创建一个 Perjob Cluster,可能需要部署多次,如果任务要做升级,甚至可能需要把之前的删掉,然后修改配置,再重新部署。

引入 K8s Operator 就只需要做一些简单操作。比如 Operator 中有自己的一套 yaml 描述方式,修改其中某一个字段,如修改 image 的 version 字段,此时后台会自动触发一些重启,包括对目前正在执行的任务做 savepoint,然后把 Cluster 销毁掉,再进行新的定向就可以将集群拉起,等一系列自动化的操作。对 Flink 的配置做修改等也都可以在后台自动化完成。

目前 Operater 有 Lyft 和 Google 两个开源的 operator,他们在功能上类似,而且都是已经经过生产检验,与目前的 Standalone Cluster 结合的比较好的,已经达到生产可用的标准。

0 人点赞