Flink on Yarn两种模式启动参数及在Yarn上的恢复

2020-02-19 10:51:09 浏览数 (1)

注意:系统和运行脚本在启动时解析配置.对配置文件的更改需要重新启动Flink JobManager和TaskManagers

Flink on Yarn模式安装部署要做的其实不多,正常的步骤: 1、上传二进制包 ===》2、解压缩 ===》 3、更改文件名称 ===》 4、配置环境变量。Flink on yarn的job运行模式大致分为两类:

  • 内存集中管理模式(Yarn Session):在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。
  • 内存Job管理模式【推荐使用】:在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
一. 内存集中管理模式

第一种模式分为两步:yarn-session.sh(开辟资源)--->flink run(提交任务)

  1. 开源资源,使用命令 yarn-session.sh -n 2 -jm 1024 -tm 1024 -d -s 2 参数解释:
代码语言:javascript复制
-n 2 表示指定两个容器 
-jm 1024 表示jobmanager 1024M内存 
-tm 1024表示taskmanager 1024M内存 
-d --detached  任务后台运行 
-s  指定每一个taskmanager分配多少个slots(处理进程)。建议设置为每个机器的CPU核数。一般情况下,vcore的数量等于处理的slot(-s)的数量
-nm,--name YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列.
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
  • flink on yarn模式会覆盖一些配置文件 jobmanager.rpc.address(因为jobmanager总是分配在不同的机器),taskmanager.tmp.dirs(我们使用yarn提供的临时目录)和parallelism.default 如果solts的数量已经被指定。 如果不想修改配置文件去改变参数,有一个选择是通过动态的参数-D 来指定。所以你可以传递参数:-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624
  • 由于flink on yarn 模式 是基于hadoop的,如果hadoop 集群没启动,则会连接失败。 当启动之后,又会出现NameNode处于安全模式,这里没有必要手动关闭。 解决方法:等hadoop启动之后差不多20s再提交yarn-session的命令。正常运行后如下图所示,并访问JM的web 接口,这里有个麻烦的事情就是每次需要去看主机名和端口号。
  • 关闭某个Flink集群:因为是yarn程序,我们可以直接使用 yarn application -kill application_1552292557465_0001 来结束进程。
  1. 提交任务 为了进行测试,我们对Flink目录下的LICENSE文件进行词频统计,步骤如下:
  • 上传文件至HDFS:hadoop fs -put LICENSE /
  • 查看文件是否上传成功:hadoop fs -ls /
  • 执行命令:
代码语言:javascript复制
./flink run ../examples/batch/WordCount.jar -input hdfs://192.168.83.129:9000/LICENSE -output hdfs://192.168.83.129:9000/wordcount-result.txt
  • 查看输出结果:hadoop fs -cat /wordcount-result.txt

另外,jobmanager和taskmanager分别占有容器,示例: ./bin/yarn-session.sh -n 10 -tm 8192 -s 32 上面的例子将会启动11个容器(即使仅请求10个容器),因为有一个额外的容器来启动ApplicationMaster 和 job manager,一旦flink在你的yarn集群上部署,它将会显示job manager的连接详细信息。

二. 内存Job管理模式

第二种模式其实也分为两个部分,依然是开辟资源和提交任务,但是在Job模式下,这两步都合成一个命令了。 这里,我们直接执行命令

代码语言:javascript复制
./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ../examples/batch/WordCount.jar

在job结束后就会关闭flink yarn-session的集群

  • 第二种方式命令 参数解释:

sudo /usr/lib/flink/bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -ys 1 -p 1 xz-flink-examples-1.0.jar • "run" 操作参数:

代码语言:javascript复制
-c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定 
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager 
-p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值
-yn taskmanager个数
-yjm  jobmanager内存大小
-ytm  taskmanager内存大小
-ys    一个taskmanager的slot个数

注意:client必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。 经试验发现,其实如果配置的有HADOOP_HOME环境变量的话也是可以的。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一个即可。 独立job模式客户端命令行参数参考:flink独立Job命令

三. 补充:Flink在YARN上的恢复行为

Flink 的 YARN 客户端具有以下配置参数来控制容器故障时的行为方式。这些参数可以从 conf/flink-conf.yaml 中设置,或者在启动会话时使用-D参数设置 如:

  • yarn.reallocate-failed: 此参数控制Flink是否应重新分配失败的TaskManager容器。默认值:true
  • yarn.maximum-failed-containers: ApplicationMaster 在YARN会话失败之前接受的最大失败容器数。默认值:最初请求的TaskManagers(-n)的数量。
  • yarn.application-attempts:ApplicationMaster( 其TaskManager容器)尝试的数量。如果此值设置为1(默认值),则当Application master失败时,整个YARN会话将失败。较高的值指定YARN重新启动ApplicationMaster的次数。

参考:flink中文官网关于参数的解释

0 人点赞