方案
ZOOKEEPER 3节点 FLINK JOB-MANAGER 3节点 每个节点一个pod FLINK TASK-MANAGER N节点 通过k8s replicas 扩缩容 负载均衡 1节点
FLINK容器镜像
使用dockerhub官方镜像 flink:1.14.4-scala_2.12-java11
FLINK配置
flink官方docker镜像使用环境变量FLINK_PROPERTIES注入配置,格式形如:
FLINK_PROPERTIES = jobmanager.rpc.address: flink-jobmanager-1.test.svc.cluster.local jobmanager.memory.process.size: 896mb
- flink-jobmanager-x
每个实例负载需单独配置
配置项 | 值 | 说明 |
---|---|---|
jobmanager.rpc.address | flink-jobmanager-x.test.svc.cluster.local | 通过rpc访问jobmanager的地址,每个jobmanager负载单独配置,如果不配置此项zookeeper中的服务注册地址会有问题 |
high-availability | ZOOKEEPER | |
high-availability.zookeeper.quorum | zookeeper-cluster-1.test.svc.cluster.local:2181,zookeeper-cluster-2.test.svc.cluster.local:2181,zookeeper-cluster-3.test.svc.cluster.local:2181 | 罗列zookeeper集群成员地址 |
high-availability.jobmanager.port | 6123 | 部署高可用集群时必须配置此项,否则将使用随机端口,无法通过k8s服务映射 |
state.checkpoint-storage | filesystem | 使用文件系统保存checkpoint |
state.checkpoints.dir | file:///flink/checkpoint/ | checkpoint保存地址 |
high-availability.storageDir | file:///flink/ha-storage/ | 集群共享存储,保存如作业JAR包等数据 |
jobmanager.memory.process.size | 896mb | jobmanager进程总内存大小 |
样例:
代码语言:javascript复制FLINK_PROPERTIES=jobmanager.rpc.address: flink-jobmanager-1.test.svc.cluster.local
high-availability.jobmanager.port: 6123
high-availability: ZOOKEEPER
high-availability.zookeeper.quorum: zookeeper-cluster-1.test.svc.cluster.local:2181,zookeeper-cluster-2.test.svc.cluster.local:2181,zookeeper-cluster-3.test.svc.cluster.local:2181
high-availability.storageDir: file:///flink/ha-storage/
jobmanager.memory.process.size: 896mb
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///flink/checkpoint/
- flink-taskmanager
配置项 | 值 | 说明 |
---|---|---|
high-availability | ZOOKEEPER | |
high-availability.zookeeper.quorum | zookeeper-cluster-1.test.svc.cluster.local:2181,zookeeper-cluster-2.test.svc.cluster.local:2181,zookeeper-cluster-3.test.svc.cluster.local:2181 | 罗列zookeeper集群成员地址 |
high-availability.storageDir | file:///flink/ha-storage/ | 集群共享存储,保存如作业JAR包等数据 |
state.checkpoint-storage | filesystem | 使用文件系统保存checkpoint |
state.checkpoints.dir | file:///flink/checkpoint/ | checkpoint保存地址 |
taskmanager.numberOfTaskSlots | 2 | 每个taskmanager的任务槽数量,默认为cpu核心数 |
样例:
代码语言:javascript复制FLINK_PROPERTIES=high-availability: ZOOKEEPER
high-availability.zookeeper.quorum: zookeeper-cluster-1.test.svc.cluster.local:2181,zookeeper-cluster-2.test.svc.cluster.local:2181,zookeeper-cluster-3.test.svc.cluster.local:2181
high-availability.storageDir: file:///flink/ha-storage/
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///flink/checkpoint/
taskmanager.numberOfTaskSlots: 2
服务
- jobmanager
类型 | 服务名称 | 端口 | 说明 |
---|---|---|---|
cluster ip | flink-jobmanager-x | 6123,6124,6125 | 每个jobmanager负载单独配置,用于与taskmanager通信,6123为rpc端口(高可用下需配置),6124为blob端口(默认),6125为状态查询端口(默认) |
node port | flink-jobmanager-x-np | xxx->8081 | 用于通过负载均衡访问webui |
负载均衡
通过flink-jobmanager-x-np服务映射的地址:端口号反代