flink源码分析之TaskManager启动篇

2020-10-27 16:41:05 浏览数 (1)

在flink社区上关于flink集群的剖析中讲到:flink运行时由两种类型的进程组成,一个jobManager和一个或多个TaskManager。

Flink Cluster组成

一张经典的flink 集群组成图如下:

客户端不是运行时和程序执行的一部分,但用于准备数据流并将其发送到JobManager。之后,客户端可以断开连接(分离模式 detached mode),或者保持连接以接收进度报告(附加模式 attached mode)。客户机可以作为触发执行的Java/Scala程序的一部分运行,也可以在命令行使用./bin/flink run...开启进程中运行。

JobManager和taskmanager可以以各种方式启动:作为独立集群直接在机器上启动,或者在容器中启动,或者由YARN或Mesos等资源框架管理。TaskManagers连接到JobManagers,宣布它们是可用的,并分配工作。

由于篇幅有限,这里我们主要关注下TaskManager的相关内容,关于JobManager的后面再具体来分析。

TaskManager

TaskManager就是执行数据流中任务以及缓冲和交换数据流的worker。必须始终至少有一个TaskManager。任务管理器中资源调度的最小单元是任务槽。任务管理器中的任务槽数表示并发处理任务的数量。注意,多个操作算子可能在一个任务槽中执行。

每个worker (TaskManager)都是一个JVM进程,可以在单独的线程中执行一个子任务。为了控制TaskManager接受多少任务,它有所谓的Task slot(至少一个)。

每个Task slot表示TaskManager的一个固定资源子集。例如,一个有三个插槽的TaskManager,会将其托管内存的1/3分配给每个插槽。对资源进行分槽意味着子任务不会与其他作业的子任务争夺托管内存,而是拥有一定数量的保留托管内存。注意,这里没有发生CPU隔离;当前插槽只分隔任务的托管内存。

通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager有一个槽意味着每个任务组在单独的JVM中运行(例如,JVM可以在单独的容器中启动)。拥有多个槽意味着更多子任务共享同一个JVM。相同JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。

默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自相同的作业。结果是,一个插槽可以容纳作业的整个管道。允许这种插槽共享有两个主要好处:

1.Flink集群需要的任务槽数与作业中使用的最高并行度相同。不需要计算一个程序总共包含多少任务(具有不同的并行性)2.更容易获得更好的资源利用。如果没有槽共享,非密集的source/map()子任务将阻塞和资源密集的窗口子任务一样多的资源。使用插槽共享,将示例中的基本并行性从2个增加到6个,可以充分利用有插槽的资源,同时确保繁重的子任务在TaskManager中得到公平分配。

关于上面对于flink taskManager的更多介绍,可以自行查阅flink官方文档[1]。下面将进入对TaskManager启动流程的源码分析部分。

TaskManager启动流程分析

这里的源码分析,我们以本地MiniCluster中各组件的启动流程为例。在本地提交一个job时,会启动一个MiniCluster,在这个集群内部分进行各组件的初始化操作,其中也包括TaskManager的启动。

直接来看org.apache.flink.runtime.minicluster.MiniCluster#start方法的部分代码:

代码语言:javascript复制
    if (useSingleRpcService) {
                    // we always need the 'commonRpcService' for auxiliary calls
                    // 这里会创建local rpc service
                    commonRpcService = createLocalRpcService(configuration);
                    final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                    taskManagerRpcServiceFactory = commonRpcServiceFactory;
                    dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
                    metricQueryServiceRpcService = MetricUtils.startLocalMetricsRpcService(configuration);
    } else {
        ------
            dispatcherResourceManagreComponentRpcServiceFactory =
            new DedicatedRpcServiceFactory(
            configuration,
            // jobManager地址
            jobManagerExternalAddress,
            jobManagerExternalPortRange,
            jobManagerBindAddress);
        taskManagerRpcServiceFactory =
            new DedicatedRpcServiceFactory(
            configuration,
            // taskManager地址
            taskManagerExternalAddress,
            taskManagerExternalPortRange,
            taskManagerBindAddress);
    }   
 --------------省略部分代码----------------       
