Flink1.10任务提交流程分析(二)

2022-04-18 13:29:21 浏览数 (1)

在Flink1.10任务提交流程分析(一)中分析了从flink run开始到任务提交到集群前的流程分析,对于不同的提交模式Flink中使用不同的PipelineExecutor,本篇基于yarn-per-job模式分析向yarn-cluster提交任务的流程。(注:基于1.10.1分析)

YarnJobClusterExecutor

接着上篇的分析,任务最终提交是交给PipelineExecutor来execute,PipelineExecutor的选择是根据不同的提交模式来决定即execution.target参数来决定,对于yarn-per-job会选择YarnJobClusterExecutor类型的executor。

代码语言:javascript复制
public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
   public static final String NAME = "yarn-per-job";
   public YarnJobClusterExecutor() {
      super(new YarnClusterClientFactory());
   }
}

其实现比较简单,比较重要其构造器中YarnClusterClientFactory,用于创建YarnClusterDescriptor,包含了yarn客户端YarnClient、yarn配置、提交yarn的队列等一些提交yarn的信息。它继承了AbstractJobClusterExecutor 抽象任务提交executor,execute也是由AbstractJobClusterExecutor来执行:

代码语言:javascript复制
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

   private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
   //代表的就是YarnClusterClientFactory
   private final ClientFactory clusterClientFactory;

   public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
      this.clusterClientFactory = checkNotNull(clusterClientFactory);
   }

   //执行任务提交
   //pipeline 代表StreamGraph
   public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
      //将StreamGraph转换为JobGraph
      final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
     //创建提交任务的一些信息:YarnClusterDescriptor
      try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
        //将配置信息封装在ExecutionConfigAccessor中
         final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
         //包含了提交任务所需资源描述:内存大小、并行度 
         final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
         //提交任务
         final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
               .deployJobCluster(clusterSpecification, jobGraph, 
                                 //是否采用分离模式
                                 configAccessor.getDetachedMode());
         LOG.info("Job has been submitted with JobID "   jobGraph.getJobID());

         return CompletableFuture.completedFuture(
               new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
      }
   }
}

关于ClusterSpecification中描述了任务提交到集群所需的资源大小,对于分配模式建议详读一下官网Flink1.10的内存管理机制便于更好的理解。任务最终交给YarnClusterDescriptor deploy。

Deploy过程

deploy过程代表了与yarn交互的过程,clusterDescriptor.deployJobCluster会调用内部deployInternal方法:

代码语言:javascript复制
private ClusterClientProvider<ApplicationId> deployInternal(
      ClusterSpecification clusterSpecification,
      String applicationName,
      String yarnClusterEntrypoint,
      @Nullable JobGraph jobGraph,
      boolean detached) throws Exception {
    //..... 会做一些检查工作: yarn队列是否存在、配置检查
    //校验资源大小等等
   ApplicationReport report = startAppMaster(
         flinkConfiguration,
         applicationName,
         yarnClusterEntrypoint,
         jobGraph,
         yarnClient,
         yarnApplication,
         validClusterSpecification);

   //....
}

最重的就是startAppMaster,在yarn上启动一个AppMaster进程,其中yarnClusterEntrypoint表示该进程的入口类,也就是JobMaster的启动入口类:org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint, 在集群的机器进程上也能看到该类,如果看到这个进程我们就可知道表示的是JobMaster的进程。

startAppMaster的过程比较长,这里也会逐一分解:

代码语言:javascript复制
private ApplicationReport startAppMaster(
      Configuration configuration,
      String applicationName,
      String yarnClusterEntrypoint,
      JobGraph jobGraph,
      YarnClient yarnClient,
      YarnClientApplication yarnApplication,
      ClusterSpecification clusterSpecification) throws Exception {

   // ------------------ Initialize the file systems -------------------------

   org.apache.flink.core.fs.FileSystem.initialize(
         configuration,
         PluginUtils.createPluginManagerFromRootFolder(configuration));

   //获取homeDir, 表示jar、log配置上传的路径, 一般表示在hdfs上
   //其路径为/user/hadoop, (hadoop表示的当前的用户)
   final FileSystem fs = FileSystem.get(yarnConfiguration);
   final Path homeDir = fs.getHomeDirectory();
   //提交到yarn的描述信息
   ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
   // 会被上传到hdfs的文件 并且被添加到classpath中
   Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
   // 仅仅是会被上传到hdfs , 但是不会被添加到classpath
   Set<File> shipOnlyFiles = new HashSet<>();
   for (File file : shipFiles) {
      systemShipFiles.add(file.getAbsoluteFile());
   }

   final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
   if (logConfigFilePath != null) {
      systemShipFiles.add(new File(logConfigFilePath));
   }
   //将flink_home/lib 下的文件添加到systemShipFiles、通过-yt指定的文件也在里面
   addLibFoldersToShipFiles(systemShipFiles);

   //将flink_home/plugins 下的文件添加到shipOnlyFiles
   addPluginsFoldersToShipFiles(shipOnlyFiles);

   final ApplicationId appId = appContext.getApplicationId();

   // zk-ha相关的配置
   String zkNamespace = getZookeeperNamespace();
   // no user specified cli argument for namespace?
   if (zkNamespace == null || zkNamespace.isEmpty()) {
      // namespace defined in config? else use applicationId as default.
      zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
      setZookeeperNamespace(zkNamespace);
   }

   configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

   if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
      // activate re-execution of failed applications
      appContext.setMaxAppAttempts(
            configuration.getInteger(
                  YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                  YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

      activateHighAvailabilitySupport(appContext);
   } else {
      // set number of application retries to 1 in the default case
      appContext.setMaxAppAttempts(
            configuration.getInteger(
                  YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                  1));
   }

  //userJarFiles  表示用户jar
   final Set<File> userJarFiles = (jobGraph == null)
         // not per-job submission
         ? Collections.emptySet()
         // add user code jars from the provided JobGraph
         : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());

   //需要cache文件上传到hdfs,一般使用在文件共享中
   if (jobGraph != null) {
      for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
         org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
         // only upload local files
         if (!path.getFileSystem().isDistributedFS()) {
            Path localPath = new Path(path.getPath());
            Tuple2<Path, Long> remoteFileInfo =
               Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, entry.getKey());
            jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
         }
      }

      jobGraph.writeUserArtifactEntriesToConfiguration();
   }

   //表示启动appMaster需要的资源文件,会从hdfs上下载
   final Map<String, LocalResource> localResources = new HashMap<>(2   systemShipFiles.size()   userJarFiles.size());
   // 访问hdfs的安全设置
   final List<Path> paths = new ArrayList<>(2   systemShipFiles.size()   userJarFiles.size());
   // 启动taskExecutor需要的资源文件
   StringBuilder envShipFileList = new StringBuilder();

   //几个uploadAndRegisterFiles  方法,将systemShipFiles、shipOnlyFiles、用户jar上传到hdfs

   if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
      systemClassPaths.addAll(userClassPaths);
   }

   // normalize classpath by sorting
   Collections.sort(systemClassPaths); //系统的一些classpath 排序
   Collections.sort(userClassPaths); //用户classpath 排序

   // classPathBuilder: 存放classpath的信息
   StringBuilder classPathBuilder = new StringBuilder();
     /*
      * 构建classpath: shipFile-jar、user-jar、log4j、yaml配置文件
      */

   final Path yarnFilesDir = getYarnFilesDir(appId);
   FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
   fs.setPermission(yarnFilesDir, permission); // set permission for path.

   /*
    *中间一堆与安全相关的配置
    */

  //执行的java命令信息,启动YarnJobClusterEntrypoint 
   final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
         yarnClusterEntrypoint,
         hasLogback,
         hasLog4j,
         hasKrb5,
         clusterSpecification.getMasterMemoryMB());

   if (UserGroupInformation.isSecurityEnabled()) {
      // set HDFS delegation tokens when security is enabled
      LOG.info("Adding delegation token to the AM container.");
      Utils.setTokensFor(amContainer, paths, yarnConfiguration);
   }

   amContainer.setLocalResources(localResources);
   fs.close();

   // Setup CLASSPATH and environment variables for ApplicationMaster
   final Map<String, String> appMasterEnv = new HashMap<>();
   /**
    * 配置环境变量参数  到  appMasterEnv中,在启动启动YarnJobClusterEntrypoint时用到,
    * 例如: classpath、hadoopUser、appId等
    */

   amContainer.setEnvironment(appMasterEnv);

    // 还有一堆设置提交任务队列、yarn任务名称的配置信息

   // add a hook to clean up in case deployment fails
   Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
   Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
   LOG.info("Submitting application master "   appId);
   //提交任务  
   yarnClient.submitApplication(appContext);

   /**
    *  获取任务状态
    */
}

这部分的流程比较长,总结一下主要有以下几点:

  1. 将shipFiles、plugins、userJar、logFile、flink-conf.yaml、job.graph等文件上传到hdfs
  2. 构建启动需要的classpath、ha-zk配置、安全配置、jobMaster启动命令等
  3. 向yarn提交任务

在yarn上启动成功后,在JobMaster的工作目录可以看到launch_container.sh这样的一个文件,这个文件里面包含了在startAppMaster所做的所有环境变量参数设置、启动命令。

总结

本篇主要介绍了yarn-per-job的任务提交流程,结合前面两篇的分析,到现在应该掌握了如何通过API的方式去实现任务的提交,我认为重要有两点:一是做好参数的解析、配置,二是选择一个合适的PipelineExecutor提交任务。

0 人点赞