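In the previous articles we analyzed how a Flink WordCount job generates its StreamGraph and JobGraph. Picking up from the point where the JobGraph has been generated, we now look at how the ExecutionGraph is created and how the job is submitted. This article focuses on the execution logic of the components involved in job submission, such as TaskManager, ResourceManager, and JobManager. It covers only the internal logic of these components when WordCount runs locally, and does not cover the submission flow under other resource-management modes such as YARN or Kubernetes (a separate article will analyze those). The article is long and code-heavy, so proceed only if that suits you.
Continuing from the previous articles, we had reached the org.apache.flink.client.deployment.executors.LocalExecutor#execute method:
@Override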
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);
// effective configuration: the executor's own settings overlaid with the per-call settings
Configuration effectiveConfig = new Configuration();
effectiveConfig.addAll(this.configuration);
effectiveConfig.addAll(configuration);
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
// build the JobGraph
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}
The method above does two things: it checks the configuration, and it generates the JobGraph, which it then submits. Let's look at the org.apache.flink.client.program.PerJobMiniClusterFactory#submitJob method:
public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
miniCluster.start();
return miniCluster
.submitJob(jobGraph)
.thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
() -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
() -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
userCodeClassloader);
return submissionResult;
}))
.thenApply(result -> new MiniClusterJobClient(
result.getJobID(),
miniCluster,
userCodeClassloader,
MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
// We failed to create the JobClient and must shutdown to ensure cleanup.
shutDownCluster(miniCluster);
}
})
.thenApply(Function.identity());
}
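This method performs the following steps:
1. Start the MiniCluster.
2. Submit the JobGraph to the MiniCluster.
3. Once the JobGraph has been submitted and job initialization has finished, return the submissionResult.
4. Wrap the result in a MiniClusterJobClient and return it.
5. If an exception occurred, shut down the MiniCluster.
6. Otherwise, pass through the result produced in step 4.
The code above also shows the appeal of CompletableFuture's fluent style. Here we mainly care about what the core components TaskManager, JobManager, and ResourceManager do. MiniCluster#start handles the TaskManager and ResourceManager logic, while the JobManager logic lives mainly in the job submission flow.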
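The shape of that chain can be tried out in isolation. The following is a minimal sketch, assuming a hypothetical `FakeCluster` class that stands in for the MiniCluster (none of these names are Flink APIs): it starts the cluster, submits, wraps the result, and shuts the cluster down only when the chain failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: FakeCluster is a hypothetical stand-in, not Flink's MiniCluster.
final class SubmitChainSketch {

    static final class FakeCluster {
        final List<String> events = new ArrayList<>();
        private final boolean rejectJobs;

        FakeCluster(boolean rejectJobs) {
            this.rejectJobs = rejectJobs;
        }

        void start() {
            events.add("start");
        }

        CompletableFuture<String> submit(String jobName) {
            events.add("submit");
            CompletableFuture<String> result = new CompletableFuture<>();
            if (rejectJobs) {
                result.completeExceptionally(new IllegalStateException("job rejected"));
            } else {
                result.complete("id-of-" + jobName);
            }
            return result;
        }

        void shutDown() {
            events.add("shutdown");
        }
    }

    // Same shape as the steps above: start, submit, wrap the result, clean up on failure.
    static CompletableFuture<String> submitJob(FakeCluster cluster, String jobName) {
        cluster.start();
        return cluster.submit(jobName)
            .thenApply(jobId -> "client(" + jobId + ")")
            .whenComplete((client, throwable) -> {
                if (throwable != null) {
                    // Creating the client failed, so release the cluster.
                    cluster.shutDown();
                }
            });
    }

    public static void main(String[] args) {
        FakeCluster healthy = new FakeCluster(false);
        System.out.println(submitJob(healthy, "wordcount").join() + " " + healthy.events);

        FakeCluster failing = new FakeCluster(true);
        CompletableFuture<String> failed = submitJob(failing, "wordcount");
        System.out.println(failed.isCompletedExceptionally() + " " + failing.events);
    }
}
```

Because `whenComplete` passes the original outcome through unchanged, the caller still sees the failure after the cleanup has run.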
TaskManager
MiniCluster#start calls startTaskManagers internally. That method was analyzed in the earlier article on Flink TaskManager startup, which is worth reading alongside this one.
Here we just walk through the overall flow:
startTaskManagers() => startTaskExecutor() => taskExecutor.start() => TaskExecutor#onStart =>
startTaskExecutorServices() => resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()) => StandaloneLeaderRetrievalService#start => listener.notifyLeaderAddress(leaderAddress, leaderId) =>
Once the ResourceManager leader has been elected, a callback notification is sent, and control enters org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress. Here is the code:
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
// executed asynchronously, triggered through RPC
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
Next, the org.apache.flink.runtime.taskexecutor.TaskExecutor#notifyOfNewResourceManagerLeader method:
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
// reconnect to the ResourceManager
reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
This is where the connection to the ResourceManager is established. The code is as follows:
private void reconnectToResourceManager(Exception cause) {
// close the previous connection to the ResourceManager
closeResourceManagerConnection(cause);
startRegistrationTimeout();
tryConnectToResourceManager();
}
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
private void connectToResourceManager() {
assert(resourceManagerAddress != null);
assert(establishedResourceManagerConnection == null);
assert(resourceManagerConnection == null);
log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
getAddress(),
getResourceID(),
unresolvedTaskManagerLocation.getDataPort(),
JMXService.getPort().orElse(-1),
hardwareDescription,
memoryConfiguration,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile()
);
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
getRpcService(),
taskManagerConfiguration.getRetryingRegistrationConfiguration(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
getMainThreadExecutor(),
new ResourceManagerRegistrationListener(),
taskExecutorRegistration);
resourceManagerConnection.start();
}
This calls resourceManagerConnection.start, a method that does more than its name suggests.
The part to focus on is its createNewRegistration method.
After the connection to the ResourceManager succeeds, the onRegistrationSuccess method is called back. Its implementation in org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection is:
@Override
protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
log.info("Successful registration at resource manager {} under registration id {}.",
getTargetAddress(), success.getRegistrationId());
registrationListener.onRegistrationSuccess(this, success);
}
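Control then passes to the TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess method.
From there it enters the org.apache.flink.runtime.taskexecutor.TaskExecutor#establishResourceManagerConnection method.
Two things happen here: 1. the TaskExecutor registers with the ResourceManager and reports its slot information; 2. heartbeat monitoring with the ResourceManager is set up.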
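The "register, retry on failure, call back on success" pattern used in this section can be illustrated with a small sketch. It is a synchronous simplification under stated assumptions: `registerWithRetry` and the simulated target are hypothetical names, and Flink's own retrying registration is asynchronous and driven by its RPC layer, which this does not model.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch only: a synchronous, simplified stand-in for a retrying registration.
final class RegistrationSketch {

    // Calls attempt until it succeeds or maxAttempts is exhausted.
    static <T> CompletableFuture<T> registerWithRetry(Supplier<T> attempt, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        RuntimeException lastFailure = new IllegalArgumentException("maxAttempts must be positive");
        for (int i = 0; i < maxAttempts; i++) {
            try {
                result.complete(attempt.get());
                return result;
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        result.completeExceptionally(lastFailure);
        return result;
    }

    // Registers against a target that is unreachable for the first two attempts.
    static String demo() {
        AtomicInteger attempts = new AtomicInteger();
        StringBuilder log = new StringBuilder();
        registerWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("resource manager not reachable yet");
            }
            return "registration-" + attempts.get();
        }, 5).thenAccept(id -> log.append("onRegistrationSuccess(").append(id).append(")"));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The success callback is attached to the future rather than called inside the retry loop, which mirrors how the connection object only learns about the outcome through the registration's future.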
ResourceManager
Inside MiniCluster#start there is this snippet:
setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);
Inside org.apache.flink.runtime.minicluster.MiniCluster#setupDispatcherResourceManagerComponents there is the following logic:
dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
configuration,
dispatcherResourceManagreComponentRpcServiceFactory,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
metricQueryServiceRetriever,
new ShutDownFatalErrorHandler()
));
Next comes the logic of the createDispatcherResourceManagerComponents method.
In org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create, the ResourceManager is created from the resourceManagerFactory and then started.
The overall call chain is:
resourceManager#start => ResourceManager#onStart => ResourceManager#startResourceManagerServices => leaderElectionService.start(this) => EmbeddedLeaderElectionService#start =>
Let's look at the org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.EmbeddedLeaderElectionService#start method:
@Override
public void start(LeaderContender contender) throws Exception {
checkNotNull(contender);
// register the contender
addContender(this, contender);
}
private void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
synchronized (lock) {
checkState(!shutdown, "leader election service is shut down");
checkState(!service.running, "leader election service is already started");
try {
// throw if this service has already been added
if (!allLeaderContenders.add(service)) {
throw new IllegalStateException("leader election service was added to this service multiple times");
}
service.contender = contender;
service.running = true;
updateLeader().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
fatalError(throwable);
}
});
}
catch (Throwable t) {
fatalError(t);
}
}
}
Next we enter the org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService#updateLeader method.
Then the org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.GrantLeadershipCall#run method.
And from there the org.apache.flink.runtime.resourcemanager.ResourceManager#grantLeadership method.
This method is the callback invoked once the ResourceManager leader has been determined.
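The contender and callback relationship described in this section can be reduced to a few lines. The sketch below uses hypothetical types (`Contender`, `InProcessElection`) and grants leadership synchronously to the first contender; Flink's embedded service hands the grant to a separate call object (GrantLeadershipCall), which this deliberately leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Sketch only: hypothetical types, not Flink's EmbeddedLeaderService.
final class LeaderElectionSketch {

    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    static final class InProcessElection {
        private final List<Contender> contenders = new ArrayList<>();
        private Contender leader;

        // Registers a contender; the first one to arrive is granted leadership.
        synchronized void start(Contender contender) {
            if (contenders.contains(contender)) {
                throw new IllegalStateException("contender was added multiple times");
            }
            contenders.add(contender);
            if (leader == null) {
                leader = contender;
                contender.grantLeadership(UUID.randomUUID());
            }
        }
    }

    static final class RecordingContender implements Contender {
        UUID sessionId; // stays null until leadership is granted

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            this.sessionId = leaderSessionId;
        }
    }

    public static void main(String[] args) {
        InProcessElection election = new InProcessElection();
        RecordingContender first = new RecordingContender();
        RecordingContender second = new RecordingContender();
        election.start(first);
        election.start(second);
        System.out.println("first is leader: " + (first.sessionId != null));
        System.out.println("second is leader: " + (second.sessionId != null));
    }
}
```

The component that wants to lead never polls; it only reacts when `grantLeadership` is invoked, which is the same inversion of control the ResourceManager relies on.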
JobManager
Let's look at the org.apache.flink.runtime.minicluster.MiniCluster#submitJob method:
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
.thenCombine(
dispatcherGatewayFuture,
(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
.thenCompose(Function.identity());
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
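The method above runs through these steps:
1. Obtain the DispatcherGateway via dispatcherGatewayRetriever; what comes back is dispatcherGatewayFuture.
2. Obtain the blobServerAddress through the dispatcherGateway.
3. Upload the job's jars using the BlobServer address and the JobGraph.
4. Submit the JobGraph through the dispatcherGateway that dispatcherGatewayFuture resolves to.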
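One detail in that method is easy to miss: the function passed to `thenCombine` itself returns a future, so the result is a future of a future, and `thenCompose(Function.identity())` flattens it. A minimal sketch of that idiom, with a hypothetical `submitAfterUpload` helper and plain strings standing in for the gateway and the acknowledgement:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch only: strings stand in for the gateway and the acknowledgement.
final class FlattenFutureSketch {

    static CompletableFuture<String> submitAfterUpload(
            CompletableFuture<Void> uploadFuture,
            CompletableFuture<String> gatewayFuture) {
        // Combining two futures with a function that returns a future yields a nested future.
        CompletableFuture<CompletableFuture<String>> nested = uploadFuture.thenCombine(
            gatewayFuture,
            (ignored, gateway) -> CompletableFuture.supplyAsync(() -> gateway + " accepted job"));
        // thenCompose with the identity function unwraps one level of nesting.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> upload = CompletableFuture.completedFuture(null);
        CompletableFuture<String> gateway = CompletableFuture.completedFuture("dispatcher");
        System.out.println(submitAfterUpload(upload, gateway).join());
    }
}
```

`thenCombine` is used because the submission needs both inputs (upload finished, gateway available), and there is no single-call variant that combines two futures and flattens the result.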
Next we enter the org.apache.flink.runtime.dispatcher.Dispatcher#submitJob method:
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
try {
if (isDuplicateJob(jobGraph.getJobID())) {
return FutureUtils.completedExceptionally(
new DuplicateJobSubmissionException(jobGraph.getJobID()));
} else if (isPartialResourceConfigured(jobGraph)) {
return FutureUtils.completedExceptionally(
new JobSubmissionException(
jobGraph.getJobID(),
"Currently jobs is not supported if parts of the vertices have " +
"resources configured. The limitation will be removed in future versions."));
} else {
return internalSubmitJob(jobGraph);
}
} catch (FlinkException e) {
return FutureUtils.completedExceptionally(e);
}
}
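•The job ID is first used to check whether this job has already been submitted or executed.
•Jobs in which only some of the vertices have resources configured are rejected.
•Otherwise control enters internalSubmitJob, which carries out the actual submission.
Here is the code of the org.apache.flink.runtime.dispatcher.Dispatcher#internalSubmitJob method:
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {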
log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(
jobGraph.getJobID(),
jobGraph,
this::persistAndRunJob)
.thenApply(ignored -> Acknowledge.get());
// handle the result
return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
if (throwable != null) {
cleanUpJobData(jobGraph.getJobID(), true);
ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(
throwable);
log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
throw new CompletionException(
new JobSubmissionException(
jobGraph.getJobID(),
"Failed to submit job.",
strippedThrowable));
} else {
return acknowledge;
}
}, ioExecutor);
}
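This method calls persistAndRunJob to submit the job. Note that this step runs asynchronously on the fencedMainThreadExecutor, whereas the result-handling callback in the code above is passed ioExecutor.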
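The error path of internalSubmitJob follows a common pattern: run the action asynchronously, and in a `handleAsync` stage either pass the acknowledgement through or clean up and rethrow a wrapped exception. A minimal sketch under simplifying assumptions: the `submit` helper, the "ACK" string and the single shared executor are all hypothetical, and the exception type is a plain IllegalStateException rather than Flink's JobSubmissionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: one executor and plain JDK exception types replace Flink's.
final class SubmitErrorHandlingSketch {

    static CompletableFuture<String> submit(Runnable persistAndRun, Runnable cleanUp, Executor ioExecutor) {
        return CompletableFuture.runAsync(persistAndRun, ioExecutor)
            .thenApply(ignored -> "ACK")
            .handleAsync((ack, throwable) -> {
                if (throwable != null) {
                    cleanUp.run();
                    // Strip the CompletionException wrapper added by the future machinery.
                    Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                        ? throwable.getCause()
                        : throwable;
                    throw new CompletionException(new IllegalStateException("Failed to submit job.", cause));
                }
                return ack;
            }, ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(submit(() -> { }, () -> { }, pool).join());
            try {
                submit(() -> { throw new RuntimeException("boom"); },
                       () -> System.out.println("cleaning up job data"),
                       pool).join();
            } catch (CompletionException e) {
                System.out.println(e.getCause().getMessage());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Using `handleAsync` rather than `exceptionally` keeps the success and failure branches in one place and lets the cleanup run on the I/O executor instead of whichever thread completed the previous stage.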
We continue into the org.apache.flink.runtime.dispatcher.Dispatcher#persistAndRunJob method:
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
jobGraphWriter.putJobGraph(jobGraph);
runJob(jobGraph, ExecutionType.SUBMISSION);
}
The main role of jobGraphWriter is to persist the JobGraph to ZooKeeper or to the Kubernetes leader node. We are currently in standalone mode, so this step is skipped. Let's go straight to the runJob method:
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
long initializationTimestamp = System.currentTimeMillis();
// create the JobManagerRunner
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(
jobGraph,
initializationTimestamp);
DispatcherJob dispatcherJob = DispatcherJob.createFor(
jobManagerRunnerFuture,
jobGraph.getJobID(),
jobGraph.getName(),
initializationTimestamp);
runningJobs.put(jobGraph.getJobID(), dispatcherJob);
// ... code that handles the future's result is omitted ...
}
The core logic here is in createJobManagerRunner; dispatcherJob and the code after it mainly deal with the result of jobManagerRunnerFuture. Let's look at the createJobManagerRunner method:
CompletableFuture<JobManagerRunner> createJobManagerRunner(
JobGraph jobGraph,
long initializationTimestamp) {
final RpcService rpcService = getRpcService();
return CompletableFuture.supplyAsync(
() -> {
try {
// create the JobManagerRunner
JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler,
initializationTimestamp);
// start the runner
runner.start();
return runner;
} catch (Exception e) {
throw new CompletionException(new JobInitializationException(
jobGraph.getJobID(),
"Could not instantiate JobManager.",
e));
}
},
ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
}
Now for the logic inside org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory#createJobManagerRunner. The code is quoted in full:
@Override
public JobManagerRunner createJobManagerRunner(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerSharedServices jobManagerServices,
JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception {
// read the JobMaster configuration
final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
// obtain the SlotPoolFactory, which handles slot management
final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
// obtain the scheduler factory
final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
// create the ShuffleMaster that handles shuffle operations
final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
// create the JobMasterServiceFactory
final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
jobMasterConfiguration,
slotPoolFactory,
rpcService,
highAvailabilityServices,
jobManagerServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
fatalErrorHandler,
schedulerNGFactory,
shuffleMaster);
// create the JobManagerRunnerImpl instance
return new JobManagerRunnerImpl(
jobGraph,
jobMasterFactory,
highAvailabilityServices,
jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
jobManagerServices.getScheduledExecutorService(),
fatalErrorHandler,
initializationTimestamp);
}
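The main operations in this method are:
1. Read the JobMaster configuration and build the JobMasterConfiguration object.
2. Create the SlotPoolFactory from the configuration.
3. Create the SchedulerNGFactory from the configuration.
4. Create the ShuffleMaster that handles shuffle operations.
5. Create the JobMasterServiceFactory.
6. Create the JobManagerRunnerImpl instance.
Next comes the JobManagerRunnerImpl constructor, again quoted in full:
public JobManagerRunnerImpl(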
final JobGraph jobGraph,
final JobMasterServiceFactory jobMasterFactory,
final HighAvailabilityServices haServices,
final LibraryCacheManager.ClassLoaderLease classLoaderLease,
final Executor executor,
final FatalErrorHandler fatalErrorHandler,
long initializationTimestamp) throws Exception {
this.resultFuture = new CompletableFuture<>();
this.terminationFuture = new CompletableFuture<>();
this.leadershipOperation = CompletableFuture.completedFuture(null);
this.jobGraph = checkNotNull(jobGraph);
this.classLoaderLease = checkNotNull(classLoaderLease);
this.executor = checkNotNull(executor);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
// libraries and class loader first
final ClassLoader userCodeLoader;
try {
userCodeLoader = classLoaderLease.getOrResolveClassLoader(
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths()).asClassLoader();
} catch (IOException e) {
throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
}
// high availability services next
this.runningJobsRegistry = haServices.getRunningJobsRegistry();
this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
this.leaderGatewayFuture = new CompletableFuture<>();
// now start the JobManager
this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
}
Besides preparing the class loader, the executor, and the leader election service, the constructor creates the jobMasterService. The core logic is in jobMasterFactory.createJobMasterService. The default jobMasterFactory is DefaultJobMasterServiceFactory, and org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory#createJobMasterService creates a JobMaster object. Before looking at how JobMaster is constructed, let's first look at the JobManagerRunnerImpl#start method:
@Override
public void start() throws Exception {
try {
// the contender passed in here is a JobManagerRunnerImpl; it is called back once the election completes
leaderElectionService.start(this);
} catch (Exception e) {
log.error("Could not start the JobManager because the leader election service did not start.", e);
throw new Exception("Could not start the leader election service.", e);
}
}
In our scenario the leaderElectionService is a StandaloneLeaderElectionService. The code of StandaloneLeaderElectionService#start is:
@Override
public void start(LeaderContender newContender) throws Exception {
if (contender != null) {
// Service was already started
throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
}
contender = Preconditions.checkNotNull(newContender);
// directly grant leadership to the given contender
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
In our scenario the LeaderContender passed in is the JobManagerRunnerImpl instance, so execution reaches JobManagerRunnerImpl#grantLeadership, which is also the callback invoked once the JobManager leader has been determined. The code is:
@Override
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
return;
}
leadershipOperation = leadershipOperation.thenCompose(
(ignored) -> {
synchronized (lock) {
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
}
});
handleException(leadershipOperation, "Could not start the job manager.");
}
}
Next, the verifyJobSchedulingStatusAndStartJobManager method:
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
return jobSchedulingStatusFuture.thenCompose(
jobSchedulingStatus -> {
if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
return jobAlreadyDone();
} else {
// start the JobMaster
return startJobMaster(leaderSessionId);
}
});
}
As shown, unless the job is already done, this method calls startJobMaster to start the JobMaster. The code shows the details:
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
try {
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
} catch (IOException e) {
return FutureUtils.completedExceptionally(
new FlinkException(
String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
e));
}
final CompletableFuture<Acknowledge> startFuture;
try {
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
}
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
return startFuture.thenAcceptAsync(
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
leaderSessionId,
jobMasterService.getAddress(),
currentLeaderGatewayFuture),
executor);
}
In the code above, the call to focus on is org.apache.flink.runtime.jobmaster.JobMaster#start:
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
// make sure we receive RPC and async calls
start();
return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
The start() call here triggers the endpoint's onStart() callback; for why that happens, see the earlier article on Flink TaskManager startup. JobMaster uses the empty onStart implementation from RpcEndpoint. The method to look at is org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution:
//-- job starting and stopping -----------------------------------------------------------------
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
validateRunsInMainThread();
checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
if (Objects.equals(getFencingToken(), newJobMasterId)) {
log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
return Acknowledge.get();
}
setNewFencingToken(newJobMasterId);
startJobMasterServices();
log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
resetAndStartScheduler();
return Acknowledge.get();
}
Two methods matter here: startJobMasterServices and resetAndStartScheduler.
First, startJobMasterServices:
private void startJobMasterServices() throws Exception {
startHeartbeatServices();
// start the slotPool
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
// try to reconnect to previously known leader
reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
// the slot pool will start requesting slots
// the listener is called back whenever the ResourceManager gets a new leader
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
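This method performs several initialization steps:
•Start the heartbeat services, which maintain heartbeats with the TaskManagers and the ResourceManager.
•Call slotPool.start, which adds periodic checks to the slot pool, such as scheduled checks for idle slots and batch checks for timed-out slots, and writes scheduling logs according to the active log level.
•Call reconnectToResourceManager: if the ResourceManager leader address is already known, the connection to the ResourceManager is made here, and once it succeeds the slotPool can request slots.
•Call resourceManagerLeaderRetriever.start, whose code is:
@Override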
public void start(LeaderRetrievalListener listener) {
checkNotNull(listener, "Listener must not be null.");
synchronized (startStopLock) {
checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
started = true;
// directly notify the listener, because we already know the leading JobManager's address
// when a TaskExecutor starts, this directly triggers org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress, because the leader address is already known at this point
// when a JobMaster starts, this triggers org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerLeaderListener#notifyLeaderAddress
listener.notifyLeaderAddress(leaderAddress, leaderId);
}
}
We now step into org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerLeaderListener#notifyLeaderAddress:
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
And then into org.apache.flink.runtime.jobmaster.JobMaster#notifyOfNewResourceManagerLeader:
private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
As shown, the JobMaster connects to the new ResourceManager leader here. Let's see what it does next:
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
tryConnectToResourceManager();
}
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
private void connectToResourceManager() {
assert(resourceManagerAddress != null);
assert(resourceManagerConnection == null);
assert(establishedResourceManagerConnection == null);
log.info("Connecting to ResourceManager {}", resourceManagerAddress);
resourceManagerConnection = new ResourceManagerConnection(
log,
jobGraph.getJobID(),
resourceId,
getAddress(),
getFencingToken(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
scheduledExecutorService);
// the connection is retried if connecting to the ResourceManager fails
resourceManagerConnection.start();
}
Next, the start method of resourceManagerConnection.
It has two steps: creating a new registration and starting that registration. We look at each in turn.
•The createNewRegistration method:
RetryingRegistration maintains a CompletableFuture internally, and that future carries the logic to run on successful registration, namely the onRegistrationSuccess callback.
Here is the logic of org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerConnection#onRegistrationSuccess:
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
runAsync(() -> {
// filter out outdated connections
//noinspection ObjectEquality
if (this == resourceManagerConnection) {
establishResourceManagerConnection(success);
}
});
}
Next, org.apache.flink.runtime.jobmaster.JobMaster#establishResourceManagerConnection. The code first:
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
final ResourceManagerId resourceManagerId = success.getResourceManagerId();
// verify the response with current connection
if (resourceManagerConnection != null
&& Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);
final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
resourceManagerGateway,
resourceManagerResourceId);
slotPool.connectToResourceManager(resourceManagerGateway);
resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
resourceManagerGateway.heartbeatFromJobManager(resourceID);
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
// request heartbeat will never be called on the job manager side
}
});
} else {
log.debug("Ignoring resource manager connection to {} because it's duplicated or outdated.", resourceManagerId);
}
}
Two things happen here: 1. the slotPool is connected to the ResourceManager; 2. heartbeat monitoring with the ResourceManager is set up. We focus on the first, in org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#connectToResourceManager:
@Override
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
// work on all slots waiting for this connection
for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
// all sent off
waitingForResourceManager.clear();
}
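This method takes each pendingRequest from waitingForResourceManager and sends it to the ResourceManager as a slot request, then clears the waiting collection.
•The newRegistration.startRegistration() method is what actually establishes the connection to the ResourceManager. It is not the focus of this article, so we will not analyze it further.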
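The buffering behaviour of connectToResourceManager described above (queue slot requests while no ResourceManager is connected, flush them on connect) can be sketched on its own. The `Pool` and `Gateway` types below are hypothetical and reduce a pending request to an ID and a profile string; they are not the SlotPoolImpl API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Sketch only: hypothetical types, with requests reduced to an ID and a profile string.
final class PendingRequestSketch {

    interface Gateway {
        void requestSlot(String requestId, String profile);
    }

    static final class Pool {
        // Requests that arrived before a resource manager connection existed.
        private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
        private Gateway gateway;

        void requestSlot(String requestId, String profile) {
            if (gateway == null) {
                waitingForResourceManager.put(requestId, profile);
            } else {
                gateway.requestSlot(requestId, profile);
            }
        }

        void connectToResourceManager(Gateway newGateway) {
            this.gateway = Objects.requireNonNull(newGateway);
            // Work off everything that was waiting for this connection, then clear the buffer.
            for (Map.Entry<String, String> pending : waitingForResourceManager.entrySet()) {
                newGateway.requestSlot(pending.getKey(), pending.getValue());
            }
            waitingForResourceManager.clear();
        }

        int pendingCount() {
            return waitingForResourceManager.size();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        List<String> sent = new ArrayList<>();
        pool.requestSlot("r1", "default");
        pool.requestSlot("r2", "default");
        System.out.println("pending before connect: " + pool.pendingCount());
        pool.connectToResourceManager((id, profile) -> sent.add(id));
        pool.requestSlot("r3", "default");
        System.out.println("sent: " + sent + ", pending after connect: " + pool.pendingCount());
    }
}
```

This is why the scheduler can start asking for slots before the ResourceManager connection is established: nothing is lost, the requests are simply delivered later.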
Now for the JobMaster#resetAndStartScheduler method
Before analyzing JobMaster#resetAndStartScheduler, let's look at the JobMaster constructor. It is fairly long, so we focus on how schedulerNG is instantiated. The relevant snippet is:
// create the DefaultScheduler
this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
The SchedulerNG object is created in the JobMaster#createScheduler method.
Within that creation logic, what matters to us is how the SchedulerBase constructor generates the executionGraph. Part of the code is:
// create and restore the ExecutionGraph
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
// the scheduling topology carries partition and pipelined-region information
this.schedulingTopology = executionGraph.getSchedulingTopology();
This is where the ExecutionGraph is generated from the JobGraph. A dedicated article will analyze the details later.
We now return to the JobMaster#resetAndStartScheduler method.
If schedulerNG.requestJobStatus() is JobStatus.CREATED, the current mainThreadExecutor is set as schedulerNG's mainThreadExecutor; otherwise a new SchedulerNG instance is created and its mainThreadExecutor is set once creation completes. After the logic in schedulerAssignedFuture has finished, the startScheduling method runs:
private void startScheduling() {
checkState(jobStatusListener == null);
// register self as job status change listener
jobStatusListener = new JobManagerJobStatusListener();
schedulerNG.registerJobStatusListener(jobStatusListener);
schedulerNG.startScheduling();
}
// SchedulerBase#startScheduling
@Override
public final void startScheduling() {
mainThreadExecutor.assertRunningInMainThread();
registerJobMetrics();
startAllOperatorCoordinators();
startSchedulingInternal();
}
// DefaultScheduler#startSchedulingInternal
@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
prepareExecutionGraphForNgScheduling();
schedulingStrategy.startScheduling();
}
The scheduling strategy's startScheduling method is where scheduling actually takes place. Taking the PipelinedRegionSchedulingStrategy#startScheduling used in our case as an example:
@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(region -> !region.getConsumedResults().iterator().hasNext())
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
for (SchedulingPipelinedRegion region : regionsSorted) {
// process the region
maybeScheduleRegion(region);
}
}
private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
if (!areRegionInputsAllConsumable(region)) {
return;
}
checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");
final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region),
id -> deploymentOption);
// request slots and deploy
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
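Internally, the subtasks are scheduled and deployed region by region. For reasons of length we stop here; the deployment part of job submission will be analyzed in a separate article.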
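The two selection rules visible in the strategy code (start from regions that consume nothing, and schedule a region only when all of its inputs are consumable) can be sketched with plain collections. The `Region` class and the notion of a "finished upstream region" are simplifying assumptions for illustration; Flink tracks consumability per result partition, not per region name.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only: regions are identified by name and inputs are upstream region names.
final class RegionSchedulingSketch {

    static final class Region {
        final String name;
        final List<String> consumedRegions;

        Region(String name, String... consumedRegions) {
            this.name = name;
            this.consumedRegions = Arrays.asList(consumedRegions);
        }
    }

    // Source regions are those that consume no upstream results.
    static List<String> sourceRegions(List<Region> all) {
        return all.stream()
            .filter(region -> region.consumedRegions.isEmpty())
            .map(region -> region.name)
            .collect(Collectors.toList());
    }

    // A candidate is schedulable only when every upstream region it consumes has finished.
    static List<String> schedulable(List<Region> candidates, Set<String> finishedRegions) {
        return candidates.stream()
            .filter(region -> finishedRegions.containsAll(region.consumedRegions))
            .map(region -> region.name)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Region source = new Region("source");
        Region other = new Region("other-source");
        Region sink = new Region("sink", "source");
        List<Region> all = Arrays.asList(source, sink, other);

        System.out.println(sourceRegions(all));
        System.out.println(schedulable(Arrays.asList(sink), Collections.emptySet()));
        System.out.println(schedulable(Arrays.asList(sink), new HashSet<>(Arrays.asList("source"))));
    }
}
```

In a fully pipelined job such as WordCount the whole graph may end up in very few regions, in which case the source-region filter alone selects nearly everything to deploy.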