如何在Kubernetes 部署运行flink

2023-07-29 18:32:12 浏览数 (3)

一、场景描述:

Kubernetes 是目前非常流行的容器编排系统,在其之上可以运行 Web 服务、大数据处理等各类应用。这些应用被打包在一个个非常轻量的容器中,我们通过声明的方式来告知 Kubernetes 要如何部署和扩容这些程序,并对外提供服务。Flink 同样是非常流行的分布式处理框架,它也可以运行在 Kubernetes 之上。将两者相结合,我们就可以得到一个健壮和高可扩的数据处理应用,并且能够更安全地和其它服务共享一个 Kubernetes 集群。

二、flink概念:

Flink 由Job Manager和Task Manager两个部分组成,Job Manager负责协调流处理作业,管理作业的提交以及生命周期,并把工作分配给任务管理器。任务管理器执行实际的流处理逻辑,同一个时间只能一个活跃的Job Manager,但可以有多个Task manager。

Flink还引入Checkpoint机制,来周期性记录各种流处理操作的状态,并且进行持久化存储,在从故障恢复的时候,流处理作业可以从最新的检查点继续执行。checkpoint也是由job Manager进行协调更新。

三、部署步骤:

job Manager和task Manager都采用deployment进行部署, 另外还需要定义相应的configmap和service文件, 使其能够暴露一个端口供外界访问

1、首先配置ConfigMap

代码语言:javascript复制
vim flink-configmap.yaml
代码语言:yaml复制
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: | 
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
  log4j.properties: | 
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

主要是把日志文件以及配置文件和创建的Pod解耦开;相应的deployment和configMap绑定主要通过在volumes那里,configMap指定定义好的configMap的名称和items项进行定义

2、配置jobManager-deployment.yaml

代码语言:yaml复制
apiVersion: apps/v1
kind: Deployment
metadata:
    name: flink-jobmanager
spec:
    selector:
        matchLabels:
            app: flink
    replicas: 1
    template:
        metadata:
            labels:
                app: flink
                component: jobmanager
        spec:
            containers:
            - name: jobmanager
              image: flink:1.9
              args:
              - jobmanager
              ports:
              - containerPort: 6123
                name: rpc
              - containerPort: 6124
                name: blob
              - containerPort: 6125
                name: query
              - containerPort: 8081
                name: ui
              env:
              - name: JOB_MANAGER_RPC_ADDRESS
                value: flink-jobmanager

3、taskmanager-deployment.yaml

代码语言:yaml复制

apiVersion: apps/v1
kind: Deployment
metadata:
    name: flink-taskmanager
spec:
    selector:
        matchLabels:
            app: flink
    replicas: 3
    template:
        metadata:
            labels:
                app: flink
                component: taskmanager
        spec:
            containers:
            - name: taskmanager
              image: flink:1.9
              args:
              - taskmanager
              ports:
              - containerPort: 6121
                name: data
              - containerPort: 6122
                name: rpc
              - containerPort: 6125
                name: query
              env:
              - name: JOB_MANAGER_RPC_ADDRESS
                value: flink-jobmanager

4、jobmanager-service.yaml

代码语言:yaml复制
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

四、开始部署

启动Flink Session Cluster

当各个组件服务配置文件定义完毕后,就可以通过使用以下Kubectl命令,创建Flink Session Cluster,集群启动完成后就可以通过JobManagerServices中配置的WebUI端口访问 Flink Web 页面

代码语言:javascript复制
[root@k8s-master flink_on_k8s]# kubectl create -f flink-configmap.yaml
configmap/flink-config created
代码语言:javascript复制
[root@k8s-master flink_on_k8s]# kubectl create -f jobmanager-service.yaml
service/flink-jobmanager created
代码语言:yaml复制
[root@k8s-master flink_on_k8s]# kubectl apply -f jobmanager-deployment.yaml
deployment.apps/flink-jobmanager created
代码语言:javascript复制
[root@k8s-master flink_on_k8s]# kubectl apply -f taskmanager-deployment.yaml
deployment.apps/flink-taskmanager created
代码语言:javascript复制
[root@k8s-master flink_on_k8s]# kubectl get pods
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-5bc5f79ccd-v4zxw    1/1     Running   0          4m25s
flink-taskmanager-5fcb99c84b-4kkg6   1/1     Running   0          2m13s
flink-taskmanager-5fcb99c84b-5kr7t   1/1     Running   0          2m13s
flink-taskmanager-5fcb99c84b-wfcrm   1/1     Running   0          2m13s

启动成功

0 人点赞