从头分析flink源码第五篇之提交jobGraph时各组件内部都发生了什么?

2021-09-14 12:33:40 浏览数 (1)

上几篇文章中我们分析了一个flink wordcount任务生成streamGraph和jobGraph的过程。接下来,我们继续从jobGraph生成后开始来分析executionGraph的生成过程及任务的提交过程,本文主要分析任务提交过程中各组件的执行逻辑,如TaskManager、ResourceManager、JobManager等。本文只涉及到本地运行wordcount时各组件的内部运行逻辑分析,不包括其他资源管理模式如yarn或Kubernetes模式下任务的提交流程(后续会专门行文来分析)。文章较长,代码较多,不喜慎入。

接前几篇文章,分析到了org.apache.flink.client.deployment.executors.LocalExecutor#execute方法:

代码语言:javascript复制
@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);
    // 有效的配置
    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    // 获取了jobGraph
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}

在上面的方法中一方面对配置进行了有效性校验,另一方面生成了jobGraph,然后提交jobGraph。我们来看org.apache.flink.client.program.PerJobMiniClusterFactory#submitJob方法:

代码语言:javascript复制
    public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
        MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
        MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
        miniCluster.start();
        return miniCluster
            .submitJob(jobGraph)
            .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
                org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
                    () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
                    () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
                    userCodeClassloader);
                return submissionResult;
            }))
            .thenApply(result -> new MiniClusterJobClient(
                    result.getJobID(),
                    miniCluster,
                    userCodeClassloader,
                    MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    // We failed to create the JobClient and must shutdown to ensure cleanup.
                    shutDownCluster(miniCluster);
                }
            })
            .thenApply(Function.identity());
    }

这个方法主要执行内容有以下几步:

1.启动miniCluster;2.向mini cluster提交jobGraph;3.jobGraph提交成功后返回submissionResult;4.将结果包装成MiniClusterJobClient并返回;5.如果有异常产生则关闭mini cluster;6.如果5没有异常产生则反之返回4产生的结果。

从上面我们也能看出CompletableFuture流式编程的魅力,在这里我们主要关注下taskmanager、jobmanager、resourcemanager这些核心组件的执行动作。在MiniCluster#start方法内部会处理taskmanager和resourcemanager的逻辑,而jobmanager的逻辑主要在提交任务的流程里。

taskManager

MiniCluster#start方法内部会调用startTaskManagers方法,关于这个方法之前有写过一篇文章进行分析,可以看下之前写过的一篇文章flink taskmanager启动篇。

这里主要来挼一下整体的流程:

代码语言:javascript复制
startTaskManagers() => startTaskExecutor() => taskExecutor.start() => TaskExecutor#onStart =>
startTaskExecutorServices() => resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()) => StandaloneLeaderRetrievalService#start => listener.notifyLeaderAddress(leaderAddress, leaderId) =>

在resourceManager选主成功后会进行回调通知,紧接着会进入到org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress方法中,我们来看下具体的代码逻辑:

代码语言:javascript复制
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
    // 异步执行,rpc触发
    runAsync(
    () -> notifyOfNewResourceManagerLeader(
    leaderAddress,
    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }

继续来看org.apache.flink.runtime.taskexecutor.TaskExecutor#notifyOfNewResourceManagerLeader方法:

代码语言:javascript复制
    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
        resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
        // 重新连接resourceManager
        reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
    }

在这里会去建立和resourceManager的连接,具体代码如下:

代码语言:javascript复制
private void reconnectToResourceManager(Exception cause) {
        // 关闭之前的resourceManager的连接
        closeResourceManagerConnection(cause);
        startRegistrationTimeout();
        tryConnectToResourceManager();
    }

    private void tryConnectToResourceManager() {
        if (resourceManagerAddress != null) {
            connectToResourceManager();
        }
    }