// 启动QueryService,在MetricRegistryImpl中,会启动一个rpc server
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
// 执行io操作的线程池
ioExecutor = Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("mini-cluster-io"));
// 高可用服务
haServices = createHighAvailabilityServices(configuration, ioExecutor);
// blobServer用于管理一些jar包等
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// 心跳服务
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// blob缓存服务,用于BroadCast的缓存管理等
blobCacheService = new BlobCacheService(
configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
);
// 启动taskManager,会启动taskExecutor,也会启动一个rpcServer
startTaskManagers();

关于rpcService加载的部分,会根据是否使用单例的rpcService来决定是创建CommonRpcServiceFactory还是DedicatedRpcServiceFactory。CommonRpcServiceFactory产生的是一个单例的rpcService,而DedicatedRpcServiceFactory每次产生的是不同的rpcService。

CommonRpcServiceFactory

如果使用CommonRpcServiceFactory,它rpcService产生的方法为:

代码语言:javascript复制
protected RpcService createLocalRpcService(Configuration configuration) throws Exception {
        return AkkaRpcServiceUtils.localServiceBuilder(configuration)
            .withCustomConfig(AkkaUtils.testDispatcherConfig())
            .createAndStart();
    }

这里会创建一个CommonRpcServiceFactory中使用的单例的akkaRpcService,对于JobManager的一些组件来说使用的也是这个akkaRpcService。

DedicatedRpcServiceFactory

而如果使用DedicatedRpcServiceFactory,它会调用org.apache.flink.runtime.minicluster.MiniCluster.DedicatedRpcServiceFactory#createRpcService方法创建RemoteRpcService:

代码语言:javascript复制
        @Override
        public RpcService createRpcService() throws Exception {
            final RpcService rpcService = MiniCluster.this.createRemoteRpcService(
                configuration, externalAddress, externalPortRange, bindAddress);
            synchronized (lock) {
                rpcServices.add(rpcService);
            }
            return rpcService;
        }
        protected RpcService createRemoteRpcService(
            Configuration configuration,
            String externalAddress,
            String externalPortRange,
            String bindAddress) throws Exception {
            return AkkaRpcServiceUtils.remoteServiceBuilder(configuration, externalAddress, externalPortRange)
                // 需要传入绑定地址
                .withBindAddress(bindAddress)
                .withCustomConfig(AkkaUtils.testDispatcherConfig())
                .createAndStart();
    }

参考上文DedicatedRpcServiceFactory的使用部分可以看出:对于jobManager和TaskManager来说会有不同的DedicatedRpcServiceFactory来产生相应的RpcService,它们会绑定不同的外部地址。

RpcService

这里我们主要来分析下AkkaRpcService的相关内容。

创建过程

它的创建过程发生在AkkaRpcServiceUtils#createAndStart方法中:

代码语言:javascript复制
public AkkaRpcService createAndStart() throws Exception {
        ------------------省略部分---------------
            final ActorSystem actorSystem;
            if (externalAddress == null) {
                // create local actor system
                // 创建本地的actor system,不需要外部地址,所有服务共用同一套actorSystem
                actorSystem = BootstrapTools.startLocalActorSystem(
                    configuration,
                    actorSystemName,
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
            } else {
                // create remote actor system
                // 因为是与外部地址绑定的,所以对于jobManager和TaskManager来说各自有一套actkorSystem
                actorSystem = BootstrapTools.startRemoteActorSystem(
                    configuration,
                    actorSystemName,
                    externalAddress,
                    externalPortRange,
                    bindAddress,
                    Optional.ofNullable(bindPort),
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
            }
            return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
        }

这里对于actorSystem需要关注下面两点:

•创建本地的actor system,不需要外部地址,所有服务共用同一套actorSystem•因为是与外部地址绑定的,所以对于jobManager和TaskManager来说各自有一套actkorSystem

构造方法
代码语言:javascript复制
@VisibleForTesting
    public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
        this.actorSystem = checkNotNull(actorSystem, "actor system");
        this.configuration = checkNotNull(configuration, "akka rpc service configuration");
        // 需要传入一个actorSystem
        Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
        if (actorSystemAddress.host().isDefined()) {
            address = actorSystemAddress.host().get();
        } else {
            address = "";
        }
        if (actorSystemAddress.port().isDefined()) {
            port = (Integer) actorSystemAddress.port().get();
        } else {
            port = -1;
        }
        captureAskCallstacks = configuration.captureAskCallStack();
        // 内部调度执行器
        internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
        terminationFuture = new CompletableFuture<>();
        stopped = false;
        // actor体系中的监督者,主要用于服务重启,对于flink的重启机制发挥作用的地方,它有自己的executorService
        supervisor = startSupervisorActor();
    }

主要执行一些属性的初始化操作,并且会创建并启动对应actorSystem的监督者。

org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer方法

AkkaRpcService#startServer方法的调用位置在TaskExecutor的父类RpcEndpoint中,在执行TaskExecutor的构造方法时会调用RpcEndpoint的构造方法:

代码语言:javascript复制
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

我们聚焦到 rpcService.startServer(this),我们先关注下这个this,在下面会有比较重要的作用。

我们先来看下startServer方法代码:

代码语言:javascript复制
    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");
        // 注册akkaRpcActor,TaskExecutor不是FencedRpcEndpoint类型的,JobMaster是FencedRpcEndpoint类型的
        final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
        // 获取注册后得到的actor对象
        final ActorRef actorRef = actorRegistration.getActorRef();
        // TerminationFuture,可以在里面执行一些中止后的操作
        final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();
        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
        // 获得对应actor对象在对应的actorSystem中的地址
        final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }
        Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
        // 为代理接口中添加RpcServer接口
        implementedRpcGateways.add(RpcServer.class);
        // 添加AkkaBasedEndpoint接口
        implementedRpcGateways.add(AkkaBasedEndpoint.class);
        // 用于触发代理类方法的invocationHandler
        final InvocationHandler akkaInvocationHandler;
        // 如果是FencedRpcEndpoint,这里对应的akkaInvocationHandler为FencedAkkaInvocationHandler类型,JobMaster返回的是这种类型的
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
            akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                actorTerminationFuture,
                ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                captureAskCallstacks);
           implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {// 如果不是FencedRpcEndpoint,这里返回的是AkkaInvocationHandler,TaskExecutor返回的是这种类型的
            akkaInvocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                actorTerminationFuture,
                captureAskCallstacks);
        }
        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();
        // jdk的动态代理,对implementedRpcGateways里维护的接口实现进行代理,会在akkaInvocationHandler的invoke方法中反射调用接口中的一些方法
        @SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
            akkaInvocationHandler);
        return server;
    }

根据流程来看,主要分为以下几步:

•注册akkaRpcActor,TaskExecutor不是FencedRpcEndpoint类型的,注册的是AkkaRpcActor类型的actor,JobMaster是FencedRpcEndpoint类型的,注册的是FencedAkkaRpcActor类型的actor。这点有疑惑的可以参考下下面这个类图:

•FencedRpcEndpoint类型的会创建FencedAkkaInvocationHandler类型的invocationHandler,否则会创建AkkaInvocationHandler类型的invocationHandler。为RpcServer和AkkaBasedEndpoint接口创建动态代理,这是一个jdk的动态代理,对implementedRpcGateways里维护的接口实现进行代理,会在akkaInvocationHandler的invoke方法中反射调用接口中的一些方法。

可以看出在这里返回的RpcServer是一个动态代理。也就是说在TaskExecutor的父类RpcEndpoint中维护的属性RpcServer rpcServer是一个动态代理,这时我们再回过头来看taskExecutor.start方法的调用链:

代码链截图如下:

其中org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler#start方法代码如下:

代码语言:javascript复制
@Override
    public void start() {
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

这里会通过rpcEndpoint向actorSystem发送一条ControlMessages.START类型的消息,关于这条消息有什么用,怎么形成的闭环,请继续看下面的分析内容。

继续来看启动流程

当然,org.apache.flink.runtime.minicluster.MiniCluster#start方法会加载一系列的服务和JobManager的几个关键组件ResourceManager、Dispatcher、JobMaster等,这里我们主要关注TaskManager的启动流程。

进入到org.apache.flink.runtime.minicluster.MiniCluster#startTaskManagers方法:

代码语言:javascript复制
@GuardedBy("lock")
private void startTaskManagers() throws Exception {
    final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
    LOG.info("Starting {} TaskManger(s)", numTaskManagers);
    for (int i = 0; i < numTaskManagers; i  ) {
        startTaskExecutor();
    }
}

在这里会去获取配置中需要启动的taskManager的个数,然后启动指定个数的TaskManager。

继续来看org.apache.flink.runtime.minicluster.MiniCluster#startTaskExecutor方法:

代码语言:javascript复制
@VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            // 创建TaskExecutor
            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                // 这个用于在ResourceManager中标识TaskExecutor的使用资源
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
            // 启动TaskExecutor
            taskExecutor.start();
            // 将taskExecutor加入到taskMangers列表中
            taskManagers.add(taskExecutor);
        }
    }

这里会创建并启动taskExecutor,并将启动了的taskExecutor放入到taskManagers列表中。

我们再来分析下org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager方法:

代码语言:javascript复制
public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            FatalErrorHandler fatalErrorHandler) throws Exception {
   -----------------省略部分代码------------------------------------
        String externalAddress = rpcService.getAddress();
        // 声明运行TaskExecutor所需要的资源
        final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
        // TaskManager服务配置
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                resourceID,
                externalAddress,
                localCommunicationOnly,
                taskExecutorResourceSpec);
        // taskManager的Metric组,用于metric管理
        Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            externalAddress,
            resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
        // taskExecutor的执行线程池,它的个数是根据cluster.io-pool.size参数配置,如果没有配置会使用cpu核数的四倍
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(
            taskManagerServicesConfiguration.getNumIoThreads(),
            new ExecutorThreadFactory("flink-taskexecutor-io"));
        // TaskManager的一系列服务
        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            blobCacheService.getPermanentBlobService(),
            taskManagerMetricGroup.f1,
            ioExecutor,
            fatalErrorHandler);
             ------------------省略部分代码---------------------
        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            externalResourceInfoProvider,
            heartbeatServices,
            taskManagerMetricGroup.f0,
            metricQueryServiceAddress,
            blobCacheService,
            fatalErrorHandler,
            new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
            createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));         
    }

在这个方法中会依次声明运行TaskExecutor所需要的资源,然后创建taskExecutor的执行线程池,它的个数是根据cluster.io-pool.size参数配置,如果没有配置会使用cpu核数的四倍,接着会创建TaskManager运行需要的一系列服务,最终会依据这些属性创建TaskExecutor实例并返回。

我们进入到org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration方法中来看下具体代码:

代码语言:javascript复制
public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            PermanentBlobService permanentBlobService,
            MetricGroup taskManagerMetricGroup,
            ExecutorService ioExecutor,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
        // 类似于一个even source模型,内部维护着一个分区与TaskEventHandler之间关系的map,其他线程可以
        // 执行subscribe进行事件订阅,在有事件发生时TaskEventDispatcher会执行publish方法进行通知,主要为了达到某个地方发生改变后,关注的位置也相应作出反应。
        final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        // start the I/O manager, it will create some temp directories.
        // 异步的io管理器,主要用于控制io操作
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
         // shuffle环境,在这个方法内部会有一个SPI的拓展点,会从配置的shuffle-service-factory.class中加载用户定义的shuffle-service-factory。内部提供了一个NettyShuffleServiceFactory的实现,会创建NettyShuffleEnvironment对象
        final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
            taskManagerServicesConfiguration,
            taskEventDispatcher,
            taskManagerMetricGroup,
            ioExecutor);
        // 启动shuffleEnvironment,如果使用的是NettyShuffleServiceFactory这里会启动一个netty server和netty client,并和dubbo 协议类似用一个ConnectionId维护一个PartitionRequestClient,关于这个内部的细节后续会详细来分析
        final int listeningDataPort = shuffleEnvironment.start();
        // 创建kvState服务
        final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
           // 启动kvState服务
        kvStateService.start();
        // 未能解析的taskManager的位置
        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
            taskManagerServicesConfiguration.getResourceID(),
            taskManagerServicesConfiguration.getExternalAddress(),
            // we expose the task manager location with the listening port
            // iff the external data port is not explicitly defined
            taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
                taskManagerServicesConfiguration.getExternalDataPort() :
                listeningDataPort);
        // 广播变量管理器
        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
        // 任务槽table
        final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
            taskManagerServicesConfiguration.getNumberOfSlots(),
            taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
            taskManagerServicesConfiguration.getPageSize(),
            ioExecutor);
        // 任务table
        final JobTable jobTable = DefaultJobTable.create();
        // 任务leader服务
        final JobLeaderService jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
        // state根目录
        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
        for (int i = 0; i < stateRootDirectoryStrings.length;   i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
        // taskState管理器
        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            ioExecutor);
         // 当jvm的元数据空间内存溢出时的错误
        final boolean failOnJvmMetaspaceOomError =
taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);
        // 库缓存管理器
        final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
            permanentBlobService,
            BlobLibraryCacheManager.defaultClassLoaderFactory(
                taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
                taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
                failOnJvmMetaspaceOomError ? fatalErrorHandler : null));
        // 返回TaskManager服务
        return new TaskManagerServices(
            unresolvedTaskManagerLocation,
            taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
            ioManager,
            shuffleEnvironment,
            kvStateService,
            broadcastVariableManager,
            taskSlotTable,
            jobTable,
            jobLeaderService,
            taskStateManager,
            taskEventDispatcher,
            ioExecutor,
            libraryCacheManager);
    }

这个方法的主要功能是创建TaskManager中的一系列服务,按照流程它的操作主要如下:

•创建一个类似于even source的模型,内部维护着一个分区与TaskEventHandler之间关系的map,其他线程可以执行subscribe进行事件订阅,在有事件发生时TaskEventDispatcher会执行publish方法进行通知。•创建异步的io管理器,主要用于控制io操作。•创建ShuffleEnvironment,在这个方法内部会有一个SPI的拓展点,会从配置的shuffle-service-factory.class中加载用户定义的shuffle-service-factory。内部提供了一个NettyShuffleServiceFactory的实现,会创建NettyShuffleEnvironment对象。•启动shuffleEnvironment,如果使用的是NettyShuffleServiceFactory这里会启动一个netty server和netty client,并和dubbo 协议类似用一个ConnectionId维护一个PartitionRequestClient,关于这个内部的细节后续会详细来分析。•创建kvState服务。•启动kvState服务•创建 广播变量管理器•初始化任务槽table,里面维护着任务槽和task manager之间的关系•创建JobTable,jobTable中维护的信息如下:

代码语言:javascript复制
  private final Map<JobID, JobOrConnection> jobs;
      private final Map<ResourceID, JobID> resourceIdJobIdIndex;
      private DefaultJobTable() {
          this.jobs = new HashMap<>();
          this.resourceIdJobIdIndex = new HashMap<>();
     }

维护着任务容器和resourceId与jobId关系的容器。

•创建JobLeaderService•创建TaskExecutor的state管理器•创建库文件缓存管理器。

TashExecutor的启动

启动的代码在org.apache.flink.runtime.minicluster.MiniCluster#startTaskExecutor方法中:

代码语言:javascript复制
taskExecutor.start();

它会调用org.apache.flink.runtime.rpc.RpcEndpoint#start方法:

代码语言:javascript复制
public final void start() {
        rpcServer.start();
    }

那么rpcServer来自哪里呢?

我们看下RpcEndpoint的构造方法:

代码语言:javascript复制
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

可见rpcServer来自于rpcService,其中RpcEndpoint是TaskExecutor的父类,我们此时回过头来看下TaskExecutor的构造方法有下面这一行代码:

代码语言:javascript复制
super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

那么对于rpcService我们上文有提到过,它是AkkaRpcService类型的。

那么上面RpcEndpoint中rpcService.startServer(this)调用的就是org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer方法,会返回一个RpcServer代理,包装的是AkkaInvocationHandler对象。

AkkaInvocationHandler中的start方法如下:

代码语言:javascript复制
@Override
    public void start() {
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

也就是说,在调用taskExecutor.start()方法时最后会调用上面这个AkkaInvocationHandler.start()方法,这里注意下它发出的消息是一个ControlMessages.START格式的,这对于接下来非常有用。

我们来反推一下:org.apache.flink.runtime.taskexecutor.TaskExecutor#onStart方法在org.apache.flink.runtime.rpc.RpcEndpoint#internalCallOnStart中会回调,而internalCallOnStart方法会在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.StoppedState#start中调用。StoppedState#start方法会在org.apache.flink.runtime.rpc.akka.AkkaRpcActor#handleControlMessage方法中调用,而handleControlMessage方法会在org.apache.flink.runtime.rpc.akka.AkkaRpcActor#createReceive方法中模式匹配:

代码语言:javascript复制
@Override
public Receive createReceive() {
    return ReceiveBuilder.create()
        .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
        .match(ControlMessages.class, this::handleControlMessage)
        .matchAny(this::handleMessage)
        .build();
}

在Actor编程中Actor 类需要继承AbstractActor类实现createReceive方法,绑定各类actor收到不同类型消息对应处理不同业务逻辑。createReceive方法内部就是一些模式匹配的逻辑,我们可以看到当match的消息类型为ControlMessages类型时会进入handleControlMessage方法来处理,然后会调用StoppedState#start方法,接下来回调AkkaRpcActor.StoppedState#start方法,接下来回调RpcEndpoint#internalCallOnStart方法,然后回调TaskExecutor#onStart方法,我们来看一下org.apache.flink.runtime.taskexecutor.TaskExecutor#onStart方法:

代码语言:javascript复制
    @Override
    public void onStart() throws Exception {
        try {
            // 启动相关服务
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
        // 处理注册超时的问题
        startRegistrationTimeout();
    }

在startTaskExecutorServices方法内部会启动与TaskExecutor相关的一些组件:

代码语言:javascript复制
    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

这里主要涉及如下几种操作:

•通过连接ResourceManager的方式(注册监听器)启动resourceManagerLeaderRetriever。•taskSlotTable.start方法的目的是设置taskSlotTable的相关属性,告诉taskSlotTable谁负责任务槽操作。•启动job leader service•创建文件缓存

好了,到这里TaskExecutor的启动过程的源码分析就结束了,接下来就可以通过org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask方法来调用org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask方法来提交任务了(org.apache.flink.runtime.executiongraph.Execution#deploy方法中调用RpcTaskManagerGateway#submitTask方法时RpcTaskManagerGateway也使用了代理模式)。由于篇幅有限,本篇就不再继续分析了。

总结

本篇主要分析了TaskExecutor启动的一系列流程,其中包括它的RpcService的构建、dispatcher、ioManager、shuffleEnvironment、kvStateService、广播变量管理器、任务槽table、taskState管理器的创建及启动过程分析。主要分析了RpcService中的actorSystem构建流程及其中的代理模式的使用,另外对shuffleEnvironment内部的nettyShuffleEnvironment及其中的nettyServer、nettyClient的工作模式进行了走马观花式的分析。本文的重点是理一下TaskManger的启动脉络,后面有时间的话会对其中的一些细节进行具体分析,敬请期待。

References

[1] flink官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.11

0 人点赞