源码分析系列推荐:
【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失
【Flink】第十五篇:Redis Connector 数据保序思考
【Flink】第十六篇:源码角度分析 sink 端的数据一致性
【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑
继上篇 【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑 之后,我们从一个WordCount程序入手,探索了在调用execute提交作业之前的源码主线逻辑:经过DataStream API的一系列链式调用,得到一个重要的数据结构:List<Tansformation>。最终用户调用execute方法,这里传给execute的就是它。
本文继续讨论:在execute里,在最终将作业提交到集群后,在集群调度作业之前,Flink主要做了些什么。
同样,先将主要的结论列出来,以便在阅读源码时可以和笔者有一个基本一致的语境。
本文讨论的内容主要包含了两个阶段(例如,从yarn的per-job提交模式):
1. 运行flink.sh脚本,调起Flink Client(ClientFrontend),通过反射启动Jar中的main函数,生成StreamGraph、JobGraph,由PipelineExecutor提交给集群
2. 集群收到JobGraph,将JobGraph翻译成ExecutionGraph,然后开始调度执行,启动成功之后开始消费
DAG流转过程
JobManager的主要构成
yarn集群为例,
1. Dispacher:一个,提供Rest接口接收作业,不负责实际的调度执行
2. JobMaster:一个作业一个,负责作业调度、管理作业,Task生命周期
3. YarnResourceManager:一个,资源管理
Flink提交模式
- 本地local:LocalExecutor。
- session:AbstractSessionClusterExecutor,通过http协议提交作业。通过yarn-session.sh脚本启动,检查是否存在已经启动好的Flink Session模式集群,如果没有,则启动一个。然后在PipelineExecutor中通过Dsipatcher提供的Rest接口提交JobGraph,Dsipatcher为每个作业启动一个JobMaster,进入作业执行阶段。
- per-job:AbstractJobClusterExecutor。在提交的时候创建集群,将JobGraph及其所需的文件等一同提交给Yarn集群,剩下的和yarn-session模式下一样。
yarn session提交流程
1. 启动集群
1) 使用yarn-session.sh提交会话模式的作业
2) 如果没有Flink Session集群,启动新的Flink Session集群
首先将应用配置和相关文件上传至HDFS;Yarn Client向Yarn提交创建Flink Session集群的申请,在分配的Containner中启动JobManager进程,并在其中运行YarnSessionClusterEntrypoint作为集群启动的入口,初始化Dispatcher、ResourceManager,启动相关的RPC服务,等待Client通过Rest接口提交作业。
2. 作业提交
1) Flink Client通过Rest向Dsipatcher提交作业
2) 为作业创建一个JobMaster,构建ExecutionGraph
3. 作业调度执行
1) JobMaster向YarnResourceManager申请资源
2) YarnResourceManager如果没有可提供的slot则向Yarn的ResourceManager申请Containner,启动TaskManager
3) 在Yarn分配的Containner中启动新的TaskManager,并从HDFS上加载Jar所需资源
4) TaskManager启动之后,向YarnResourceManager注册自己和自己的slot资源情况
5) YarnResourceManager从等待队列取出JobMaster的slot请求,通知相应的TaskManager将slot分配给了哪些JobMaster
6) JobMaster将Task调度到该TaskManager的slot上
DAG流转细节
1. StreamGraph:
2. JobGraph
3. ExecutionGraph
源码分析
分析两部分:
1. 由flink shell 脚本 到 Flink作业 jar 的过程;
2. Flink 绘制 DAG的过程,这里我们只重点看StreamGraph的绘制逻辑,其他的类似;
下面开始介绍,
1. 由flink shell 脚本 到 Flink作业 jar 的过程;
打开FLINK_HOME下的flink脚本,在最后一行可以看到flink本质是执行了一个org.apache.flink. client.cli.CliFrontend,
所以,我们从CliFrontend.java的main入口方法找起,
以上,主要做了:
1. 将本地配置文件及命令行配置项加载到全局配置中
2. 构造CliFrontend,运行它的parseAndRun
接着看parseAndRun,
以上,主要做了,
1. 取命令行输入参数的第一个动作action,这里我们以yarn-cluster方式提交,所以第一个单词是:flink
2. 以action进入开关语句,这里我们进入第一个分支:ACTION_RUN
接着进入run(params),
这里,主要有三个动作:
1. 得到依赖jar
2. 封装各种配置
3. 得到作业包,封装成PackagedProgram
顺着作业执行这条主线,可以追溯到callMainMethod方法
以上,通过java反射,从作业包的主类中拿main方法,并且调用main,从这里开始便进入了WordCount的main方法。
2. Flink 绘制 DAG的过程,这里我们只重点看StreamGraph的绘制逻辑,其他的类似;
从env.execute("Window WordCount") 深入源码,
我们可以看到,在execute的第一层就进行了StreamGraph的绘制,继续深入,经过2次重构方法的调用,
这里我们看到,StreamGraphGenerator核心类是具体绘制StreamGraph的类。
进入generate,
这里一个非常关键的循环,在循环里对之前上一节输出的transformations列表进行遍历,对每个transformation进行transform转换操作,循环转换完后就返回产生的StreamGraph。我们接着看transform方法,
主要有三个步骤,
1. 判断是否已经转换过了,如果当前transform已经转换过,直接返回
2. 得到合适的translator,有哪些translator?如下,
3.如果得到translator,执行translate方法,没得到则执行遗留的legacyTransform
继续看translate方法,
1. 得到所有输入Id
2. 再次检查是否当前transform已经被转换过了
3. 调用translator对当前transform实例进行translator
这里有个很关键的地方,即getParentInputIds,
这里,对所有parentTransformations再次执行了之前已经阅读过的:transform,到这里显示形成了一个递归的逻辑调用,结合之前的调用很容易就总结到如下递归调用的意图:
起始,从transformations列表第一个transformation进行循环,每次都检查当前transformation的上游所有输入的transformations是否被处理过了,这种检查是一个递归的过程,并且结束条件是,当前transformation被包含在alreadyTransformed集合中了。
理解了整个递归处理transformations元素后,我们就可以进一步看看这个所谓的处理逻辑到底做了些什么。
我们找到translator的实现类AbstractOneInputTransformationTranslator,我们来看看这个UML类图,
从AbstractOneInputTransformationTranslator中即可找到这个很关键的方法,
终于到了StreamGraph的算法操作:
1. addOperator添加节点,节点对应transformation
2. addEdge添加边,包含上游所有输入边
综上,本质上就是一个利用递归操作绘制DAG的过程。
至于,StreamGraph如何转换为了JobGraph,而JobGraph由如何被JobManagerd的JobMaster转换为了可调度的ExecutionGraph,这里就不再赘述了,核心思想是一致的,只不过是为了使得这个DAG适合在相应的应用层面上而进行了一系列的丰富和优化,例如加入并行的概念,对齐进行OperatorChain的优化。
后续文章要讨论的是ExecutionGraph是如何被调度到集群上的TaskManager中执行的。。。