private void connectToResourceManager() {
        assert(resourceManagerAddress != null);
        assert(establishedResourceManagerConnection == null);
        assert(resourceManagerConnection == null);

        log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

        final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
            getAddress(),
            getResourceID(),
            unresolvedTaskManagerLocation.getDataPort(),
            JMXService.getPort().orElse(-1),
            hardwareDescription,
            memoryConfiguration,
            taskManagerConfiguration.getDefaultSlotResourceProfile(),
            taskManagerConfiguration.getTotalResourceProfile()
        );

        resourceManagerConnection =
            new TaskExecutorToResourceManagerConnection(
                log,
                getRpcService(),
                taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                resourceManagerAddress.getAddress(),
                resourceManagerAddress.getResourceManagerId(),
                getMainThreadExecutor(),
                new ResourceManagerRegistrationListener(),
                taskExecutorRegistration);
        resourceManagerConnection.start();
    }

在这里会调用resourceManagerConnection.start方法,这个方法其实是内有乾坤的,我们来看一下:

这里我们主要关注下createNewRegistration方法:

这里会在和resourceManager连接成功后回调onRegistrationSuccess方法,它在org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection中的实现为:

代码语言:javascript复制
    @Override
    protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
        log.info("Successful registration at resource manager {} under registration id {}.",
            getTargetAddress(), success.getRegistrationId());
        registrationListener.onRegistrationSuccess(this, success);
    }

然后进入到TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess方法中:

接着进入到org.apache.flink.runtime.taskexecutor.TaskExecutor#establishResourceManagerConnection方法:

这里主要有两个工作:1. 向ResourceManager注册,上报slot信息;2. 设置与ResourceManager之间的心跳监测。

ResourceManager

在MiniCluster#start方法内部有一段代码片段:

代码语言:javascript复制
setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);

在org.apache.flink.runtime.minicluster.MiniCluster#setupDispatcherResourceManagerComponents方法内部有一段执行逻辑为:

代码语言:javascript复制
    dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()
        ));

而createDispatcherResourceManagerComponents方法的执行逻辑为:

在org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create方法中会根据resourceManagerFactory执行resourceManager的执行逻辑并启动resourceManager。

整体方法的执行逻辑为:

代码语言:javascript复制
resourceManager#start => ResourceManager#onStart => ResourceManager#startResourceManagerServices => leaderElectionService.start(this) => EmbeddedLeaderElectionService#start =>

我们来看下org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.EmbeddedLeaderElectionService#start方法:

代码语言:javascript复制
    @Override
    public void start(LeaderContender contender) throws Exception {
    checkNotNull(contender);
    // 添加contender
    addContender(this, contender);
    }

    private void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
        synchronized (lock) {
            checkState(!shutdown, "leader election service is shut down");
            checkState(!service.running, "leader election service is already started");

            try {
                // 如果已经存在抛出异常
                if (!allLeaderContenders.add(service)) {
                    throw new IllegalStateException("leader election service was added to this service multiple times");
                }

                service.contender = contender;
                service.running = true;

                updateLeader().whenComplete((aVoid, throwable) -> {
                    if (throwable != null) {
                        fatalError(throwable);
                    }
                });
            }
            catch (Throwable t) {
                fatalError(t);
            }
        }
    }

紧接着进入到org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService#updateLeader方法:

在org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService.GrantLeadershipCall#run方法中:

紧接着进入到org.apache.flink.runtime.resourcemanager.ResourceManager#grantLeadership方法:

这个方法是用来做确定了ResourceManager的主节点后的回调方法。

JobManager

我们来看org.apache.flink.runtime.minicluster.MiniCluster#submitJob方法:

代码语言:javascript复制
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());
        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

上面这个方法的整个执行步骤为:

1.通过dispatcherGatewayRetriever去获取DispatcherGateway,返回的是dispatcherGatewayFuture;2.通过dispatcherGateway获取blobServerAddress;3.利用blobServer和jobGraph去执行jar的上传;4.利用dispatcherGatewayFuture返回的dispatcherGateway执行jobGraph的提交。

接下来进入到org.apache.flink.runtime.dispatcher.Dispatcher#submitJob方法:

代码语言:javascript复制
@Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

        try {
            if (isDuplicateJob(jobGraph.getJobID())) {
                return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
            } else if (isPartialResourceConfigured(jobGraph)) {
                return FutureUtils.completedExceptionally(
                    new JobSubmissionException(
                        jobGraph.getJobID(),
                        "Currently jobs is not supported if parts of the vertices have "  
                            "resources configured. The limitation will be removed in future versions."));
            } else {
                return internalSubmitJob(jobGraph);
            }
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

•这里会根据jobId去判断一下这个job是不是已经被提交或执行;•紧接着进入到internalSubmitJob方法,执行真正的提交任务的逻辑。

我们来看org.apache.flink.runtime.dispatcher.Dispatcher#internalSubmitJob方法的代码:

代码语言:javascript复制
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(
            jobGraph.getJobID(),
            jobGraph,
            this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());
        // 获取结果
        return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
            if (throwable != null) {
                cleanUpJobData(jobGraph.getJobID(), true);
                ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(
                    throwable);
                log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                throw new CompletionException(
                    new JobSubmissionException(
                        jobGraph.getJobID(),
                        "Failed to submit job.",
                        strippedThrowable));
            } else {
                return acknowledge;
            }
        }, ioExecutor);
    }

在这个方法里会调用persistAndRunJob进行job的提交。注意这步操作是异步处理的,执行器为fencedMainThreadExecutor。

我们继续进入到org.apache.flink.runtime.dispatcher.Dispatcher#persistAndRunJob方法中:

代码语言:javascript复制
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

jobGraphWriter的主要作用是用于将jobGraph持久化在zk或者Kubernetes的leader节点。我们当前是在standalone模式下,不会进行这步处理。直接来看runJob方法:

代码语言:javascript复制
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
        Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
        long initializationTimestamp = System.currentTimeMillis();
        // 创建JobManagerRunner
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(
            jobGraph,
            initializationTimestamp);
        DispatcherJob dispatcherJob = DispatcherJob.createFor(
            jobManagerRunnerFuture,
            jobGraph.getJobID(),
            jobGraph.getName(),
            initializationTimestamp);
        runningJobs.put(jobGraph.getJobID(), dispatcherJob);
---------------省略部分处理future返回结果的代码----------------
    }

这里的核心逻辑在createJobManagerRunner方法中,dispatcherJob与下面的逻辑主要用于处理jobManagerRunnerFuture的结果。我们主要来看下createJobManagerRunner方法:

