问题导读
- 用户程序什么时候、在哪、如何被调用执行的?
- JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?
- TaskManager进程什么时候、在哪、如何被拉起来执行?
- 用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?
阅读说明: 源码版本:Flink release-1.14.4 先回顾一下YARN架构与YARN job 8步工作流程 Flink on yarn其实就是按照YARN job 8步工作流程走 以上述4个问题为导向,看Flink具体是如何实现的,8步中1、3、4、5、7、8在Flink代码哪里找到(2和6是YARN执行)
YARN架构
YARN集群介绍
YARN集群用来做资源的管理与用户应用程序的调度。
用户的应用程序是一个分布式程序,需要按照YARN的规范来写才能提交到YARN集群被调度运行起来。
ResourceManager(RM)
- YARN集群中的Master。
- 资源调度:根据容量、队列等限制条件将系统中的资源打包为Container对象分配给应用程序(AM)。
- 应用程序管理:通知NM启动AM;监控AM运行状态并在失败时重启它。
NodeManager(NM)
- YARN集群中的Slave。
- 是应用程序运行的节点。
- 接收来在ResourceManager的请求,划定一个Container描述的资源限制来启动用户应用程序的ApplicationMaster进程。
- 接收来在用户应用程序ApplicationMaster的Container启动、停止请求。
- 定时向ResourceManager汇报本节点上的资源使用情况和各个Container运行状态。
ApplicationMaster(AM)
当用户向YARN提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的Master,它负责向ResourceManager申请Container资源,并通知NodeManager启动Container来执行具体的任务,监控任务的状态以便在失败时重启Container或者任务完成时回收Container,这个就是ApplicationMaster。
Container
- 是资源的抽象。
- Container对象包含资源的限制信息:Vcores、Memory,也包含资源的位置信息:需要运行在哪个NodeManager节点上。
由NodeManager进程负责启动。
YARN job工作流程
- Client向ResourceManager提交应用程序(包含启动ApplicationMaster的命令)。
- ResourceManager为应用分配第一个Container并与对应的NodeManager通信要求它启动ApplicationMaster。
- ApplicationMaster向ResourceManager注册并与ResourceManager保持心跳。
- ApplicationMaster为任务的运行向ResourceManager申请若干Container资源。
- ApplicationMaster领取ResourceManager分配的Container并初始化相关运行信息,便与对应的NodeManager通信要求它启动Container。
- NodeManager为Container设置好运行环境(下载运行资源、设置环境变量、资源限制等),将启动命令写到脚本文件中,运行脚本启动Container。
- Container运行期间向ApplicationMaster汇报自己的状态和任务进度。
- 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销自己,释放相关Container资源。
用户程序什么时候、在哪、谁调用执行的?
入口示例程序
是一个Stream job
代码语言:shell复制./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
流程图
步骤说明
1. shell命令交由入口类CliFrontend执行。
2. CliFrontend加载配置和命令行参数,生成Configuration和PackagedProgram对象。
1) 找配置文件目录,优先级:system env(FLINK_CONF_DIR) > ../conf > conf。
2) 从配置文件目录中加载配置文件并解析命令行参数args融合生成Configuration,由Configuration构建出PackagedProgram(包含URL jarFile、Class mainClass、String[] args、List<URL> classpaths、URLClassLoader userCodeClassLoader、savepointSettings等信息)。
3) 通过ClientUtils设置用户程序的执行环境ContextEnvironment和StreamContextEnvironment,为两个Environment设置了PipelineExecutorServiceLoader(用于找到PipelineExecutorFactory)、Configuration和ClassLoader(用户程序PackagedProgram指定的URLClassLoader)等信息。
4) 设置当前线程ClassLoader为上述3中指定的用户程序ClassLoader,调用PackagedProgram执行用户程序,执行完用户程序后重置当前线程ClassLoader。
3. PackagedProgram通过反射的方式调用用户程序入口类的main方法执行用户程序。
4. 用户程序执行,完成StreamGraph的构建。
使用纯SQL API,转换过程SQL -> calcite(SqlNode -> RelNode) -> Operation -> Transformation -> Pipeline
使用Table API,转换过程Operation -> Transformation -> Pipeline
使用DataStream API,转换过程Transformation -> Pipeline
注:流模式Pipeline的实现类是StreamGraph。
5. 找到匹配的PipelineExecutor去执行Pipeline。
PipelineExecutor的实现有多种:
LocalExecutor:本地模式 RemoteExecutor:Standalone模式 YarnJobClusterExecutor:YARN per job模式 YarnSessionClusterExecutor:YARN session job模式 KubernetesSessionClusterExecutor:K8S session job模式 EmbeddedExecutor:Application模式用
这里采用的是YarnJobClusterExecutor,如何找?
- StreamExecutionEnvironment通过PipelineExecutorServiceLoader找到PipelineExecutorFactory。PipelineExecutorServiceLoader先以SPI的方式加载PipelineExecutorFactory,再过滤出 与Configuration配置兼容的Factory。
- PipelineExecutorFactory负责创建对应的PipelineExecutor,由PipelineExecutor去执行Pipeline。
6. 执行Pipeline:先构建JobGraph,再找到匹配的ClusterDescriptor来部署flink集群以执行JobGraph。
- StreamGraph -> JobGraph
- 由ClientFactory工厂类会创建对应的ClusterDescriptor,从Configuration中整理出ClusterSpecification(集群描述信息,包含JM和TM的内存大小以及slots个数)。
- 通过ClusterDescriptor部署集群:clusterDescriptor.deployJobCluster(ClusterSpecification, JobGraph, detached)。
注: StreamGraph到JobGraph主要变化 node: ( List<StreamEdge> -> StreamNode -> List<StreamEdge> ) --> node --> node ... 到 node: ( List<JobEdge> -> JobVertex -> List<IntermediateDataSet> ) --> node --> node ... 的转换,另外完成Chaining,即多个StreamNode会合并为一个JobVertex
7. 通过ClusterDescriptor部署flink集群来执行JobGraph。
- 检查配置;
- 上传资源到HDFS;
- 提交yarn job让YARN启动AM;
- 循环等待提交结果ApplicationReport;
- 回设rpc、rest、ha信息;
- 返回集群Client对象RestClusterClient。
注: HDFS目录由yarn.staging-directory参数指定 flink-dist, lib/, plugins/ 这些多个flink应用都用到的,预先上传到yarn.provided.lib.dirs参数指定的HDFS目录即可,NM会缓存避免频繁上传下载。
调用链
代码语言:java复制1. bin/flink run
2. CliFrontend.java
main(String[] args)
CliFrontend cli = new CliFrontend(configuration, customCommandLines)
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args))
run(params)
executeProgram(Configuration, PackagedProgram)
ClientUtils.executeProgram
ClientUtils.java
executeProgram
program.invokeInteractiveModeForExecution
3. callMainMethod(entryClass, args)
entryClass.getMethod("main", String[].class).invoke(null, (Object) args)
4. 用户程序入口类
main(args)
5. StreamExecutionEnvironment.java
executeAsync(StreamGraph)
executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader)
6. AbstractJobClusterExecutor.java
execute
{
JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration)
clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)
clusterSpecification = clusterClientFactory.getClusterSpecification(configuration)
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, detached)
}
7. YarnClusterDescriptor.java
ClusterClientProvider<ApplicationId> deployJobCluster(ClusterSpecification, JobGraph, detached)
deployInternal
{
1) 检查用户权限(Kerberos证书)、vcores、hadoop环境变量、yarn queue等
2) ApplicationReport report = startAppMaster(...)
{
1. 上传资源到HDFS(yarnShipFiles、yarn.ship-archives、pipeline.jars、配置文件等)
2. 将JobGraph序列化到文件中,并上传到HDFS(per job模式才有)
3. 设置集群HA信息
4. 设置ApplicationSubmissionContext
setApplicationName
setApplicationType
setAMContainerSpec // Application Master Container
setCommands // 通过class参数指定了yarn AM container启动类为YarnJobClusterEntrypoint
setTokens
setLocalResources(job.graph、flink-conf.yaml、yarn-site.xml、krb5.conf、keytab、kerberos)
setEnvironment
flink-conf.yaml中用户配置的以containerized.master.env.为前缀的变量
_FLINK_CLASSPATH // Flink app class path
_FLINK_DIST_JAR // local path of flink dist jar
classpath from YARN configuration
...
setResource(masterMemoryMB、yarn.appmaster.vcores)
setPriority
setQueue // yarn queue
setApplicationNodeLabel
setApplicationTags
5. 提交yarn application: yarnClient.submitApplication(appContext)
// RUNNING或FINISHED状态时,正常退出循环;FAILED或KILLED时抛异常退出;状态变化时打印日志,运行超60秒打印日志
6. loop循环等待提交结果: ApplicationReport report = yarnClient.getApplicationReport(appId)
}
3) 从ApplicationReport中获取rpc和rest的地址和端口信息以及ApplicationId信息,设置回Configuration;设置ClusterId=ApplicationId信息到HA中。
4) 返回RestClusterClient(Configuration, ApplicationId)
}
总结
- YARN per job模式下用户程序在Client端被执行,Client端即执行flink shell命令的执行节点。
- Client端主要工作就是将用户写的代码转换为JobGraph,向YARN提交应用以执行JobGraph。
- User Code(SQL API、Table API、DataStream API)-> StreamGraph
- PipelineExecutor(YarnJobClusterExecutor)将StreamGraph转换为JobGraph
- ClusterDescriptor(YarnClusterDescriptor)通过YARN部署flink集群以执行JobGraph
JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?
入口
JobManager进程就是YARN job中的ApplicationMaster(AM)。
YARN NodeManager接收到YARN RM发送的AM container启动请求后为其设置好运行环境(环境变量、JAR包、配置文件、Cgroup资源限制等),将启动命令写到脚本文件中,运行脚本启动Container(JobManager进程)。
后续操作如下图所示:
流程图
步骤说明
1. 启动RpcService,内部创建了ActorSystem。
Flink集群内RPC通信是封装了Akka Actor来实现。
ActorSystem会创建一个Supervisor Actor,用来创建并启动其他的Actor,比如ResourceManager、Dispatcher、JobMaster。
2. 创建并启动WebMonitorEndpoint
这是一个借助netty实现的Rest服务,用来响应web请求。
ApplicationMaster启动后向YARN RM注册,注册的appTrackingUrl就是这个Web服务的地址,这样就可从YARN资源管理页面跳转到Flink Web UI页面。
3. 创建并启动ResourceManager。
- 这里指的是Flink的ResourceManager,要与上述中YARN ResourceManager区别开。Flink的RM通过YarnResourceManagerDriver对象内的AMRMClientAsync沟通YARN RM,通过NMClientAsync沟通YARN NM。
- 内部创建SlotManager,用来管理Slot资源。
- ResourceManager是一个RPC服务,可以接收RPC请求。内部是通过步骤1中创建的RpcService来启动ResourceManager RPC服务,实际上是由RpcService中的Supervisor Actor创建的一个ResourceManager Actor来处理RPC请求。
4. 创建并启动Dispatcher。
Dispatcher是一个RPC服务,可以接收RPC请求。RPC服务的创建与服务过程与ResourceManager一样,不再多述。
5. Dispatcher创建并启动JobMaster。
- Dispatcher的onStart方法被调用,方法内部会启动recovered jobs(JobGraph),per job模式下,recovered job不为空,是借助FileJobGraphRetriever类从job.graph文件中读取而来。
- JobMaster内部有SchedulerNG和SlotPoolService对象。
- JobMaster创建完成后,onStart方法被调用,会触发SchedulerNG的调度,SchedulerNG向Flink ResourceManager申请slot资源,Flink RM收到请求向YARN RM申请启动container运行TaskManager进程。TaskManager进程启动后向Flink RM注册slot资源,JobMaster中的SchedulerNG就能从Flink RM申请获取到slot资源,然后向TaskManager提交运行Task。
调用链
代码语言:java复制YarnJobClusterEntrypoint.java
main(String[] args)
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint)
clusterEntrypoint.startCluster()
runCluster(Configuration, PluginManager)
initializeServices
{
1. commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService
JMXService
IO线程池
HA Service
BlobServer
HeartbeatServices
MetricRegistry
MetricQuery RpcService
ProcessMetricGroup
ExecutionGraphInfoStore
}
clusterComponent = dispatcherResourceManagerComponentFactory.create
DefaultDispatcherResourceManagerComponentFactory.java
create(...)
{
2. webMonitorEndpoint = restEndpointFactory.createRestEndpoint
webMonitorEndpoint.start()
3. resourceManagerService = ResourceManagerServiceImpl.create
4. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
{
...
Dispatcher dispatcher = dispatcherFactory.createDispatcher
dispatcher.start()
{
...
5. startRecoveredJobs()
{
for (JobGraph recoveredJob : recoveredJobs){
...
JobMaster jobMaster = new JobMaster(...)
jobMaster.start()
}
}
}
}
resourceManagerService.start()
{
...
YarnResourceManagerDriver.java
initializeInternal()
{
...
// 创建并启动AMRMClientAsync,联系YARN RM
resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient
resourceManagerClient.init(yarnConfig)
resourceManagerClient.start()
// AM启动后向YARN RM注册自己,这样可以通过YARN跳转到Flink web ui页面
resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl)
...
// 创建并启动NMClientAsync,用于联系YARN NM
nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient
nodeManagerClient.init(yarnConfig)
nodeManagerClient.start()
}
YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
{
...
// 运行任务所需的Container都申请并领取完毕后,AM维护与YARN RM心跳
// resourceManagerClient = AMRMClientAsync
if (getNumRequestedNotAllocatedWorkers() <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
}
}
}
总结
- Flink JobManager进程就是YARN job中的AM。
- Flink Client通过YarnClusterDescriptor中的YarnClient向YARN RM提交应用,YARN RM通过调度策略为其分配AM container资源,并通知container指定的YARN NM启动container。
- YARN NM接收到YARN RM发送的AM container启动请求后为其设置好运行环境(环境变量、JAR包、配置文件等),将启动命令写到脚本文件中,运行脚本启动Container(JobManager进程)。
- JobManager进程启动后运行WebMonitorEndpoint,并向YARN RM注册自己,注册的appTrackingUrl就是WebMonitorEndpoint中netty服务占用的地址和端口,即Flink web ui。
- 启动ResourceManager、Dispatcher等服务,并通过Dispatcher启动JobMaster来执行JobGraph。
TaskManager进程什么时候、在哪、如何被拉起来执行?
流程图
步骤说明
1. JobMaster通过SchedulerNG执行JobGraph的调度。
调度分两步走:
1) 获取资源(Task运行的容器,即Flink中的Slot,Slot需要从Container中划分)
2) 调度任务(Task)
注:这里仅说第1步,即TaskManager进程如何被调度起来为Task运行提供slots资源支持。第2步在下个问题描述。
2. SlotPoolService负责slots资源申请,先从缓存中(内存)检查是否有可用的slots资源,有的话直接分配,没的话会向Flink RM发送RPC请求。
AllocatedSlotPool中缓存有已经注册的slots资源:Map<AllocationID, AllocatedSlot> registeredSlots。
AllocatedSlot属性:AllocationID、TaskManagerLocation、TaskManagerGateway、ResourceProfile、physicalSlotNumber。
3. Flink RM接收到JobMaster发送的RPC资源请求,会将处理交SlotManager,SlotManager又通过ResourceManagerDriver来做具体的资源申请。
YarnResourceManagerDriver内部有YARN RM Client和YARN NM Client。
1) 通过YARN RM Client请求YARN RM分配containers。
2) 通过YARN NM Client向YARN NM发送请求,创建container运行TaskManager进程(指定了TaskManager进程入口类为YarnTaskExecutorRunner)。
调用链
代码语言:java复制JobMaster.java
new JobMaster
{
...
// jobmanager.scheduler 默认值为Ng,因此创建的SchedulerNG为DefaultScheduler
this.schedulerNG = createScheduler(...)
{
// JobGraph -> ExecutionGraph
this.executionGraph = createAndRestoreExecutionGraph(...)
}
}
jobMaster.start
...
onStart
startJobExecution
startJobMasterServices
startScheduling
1. schedulerNG.startScheduling
DefaultScheduler.java
// 方法体在父类SchedulerBase.java中
startScheduling
startSchedulingInternal
schedulingStrategy.startScheduling
PipelinedRegionSchedulingStrategy.java
startScheduling
maybeScheduleRegions(Set<SchedulingPipelinedRegion> regions)
// 遍历regions,按region调度
maybeScheduleRegion
schedulerOperations.allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption>)
DefaultScheduler.java
allocateSlotsAndDeploy
allocateSlots
executionSlotAllocator.allocateSlotsFor(List<ExecutionVertexID> executionVertexIds)
SlotSharingExecutionSlotAllocator.java
allocateSlotsFor
getOrAllocateSharedSlot
slotProvider.allocatePhysicalSlot(PhysicalSlotRequest)
PhysicalSlotProviderImpl.java
allocatePhysicalSlot
// 先看有可用的slot没,有的话直接分配
tryAllocateFromAvailable
// 没的话请求Flink RM获取
orElseGet(requestNewSlot(...))
2. slotPool.requestNewAllocatedSlot(SlotRequestId, ResourceProfile, timeout)
DeclarativeSlotPoolBridge.java
requestNewAllocatedSlot
internalRequestNewSlot
internalRequestNewAllocatedSlot
getDeclarativeSlotPool().increaseResourceRequirementsBy
DefaultDeclarativeSlotPool.java
increaseResourceRequirementsBy
declareResourceRequirements
notifyNewResourceRequirements.accept(Collection<ResourceRequirement>)
DeclarativeSlotPoolService.java
declareResourceRequirements(Collection<ResourceRequirement>)
resourceRequirementServiceConnectionManager.declareResourceRequirements
DefaultDeclareResourceRequirementServiceConnectionManager.java
declareResourceRequirements(ResourceRequirements)
triggerResourceRequirementsSubmission
sendResourceRequirements
service.declareResourceRequirements
DeclarativeSlotPoolService.java
// 向Flink RM发送RPC请求,获取slots资源
resourceManagerGateway.declareRequiredResources(JobMasterId, ResourceRequirements, Time timeout)
// Flink ResourceManager服务端处理资源请求
ResourceManager.java
declareRequiredResources
3. slotManager.processResourceRequirements
DeclarativeSlotManager.java
processResourceRequirements
resourceTracker.notifyResourceRequirements
checkResourceRequirements()
tryFulfillRequirementsWithPendingSlots(JobID jobId, Collection<Map.Entry<ResourceProfile, Integer>> missingResources, ResourceCounter pendingSlots)
// 遍历missingResource
tryAllocateWorkerAndReserveSlot(ResourceProfile profile, ResourceCounter pendingSlots)
taskExecutorManager.allocateWorker(profile)
TaskExecutorManager.java
allocateWorker
resourceActions.allocateResource(WorkerResourceSpec)
ResourceManager.java
ResourceActionsImpl.allocateResource
startNewWorker
ActiveResourceManager.java
startNewWorker
requestNewWorker
resourceManagerDriver.requestResource(TaskExecutorProcessSpec)
YarnResourceManagerDriver.java
requestResource
// 请求获取container资源
3.1 resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority))
YarnResourceManagerDriver.java
// container里有NM的地址信息
YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
onContainersOfPriorityAllocated
// 遍历containers
startTaskExecutorInContainerAsync
// 创建ContainerLaunchContext请求对象
context = createTaskExecutorLaunchContext(ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec)
// 通过YARN NM Client发送请求,启动container运行TaskManager进程
3.2 nodeManagerClient.startContainerAsync(container, context)
总结
- YARN per job模式下,TaskManager进程不是根据配置事先就启动好的,而是需要有JobGraph的驱动。
- JobGraph被转为ExecutionGraph,后被进一步分解为一个个Task(可运行的Runnable对象),Task是需要在划定的slot资源里执行的,slot由TaskManager进程提供。
- JobMaster通过SlotPoolService向Flink RM申请获取资源,Flink RM通过SlotManager管理slot的申请与释放,SlotManager又通过ResourceManagerDriver来做具体的资源申请。YARN per job模式中是YarnResourceManagerDriver实现类,driver先向YARN RM申请分配container资源,然后driver联系container指定的YARN NM启动container,即运行TaskManager进程。
用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?
流程图
说明
- JobMaster中的SchedulerNG拿到slots资源后,开始进行Task的调度。
- Execution是可调度的最小单位,内有LogicalSlot,即这个Execution要被调度到哪个Slot中,通过LogicalSlot可获取其对应的TaskManager RPC客户端代理对象。
- 这样一个个Execution被转化为对应的TaskDeploymentDescriptor对象,通过RPC协议提交给对应的TaskManager执行。
- TaskManager接收到submitTask请求后将TaskDeploymentDescriptor转化为Task对象,将其放到对应的TaskSlot中,启动Thread执行Task。
提示:Task运行过程中,接收上游发过来的数据,处理完发往下游,由下游Task继续处理,这期间数据的存取由TaskSlot中的MemoryManager控制,相较于Java的堆来说能有效控制内存使用限额,缩减数据占用内存的大小,及时回收内存,这就是Flink的内存管理。
调用链
代码语言:java复制DefaultScheduler.java
allocateSlotsAndDeploy
// 请求获取slots资源
allocateSlots
// 部署Task
waitForAllSlotsAndDeploy
deployAll(List<DeploymentHandle> deploymentHandles)
// 遍历deploymentHandles
deployOrHandleError(DeploymentHandle)
deployTaskSafe
executionVertexOperations.deploy(ExecutionVertex)
DefaultExecutionVertexOperations.java
deploy(ExecutionVertex executionVertex)
executionVertex.deploy()
ExecutionVertex.java
deploy()
currentExecution.deploy()
Execution.java
deploy()
TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway()
taskManagerGateway.submitTask(TaskDeploymentDescriptor, rpcTimeout)
RpcTaskManagerGateway.java
submitTask
taskExecutorGateway.submitTask(TaskDeploymentDescriptor, jobMasterId, timeout)
// RPC服务端响应
TaskExecutor.java
submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout)
{
...
// Task implements Runnable
Task task = new Task(...)
taskSlotTable.addTask(task)
task.startTaskThread()
// executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask)
executingThread.start()
}
Task.java
run()
doRun()
{
...
TaskInvokable invokable = loadAndInstantiateInvokable(...)
{
...
invokableClass = Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class)
invokableClass.getConstructor(Environment.class).newInstance(environment)
}
restoreAndInvoke(invokable)
...
}
附录
ResourceService创建与启动
ResourceManager创建与启动过程涉及Leader选举、代理类、Akka Actor,代码跳转比较绕,故这里把完整调用链描述一下,有兴趣可阅读。
整个过程总结下来就是
- 使用ResourceManagerService封装ResourceManager,ResourceManagerService启动后先做leader选举,成为leader后再创建并启动ResourceManager。
- 创建ResourceManager对象时,在其内部创建并启动了Akka Actor来做RPC服务。
- ResourceManager对象创建完毕调用start做初始化工作,启动相关服务。
start() -> onStart()
1) ResourceManager将start的处理交由代理对象RpcServer(AkkaInvocationHandler实例)的start方法处理
2) RpcServer invoke方法被调用,发现是非RPC消息,就调用自身start方法
3) start通过ResourceManager Actor的引用ActorRef向ResourceManager Actor发送start类型控制消息
4) Actor(AkkaRpcActor)收到消息,将处理交由RpcEndpoint,即ResourceManager处理
onStart() 方法内部启动相关服务。
调用链用如下
代码语言:java复制// 1) 启动leader选举服务,选举leader
// start(LeaderContender) -> leaderContender.grantLeadership
{
// implements ResourceManagerService, LeaderContender
ResourceManagerServiceImpl.java
start()
leaderElectionService.start(this)
// HA leader选举
DefaultLeaderElectionService.java
start(LeaderContender)
leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver
ZooKeeperLeaderElectionDriverFactory.java
createLeaderElectionDriver
new ZooKeeperLeaderElectionDriver
{
// latchPath节点用于leader选举
leaderLatch = new LeaderLatch(client, checkNotNull(latchPath))
leaderLatch.addListener(this)
leaderLatch.start()
// leaderPath节点用于存储leader信息,监听该节点数据的变化
cache = new NodeCache(client, leaderPath)
cache.getListenable().addListener(this)
cache.start()
}
ZooKeeperLeaderElectionDriver.java
// 成为leader时,latchPath上挂的监听会被回调,isLeader方法被执行
isLeader()
leaderElectionEventHandler.onGrantLeadership
DefaultLeaderElectionService.java
onGrantLeadership
leaderContender.grantLeadership
}
// 2) 成为leader后,创建ResourceManager并启动RPC服务
{
ResourceManagerServiceImpl.java
grantLeadership
startNewLeaderResourceManager
// resourceManagerFactory = ActiveResourceManagerFactory
this.leaderResourceManager = resourceManagerFactory.createResourceManager
{
new ActiveResourceManager
{
...
// 启动RPC服务
this.rpcServer = rpcService.startServer(this)
}
}
startResourceManagerIfIsLeader
resourceManager.start()
}
// 3) start() -> onStart()
{
// ResourceManager extends RpcEndpoint
RpcEndpoint.java
start()
rpcServer.start()
// AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
AkkaInvocationHandler.java(客户端)
// rpcServer是JDK Proxy生成的代理对象,实现了InvocationHandler接口的invoke方法,故start方法交由代理对象的invoke执行
invoke(Object proxy, Method method, Object[] args)
// if(非rpc方法) 调用对象自身相应的method处理
result = method.invoke(this, args)
start()
// rpcEndpoint是Actor的引用:ActorRef,可以用来向Actor发消息,这里是向自身ResourceManager发送控制类消息
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender())
// AkkaRpcActor extends AbstractActor,是一个Actor
AkkaRpcActor.java(服务端)
handleControlMessage(ControlMessages)
// 初始状态为STOPPED,即state = AkkaRpcActor.StoppedState
state = state.start(this)
akkaRpcActor.rpcEndpoint.internalCallOnStart()
RpcEndpoint.java
internalCallOnStart()
onStart()
}
// 4) 启动ResourceManager下相关服务:心跳管理服务、SlotManager等
{
ResourceManager.java
onStart()
startResourceManagerServices()
...
startHeartbeatServices()
slotManager.start
}
Dispatcher创建与启动
代码语言:java复制// 入口 dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
// 1) 创建DispatcherRunner,启动HA leader选举
{
DefaultDispatcherRunnerFactory.java
createDispatcherRunner
DefaultDispatcherRunner.create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
new DispatcherRunnerLeaderElectionLifecycleManager
// 启动leader选举服务,
// 后续 start(LeaderContender) -> leaderContender.grantLeadership过程同上述ResourceManager调用链一致,不再多述
leaderElectionService.start(dispatcherRunner)
}
// 2) 成为leader后,启动Dispatcher(内部启动了RPC服务)
{
DefaultDispatcherRunner.java
grantLeadership
startNewDispatcherLeaderProcess
dispatcherLeaderProcess = createNewDispatcherLeaderProcess
dispatcherLeaderProcess.start
AbstractDispatcherLeaderProcess.java
start
startInternal
onStart
JobDispatcherLeaderProcess.java
onStart
dispatcherGatewayServiceFactory.create
DefaultDispatcherGatewayServiceFactory.java
create(DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter)
{
Dispatcher dispatcher = dispatcherFactory.createDispatcher
// JobDispatcherFactory.java
createDispatcher
// MiniDispatcher extends Dispatcher
new MiniDispatcher
{
// 启动RPC服务,内部会创建Actor,返回代理对象AkkaInvocationHandler
this.rpcServer = rpcService.startServer(this)
}
dispatcher.start
}
}
// 3) 启动后做什么?startRecoveredJobs(start JobMasters)
{
Dispatcher.java
start()
.. // start() -> onStart() 过程,同上述ResourceManager调用链一致,不再多述
onStart()
startDispatcherServices
startRecoveredJobs
// 遍历recoveredJobs
runRecoveredJob(recoveredJob)
runJob
createJobManagerRunner
JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner
runner.start()
// implements LeaderContender
JobMasterServiceLeadershipRunner.java
start
leaderElectionService.start(this)
... // 后续 start(LeaderContender) -> leaderContender.grantLeadership过程同上述ResourceManager调用链一致,不再多述
grantLeadership
startJobMasterServiceProcessAsync
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
createNewJobMasterServiceProcess
jobMasterServiceProcess = jobMasterServiceProcessFactory.create
DefaultJobMasterServiceProcessFactory.java
create
new DefaultJobMasterServiceProcess
this.jobMasterServiceFuture = jobMasterServiceFactory.createJobMasterService
DefaultJobMasterServiceFactory.java
createJobMasterService
internalCreateJobMasterService
JobMaster jobMaster = new JobMaster(...)
jobMaster.start()
}
参考
- Flink on YARN(上):一张图轻松掌握基础架构与启动流程
- Apache Flink 进阶(四):Flink on Yarn/K8s 原理剖析及实践