注意:系统和运行脚本在启动时解析配置.对配置文件的更改需要重新启动Flink JobManager和TaskManagers
Flink on Yarn模式安装部署要做的其实不多,正常的步骤: 1、上传二进制包 ===》2、解压缩 ===》 3、更改文件名称 ===》 4、配置环境变量。Flink on yarn的job运行模式大致分为两类:
- 内存集中管理模式(Yarn Session):在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。
- 内存Job管理模式【推荐使用】:在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
一. 内存集中管理模式
第一种模式分为两步:yarn-session.sh(开辟资源)--->flink run(提交任务)
- 开源资源,使用命令
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d -s 2
参数解释:
-n 2 表示指定两个容器
-jm 1024 表示jobmanager 1024M内存
-tm 1024表示taskmanager 1024M内存
-d --detached 任务后台运行
-s 指定每一个taskmanager分配多少个slots(处理进程)。建议设置为每个机器的CPU核数。一般情况下,vcore的数量等于处理的slot(-s)的数量
-nm,--name YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列.
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
- flink on yarn模式会覆盖一些配置文件 jobmanager.rpc.address(因为jobmanager总是分配在不同的机器),taskmanager.tmp.dirs(我们使用yarn提供的临时目录)和parallelism.default 如果solts的数量已经被指定。 如果不想修改配置文件去改变参数,有一个选择是通过动态的参数-D 来指定。所以你可以传递参数:-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624
- 由于flink on yarn 模式 是基于hadoop的,如果hadoop 集群没启动,则会连接失败。 当启动之后,又会出现NameNode处于安全模式,这里没有必要手动关闭。 解决方法:等hadoop启动之后差不多20s再提交yarn-session的命令。正常运行后如下图所示,并访问JM的web 接口,这里有个麻烦的事情就是每次需要去看主机名和端口号。
- 关闭某个Flink集群:因为是yarn程序,我们可以直接使用 yarn application -kill application_1552292557465_0001 来结束进程。
- 提交任务 为了进行测试,我们对Flink目录下的LICENSE文件进行词频统计,步骤如下:
- 上传文件至HDFS:hadoop fs -put LICENSE /
- 查看文件是否上传成功:hadoop fs -ls /
- 执行命令:
./flink run ../examples/batch/WordCount.jar -input hdfs://192.168.83.129:9000/LICENSE -output hdfs://192.168.83.129:9000/wordcount-result.txt
- 查看输出结果:hadoop fs -cat /wordcount-result.txt
另外,jobmanager和taskmanager分别占有容器,示例:
./bin/yarn-session.sh -n 10 -tm 8192 -s 32
上面的例子将会启动11个容器(即使仅请求10个容器),因为有一个额外的容器来启动ApplicationMaster 和 job manager,一旦flink在你的yarn集群上部署,它将会显示job manager的连接详细信息。
二. 内存Job管理模式
第二种模式其实也分为两个部分,依然是开辟资源和提交任务,但是在Job模式下,这两步都合成一个命令了。 这里,我们直接执行命令
代码语言:javascript复制./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ../examples/batch/WordCount.jar
在job结束后就会关闭flink yarn-session的集群
- 第二种方式命令 参数解释:
sudo /usr/lib/flink/bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -ys 1 -p 1 xz-flink-examples-1.0.jar
• "run" 操作参数:
-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值
-yn taskmanager个数
-yjm jobmanager内存大小
-ytm taskmanager内存大小
-ys 一个taskmanager的slot个数
注意:client必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。 经试验发现,其实如果配置的有HADOOP_HOME环境变量的话也是可以的。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一个即可。 独立job模式客户端命令行参数参考:flink独立Job命令
三. 补充:Flink在YARN上的恢复行为
Flink 的 YARN 客户端具有以下配置参数来控制容器故障时的行为方式。这些参数可以从 conf/flink-conf.yaml 中设置,或者在启动会话时使用-D参数设置 如:
- yarn.reallocate-failed: 此参数控制Flink是否应重新分配失败的TaskManager容器。默认值:true
- yarn.maximum-failed-containers: ApplicationMaster 在YARN会话失败之前接受的最大失败容器数。默认值:最初请求的TaskManagers(-n)的数量。
- yarn.application-attempts:ApplicationMaster( 其TaskManager容器)尝试的数量。如果此值设置为1(默认值),则当Application master失败时,整个YARN会话将失败。较高的值指定YARN重新启动ApplicationMaster的次数。
参考:flink中文官网关于参数的解释