代码语言:javascript复制
    CompletableFuture<JobManagerRunner> createJobManagerRunner(
        JobGraph jobGraph,
        long initializationTimestamp) {
        final RpcService rpcService = getRpcService();
        return CompletableFuture.supplyAsync(
            () -> {
                try {
                    // 创建jobManagerRunner
                    JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
                    // 启动runner
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(new JobInitializationException(
                        jobGraph.getJobID(),
                        "Could not instantiate JobManager.",
                        e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
    }

我们来看下org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory#createJobManagerRunner方法内部的逻辑,这里就直接无脑贴代码了:

代码语言:javascript复制
@Override
    public JobManagerRunner createJobManagerRunner(
            JobGraph jobGraph,
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            JobManagerSharedServices jobManagerServices,
            JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
            FatalErrorHandler fatalErrorHandler,
            long initializationTimestamp) throws Exception {
        // 获取配置
        final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
        // 获取slotPoolFactory,处理slot的管理逻辑
        final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
        // 获取调度工厂
        final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
        // 创建处理shuffle操作的shuffleMaster
        final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
        // 创建jobMasterServiceFactory
        final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
            jobMasterConfiguration,
            slotPoolFactory,
            rpcService,
            highAvailabilityServices,
            jobManagerServices,
            heartbeatServices,
            jobManagerJobMetricGroupFactory,
            fatalErrorHandler,
            schedulerNGFactory,
            shuffleMaster);
        // 创建JobManagerRunnerImpl实例
        return new JobManagerRunnerImpl(
            jobGraph,
            jobMasterFactory,
            highAvailabilityServices,            jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
            jobManagerServices.getScheduledExecutorService(),
            fatalErrorHandler,
            initializationTimestamp);
    }

这个方法里的操作主要有:

1.获取jobmaster的配置信息,构建JobMasterConfiguration对象;2.通过配置信息创建SlotPoolFactory;3.创建处理shuffle操作的shuffleMaster;4.创建jobMasterServiceFactory;5.创建JobManagerRunnerImpl实例。

接下来就进入到JobManagerRunnerImpl的构造方法了,继续无脑贴代码:

代码语言:javascript复制
public JobManagerRunnerImpl(
            final JobGraph jobGraph,
            final JobMasterServiceFactory jobMasterFactory,
            final HighAvailabilityServices haServices,
            final LibraryCacheManager.ClassLoaderLease classLoaderLease,
            final Executor executor,
            final FatalErrorHandler fatalErrorHandler,
            long initializationTimestamp) throws Exception {

        this.resultFuture = new CompletableFuture<>();
        this.terminationFuture = new CompletableFuture<>();
        this.leadershipOperation = CompletableFuture.completedFuture(null);

        this.jobGraph = checkNotNull(jobGraph);
        this.classLoaderLease = checkNotNull(classLoaderLease);
        this.executor = checkNotNull(executor);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

        checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

        // libraries and class loader first
        final ClassLoader userCodeLoader;
        try {
            userCodeLoader = classLoaderLease.getOrResolveClassLoader(
                jobGraph.getUserJarBlobKeys(),
                jobGraph.getClasspaths()).asClassLoader();
        } catch (IOException e) {
            throw new Exception("Cannot set up the user code libraries: "   e.getMessage(), e);
        }

        // high availability services next
        this.runningJobsRegistry = haServices.getRunningJobsRegistry();
        this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

        this.leaderGatewayFuture = new CompletableFuture<>();

        // now start the JobManager
        this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader, initializationTimestamp);
    }

除了相关类加载器、执行器、选主服务的准备工作外,这里会创建jobMasterService,其核心逻辑在jobMasterFactory.createJobMasterService方法中,默认使用的jobMasterFactory是DefaultJobMasterServiceFactory,在org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory#createJobMasterService方法内部会创建一个JobMaster对象。在具体来看JobMaster的内部创建逻辑之前,我们先来看下JobManagerRunnerImpl#start方法:

代码语言:javascript复制
    @Override
    public void start() throws Exception {
        try {
            // 这里传入的contender是JobManagerRunnerImpl类型的,会在选举完成时进行contender的回调逻辑
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

我们的场景中使用的leaderElectionService是StandaloneLeaderElectionService,StandaloneLeaderElectionService#start方法代码如下:

代码语言:javascript复制
    @Override
    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
        }
        contender = Preconditions.checkNotNull(newContender);
        // directly grant leadership to the given contender
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }

在我们的场景中传入的LeaderContender对象为JobManagerRunnerImpl实例,根据代码的执行流程会执行到JobManagerRunnerImpl的grantLeadership方法,也是确定了jobManager主节点之后的回调方法,代码如下:

代码语言:javascript复制
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
                return;
            }
            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });
            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

那么紧接着我们来看下verifyJobSchedulingStatusAndStartJobManager方法:

代码语言:javascript复制
    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    // 启动jobMaster
                    return startJobMaster(leaderSessionId);
                }
            });
    }

可以看到在该方法内部会调用startJobMaster方法来启动JobMaster,具体流程来看下代码:

代码语言:javascript复制
    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
                leaderSessionId,
                jobMasterService.getAddress(),
                currentLeaderGatewayFuture),
            executor);
    }

上面这些代码里,我们主要关注下org.apache.flink.runtime.jobmaster.JobMaster#start方法:

代码语言:javascript复制
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // make sure we receive RPC and async calls
        start();

        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }

这里会回调它的onStart()方法,具体为什么会回调onStart方法,可以看下之前写过的一篇文章flink taskmanager启动篇。JobMaster的onStart方法使用的是RpcEndpoint中的空实现。我们主要来看下org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution方法:

