一、场景描述:
Kubernetes 是目前非常流行的容器编排系统,在其之上可以运行 Web 服务、大数据处理等各类应用。这些应用被打包在一个个非常轻量的容器中,我们通过声明的方式来告知 Kubernetes 要如何部署和扩容这些程序,并对外提供服务。Flink 同样是非常流行的分布式处理框架,它也可以运行在 Kubernetes 之上。将两者相结合,我们就可以得到一个健壮和高可扩的数据处理应用,并且能够更安全地和其它服务共享一个 Kubernetes 集群。
二、flink概念:
Flink 由Job Manager和Task Manager两个部分组成,Job Manager负责协调流处理作业,管理作业的提交以及生命周期,并把工作分配给任务管理器。任务管理器执行实际的流处理逻辑,同一个时间只能一个活跃的Job Manager,但可以有多个Task manager。
Flink还引入Checkpoint机制,来周期性记录各种流处理操作的状态,并且进行持久化存储,在从故障恢复的时候,流处理作业可以从最新的检查点继续执行。checkpoint也是由job Manager进行协调更新。
三、部署步骤:
job Manager和task Manager都采用deployment进行部署, 另外还需要定义相应的configmap和service文件, 使其能够暴露一个端口供外界访问
1、首先配置ConfigMap
代码语言:javascript复制vim flink-configmap.yaml
代码语言:yaml复制apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
log4j.properties: |
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
主要是把日志文件以及配置文件和创建的Pod解耦开;相应的deployment和configMap绑定主要通过在volumes那里,configMap指定定义好的configMap的名称和items项进行定义
2、配置jobManager-deployment.yaml
代码语言:yaml复制apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
selector:
matchLabels:
app: flink
replicas: 1
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:1.9
args:
- jobmanager
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 6125
name: query
- containerPort: 8081
name: ui
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager
3、taskmanager-deployment.yaml
代码语言:yaml复制
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
selector:
matchLabels:
app: flink
replicas: 3
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:1.9
args:
- taskmanager
ports:
- containerPort: 6121
name: data
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager
4、jobmanager-service.yaml
代码语言:yaml复制apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: ui
port: 8081
selector:
app: flink
component: jobmanager
四、开始部署
启动Flink Session Cluster
当各个组件服务配置文件定义完毕后,就可以通过使用以下Kubectl命令,创建Flink Session Cluster,集群启动完成后就可以通过JobManagerServices中配置的WebUI端口访问 Flink Web 页面
代码语言:javascript复制[root@k8s-master flink_on_k8s]# kubectl create -f flink-configmap.yaml
configmap/flink-config created
代码语言:javascript复制[root@k8s-master flink_on_k8s]# kubectl create -f jobmanager-service.yaml
service/flink-jobmanager created
代码语言:yaml复制[root@k8s-master flink_on_k8s]# kubectl apply -f jobmanager-deployment.yaml
deployment.apps/flink-jobmanager created
代码语言:javascript复制[root@k8s-master flink_on_k8s]# kubectl apply -f taskmanager-deployment.yaml
deployment.apps/flink-taskmanager created
代码语言:javascript复制[root@k8s-master flink_on_k8s]# kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-jobmanager-5bc5f79ccd-v4zxw 1/1 Running 0 4m25s
flink-taskmanager-5fcb99c84b-4kkg6 1/1 Running 0 2m13s
flink-taskmanager-5fcb99c84b-5kr7t 1/1 Running 0 2m13s
flink-taskmanager-5fcb99c84b-wfcrm 1/1 Running 0 2m13s
启动成功