修改配置文件
修改flink-conf.yaml HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper选出leader与standby。
代码语言:javascript复制#jobmanager.rpc.address: bigdata11
high-availability:zookeeper
#指定高可用模式(必须)
high-availability.zookeeper.quorum:bigdata11:2181,bigdata12:2181,bigdata13:2181
#ZooKeeper仲裁是ZooKeeper服务器的复制组,它提供分布式协调服务(必须)
high-availability.storageDir:hdfs:///flink/ha/
#JobManager元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须)
high-availability.zookeeper.path.root:/flink
#根ZooKeeper节点,在该节点下放置所有集群节点(推荐)
high-availability.cluster-id:/flinkCluster
#自定义集群(推荐)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
修改conf/zoo.cfg
代码语言:javascript复制server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888
修改conf/masters
代码语言:javascript复制bigdata11:8081
bigdata12:8081
修改slaves
代码语言:javascript复制bigdata12
bigdata13
同步配置文件conf到各节点
启动HA
先启动zookeeper集群各节点(测试环境中也可以用Flink自带的start-zookeeper-quorum.sh),启动dfs ,再启动flink
代码语言:javascript复制start-cluster.sh
WebUI查看,这是会自动产生一个主Master
验证HA
手动杀死bigdata12上的master,此时,bigdata11上的备用master转为主mater。
手动将JobManager / TaskManager实例添加到群集
您可以使用bin/jobmanager.sh和bin/taskmanager.sh脚本将JobManager和TaskManager实例添加到正在运行的集群中。
添加JobManager
代码语言:javascript复制bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
添加TaskManager
代码语言:javascript复制bin/taskmanager.sh start|start-foreground|stop|stop-all
$ jobmanager.sh start bigdata12
新添加的为从master。
运行测试任务
代码语言:javascript复制$ flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/
$ flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input hdfs:///user/itstar/input/wc.txt --output hdfs:///user/itstar/output2