【Flink】第二十五篇:源码角度分析作业提交逻辑

2022-03-31 11:19:29 浏览数 (1)

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

继上篇 【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑 之后,我们从一个WordCount程序入手,探索了在调用execute提交作业之前的源码主线逻辑:经过DataStream API的一系列链式调用,得到一个重要的数据结构:List<Tansformation>。最终用户调用execute方法,这里传给execute的就是它。

本文继续讨论:在execute里,在最终将作业提交到集群后,在集群调度作业之前,Flink主要做了些什么。

同样,先将主要的结论列出来,以便在阅读源码时可以和笔者有一个基本一致的语境。

本文讨论的内容主要包含了两个阶段(例如,从yarn的per-job提交模式):

1. 运行flink.sh脚本,调起Flink Client(ClientFrontend),通过反射启动Jar中的main函数,生成StreamGraph、JobGraph,由PipelineExecutor提交给集群

2. 集群收到JobGraph,将JobGraph翻译成ExecutionGraph,然后开始调度执行,启动成功之后开始消费

DAG流转过程

JobManager的主要构成

yarn集群为例,

1. Dispacher:一个,提供Rest接口接收作业,不负责实际的调度执行

2. JobMaster:一个作业一个,负责作业调度、管理作业,Task生命周期

3. YarnResourceManager:一个,资源管理

Flink提交模式

  • 本地local:LocalExecutor。
  • session:AbstractSessionClusterExecutor,通过http协议提交作业。通过yarn-session.sh脚本启动,检查是否存在已经启动好的Flink Session模式集群,如果没有,则启动一个。然后在PipelineExecutor中通过Dsipatcher提供的Rest接口提交JobGraph,Dsipatcher为每个作业启动一个JobMaster,进入作业执行阶段。
  • per-job:AbstractJobClusterExecutor。在提交的时候创建集群,将JobGraph及其所需的文件等一同提交给Yarn集群,剩下的和yarn-session模式下一样。

yarn session提交流程

1. 启动集群

1) 使用yarn-session.sh提交会话模式的作业

2) 如果没有Flink Session集群,启动新的Flink Session集群

首先将应用配置和相关文件上传至HDFS;Yarn Client向Yarn提交创建Flink Session集群的申请,在分配的Containner中启动JobManager进程,并在其中运行YarnSessionClusterEntrypoint作为集群启动的入口,初始化Dispatcher、ResourceManager,启动相关的RPC服务,等待Client通过Rest接口提交作业。

2. 作业提交

1) Flink Client通过Rest向Dsipatcher提交作业

2) 为作业创建一个JobMaster,构建ExecutionGraph

3. 作业调度执行

1) JobMaster向YarnResourceManager申请资源

2) YarnResourceManager如果没有可提供的slot则向Yarn的ResourceManager申请Containner,启动TaskManager

3) 在Yarn分配的Containner中启动新的TaskManager,并从HDFS上加载Jar所需资源

4) TaskManager启动之后,向YarnResourceManager注册自己和自己的slot资源情况

5) YarnResourceManager从等待队列取出JobMaster的slot请求,通知相应的TaskManager将slot分配给了哪些JobMaster

6) JobMaster将Task调度到该TaskManager的slot上

DAG流转细节

1. StreamGraph:

2. JobGraph

3. ExecutionGraph

源码分析

分析两部分:

1. 由flink shell 脚本 到 Flink作业 jar 的过程;

2. Flink 绘制 DAG的过程,这里我们只重点看StreamGraph的绘制逻辑,其他的类似;

下面开始介绍,

1. 由flink shell 脚本 到 Flink作业 jar 的过程;

打开FLINK_HOME下的flink脚本,在最后一行可以看到flink本质是执行了一个org.apache.flink. client.cli.CliFrontend,

所以,我们从CliFrontend.java的main入口方法找起,

以上,主要做了:

1. 将本地配置文件及命令行配置项加载到全局配置中

2. 构造CliFrontend,运行它的parseAndRun

接着看parseAndRun,

以上,主要做了,

1. 取命令行输入参数的第一个动作action,这里我们以yarn-cluster方式提交,所以第一个单词是:flink

2. 以action进入开关语句,这里我们进入第一个分支:ACTION_RUN

接着进入run(params),

这里,主要有三个动作:

1. 得到依赖jar

2. 封装各种配置

3. 得到作业包,封装成PackagedProgram

顺着作业执行这条主线,可以追溯到callMainMethod方法

以上,通过java反射,从作业包的主类中拿main方法,并且调用main,从这里开始便进入了WordCount的main方法。

2. Flink 绘制 DAG的过程,这里我们只重点看StreamGraph的绘制逻辑,其他的类似;


从env.execute("Window WordCount") 深入源码,

我们可以看到,在execute的第一层就进行了StreamGraph的绘制,继续深入,经过2次重构方法的调用,

这里我们看到,StreamGraphGenerator核心类是具体绘制StreamGraph的类。

进入generate,

这里一个非常关键的循环,在循环里对之前上一节输出的transformations列表进行遍历,对每个transformation进行transform转换操作,循环转换完后就返回产生的StreamGraph。我们接着看transform方法,

主要有三个步骤,

1. 判断是否已经转换过了,如果当前transform已经转换过,直接返回

2. 得到合适的translator,有哪些translator?如下,

3.如果得到translator,执行translate方法,没得到则执行遗留的legacyTransform

继续看translate方法,

1. 得到所有输入Id

2. 再次检查是否当前transform已经被转换过了

3. 调用translator对当前transform实例进行translator

这里有个很关键的地方,即getParentInputIds,

这里,对所有parentTransformations再次执行了之前已经阅读过的:transform,到这里显示形成了一个递归的逻辑调用,结合之前的调用很容易就总结到如下递归调用的意图:

起始,从transformations列表第一个transformation进行循环,每次都检查当前transformation的上游所有输入的transformations是否被处理过了,这种检查是一个递归的过程,并且结束条件是,当前transformation被包含在alreadyTransformed集合中了。

理解了整个递归处理transformations元素后,我们就可以进一步看看这个所谓的处理逻辑到底做了些什么。

我们找到translator的实现类AbstractOneInputTransformationTranslator,我们来看看这个UML类图,

从AbstractOneInputTransformationTranslator中即可找到这个很关键的方法,

终于到了StreamGraph的算法操作:

1. addOperator添加节点,节点对应transformation

2. addEdge添加边,包含上游所有输入边

综上,本质上就是一个利用递归操作绘制DAG的过程。

至于,StreamGraph如何转换为了JobGraph,而JobGraph由如何被JobManagerd的JobMaster转换为了可调度的ExecutionGraph,这里就不再赘述了,核心思想是一致的,只不过是为了使得这个DAG适合在相应的应用层面上而进行了一系列的丰富和优化,例如加入并行的概念,对齐进行OperatorChain的优化。

后续文章要讨论的是ExecutionGraph是如何被调度到集群上的TaskManager中执行的。。。

0 人点赞