代码语言:javascript复制
    //-- job starting and stopping  -----------------------------------------------------------------
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();
        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
            return Acknowledge.get();
        }
        setNewFencingToken(newJobMasterId);
        startJobMasterServices();
        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
        resetAndStartScheduler();
        return Acknowledge.get();
    }

这里我们主要关注两个方法startJobMasterServices与resetAndStartScheduler。

先来看startJobMasterServices方法:
代码语言:javascript复制
private void startJobMasterServices() throws Exception {
        startHeartbeatServices();
        // 启动slotPool
        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        // 当资源管理器有了新的主节点时会回调监听器来通知
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }

这里会做一些初始化操作:

•启动心跳服务,维持与taskmanager和resourcemanager的心跳:

•对slotPool增加一些连接池的定时检测调度手段,如调度检查slot是否空闲,批量检查slot是否超时并根据实际日志级别输出调度日志等,slotPool.start方法:

•reconnectToResourceManager方法,如果之前已经知道了resourceManager的leader地址,这里会去连接resourceManager,在连接成功后slotpool可以进行申请slot的操作。•resourceManagerLeaderRetriever.start方法,来看下它的代码:

代码语言:javascript复制
      @Override
      public void start(LeaderRetrievalListener listener) {
          checkNotNull(listener, "Listener must not be null.");
          synchronized (startStopLock) {
              checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");
              started = true;
              // directly notify the listener, because we already know the leading JobManager's address
              // 在TaskExecutor启动时这里会直接触发org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerLeaderListener的notifyLeaderAddress方法,因为这时候已经知道了jobManager的地址
              // 在JobMaster启动时,这里会触发org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerLeaderListener.notifyLeaderAddress
              listener.notifyLeaderAddress(leaderAddress, leaderId);
          }
      }

我们进入到org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerLeaderListener#notifyLeaderAddress方法内部:

代码语言:javascript复制
  @Override
          public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
              runAsync(
                  () -> notifyOfNewResourceManagerLeader(
                      leaderAddress,
                      ResourceManagerId.fromUuidOrNull(leaderSessionID)));
          }

再进入到org.apache.flink.runtime.jobmaster.JobMaster#notifyOfNewResourceManagerLeader方法内部:

代码语言:javascript复制
      private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
          resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);

          reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
      }

可以看到,这里会去连接新的resourceManager leader,然后进行相应的操作,我们继续来看它会进行哪些操作:

代码语言:javascript复制
      private void reconnectToResourceManager(Exception cause) {
          closeResourceManagerConnection(cause);
          tryConnectToResourceManager();
      }

      private void tryConnectToResourceManager() {
          if (resourceManagerAddress != null) {
              connectToResourceManager();
          }
      }

  private void connectToResourceManager() {
          assert(resourceManagerAddress != null);
          assert(resourceManagerConnection == null);
          assert(establishedResourceManagerConnection == null);
          log.info("Connecting to ResourceManager {}", resourceManagerAddress);
          resourceManagerConnection = new ResourceManagerConnection(
              log,
              jobGraph.getJobID(),
              resourceId,
              getAddress(),
              getFencingToken(),
              resourceManagerAddress.getAddress(),
              resourceManagerAddress.getResourceManagerId(),
              scheduledExecutorService);
          // 在与resourceManager连接失败时会进行重试
          resourceManagerConnection.start();
      }

我们接着来看resourceManagerConnection的start方法:

这里主要有两步操作,创建新的Registration和启动这个registration,我们分别来看一下:

•createNewRegistration方法:

在RetryingRegistration内部维护着一个CompletableFuture,在future内部有一个注册成功的执行逻辑,即回调onRegistrationSuccess方法。

我们来看下org.apache.flink.runtime.jobmaster.JobMaster.ResourceManagerConnection#onRegistrationSuccess方法中的执行逻辑:

代码语言:javascript复制
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(() -> {
        // filter out outdated connections
        //noinspection ObjectEquality
        if (this == resourceManagerConnection) {
            establishResourceManagerConnection(success);
        }
    });
}

继续来分析org.apache.flink.runtime.jobmaster.JobMaster#establishResourceManagerConnection方法,这里先无脑贴代码:

代码语言:javascript复制
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
        final ResourceManagerId resourceManagerId = success.getResourceManagerId();
        // verify the response with current connection
        if (resourceManagerConnection != null
                && Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
            log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);
            final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
            final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
            establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
                resourceManagerGateway,
                resourceManagerResourceId);
            slotPool.connectToResourceManager(resourceManagerGateway);
            resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
                @Override
                public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                    resourceManagerGateway.heartbeatFromJobManager(resourceID);
                }

                @Override
                public void requestHeartbeat(ResourceID resourceID, Void payload) {
                    // request heartbeat will never be called on the job manager side
                }
            });
        } else {
            log.debug("Ignoring resource manager connection to {} because it's duplicated or outdated.", resourceManagerId);

        }
    }

这里主要有两个操作:1. 建立slotPool与resourceManager的连接;2. 建立与resourceManager的心跳监测机制。我们主要来看下前者,直接来看org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#connectToResourceManager方法:

代码语言:javascript复制
    @Override
    public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
        // work on all slots waiting for this connection
        for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
            requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
        // all sent off
        waitingForResourceManager.clear();
    }

该方法主要用于获取waitingForResourceManager中的pendingRequest,然后向resourceManager申请slot。

•newRegistration.startRegistration()方法,是具体的与resourceManager建立连接的方法,不是本文重点,这里就不过多去分析了。

再来看JobMaster#resetAndStartScheduler方法

在分析JobMaster#resetAndStartScheduler方法之前,我们先来看一下JobMaster的构造方法。由于JobMaster的构造方法比较长,这里我们主要关注下schedulerNG的实例化过程,代码片段如下:

代码语言:javascript复制
// 创建DefaultScheduler
this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);

其中JobMaster#createScheduler方法代码如下:

在这里是创建SchedulerNG对象的逻辑,这里我们主要关注下SchedulerBase#SchedulerBase构造方法中关于executionGraph的生成逻辑,部分代码片段如下:

代码语言:javascript复制
    // 创建和恢复ExecutionGraph
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp);
        // 调度拓朴里有partition信息和pipelineRegion信息
        this.schedulingTopology = executionGraph.getSchedulingTopology();

这里是从jobGraph生成executionGraph的逻辑,具体内容后面我们专门用一篇文章来进行分析。

这里我们继续来看JobMaster#resetAndStartScheduler方法,代码如下:

如果schedulerNG的requestJobStatus为JobStatus.CREATED,则指定schedulerNG的mainThreadExecutor为当前的mainThreadExecutor;否则,创建新的SchedulerNG实例并在创建完成后设置mainThreadExecutor;在schedulerAssignedFuture中的逻辑处理完成后,执行startScheduling方法,逻辑如下:

代码语言:javascript复制
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);
    schedulerNG.startScheduling();
}

@Override
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    startSchedulingInternal();
}

@Override
protected void startSchedulingInternal() {
    log.info(
        "Starting scheduling with scheduling strategy [{}]",
        schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling();
    schedulingStrategy.startScheduling();
}

在startScheduling方法内部是真正执行调度的部分,以我们这里使用到PipelinedRegionSchedulingStrategy#startScheduling方法为例:

代码语言:javascript复制
@Override
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
        .toStream(schedulingTopology.getAllPipelinedRegions())
        .filter(region -> !region.getConsumedResults().iterator().hasNext())
        .collect(Collectors.toSet());
    maybeScheduleRegions(sourceRegions);
}

private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
        final List<SchedulingPipelinedRegion> regionsSorted =
            SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
        for (SchedulingPipelinedRegion region : regionsSorted) {
            // 处理region
            maybeScheduleRegion(region);
        }
    }

    private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
        if (!areRegionInputsAllConsumable(region)) {
            return;
        }
        checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");
        final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
            SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                regionVerticesSorted.get(region),
                id -> deploymentOption);
        // 申请slot并进行deploy
        schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
    }

在内部会以region为单位进行各subTask的调度布署,篇幅问题这里就不再过多分析任务提交布署的部分了,后面用专门的文章来分析。

0 人点赞