Flink1.10任务提交流程分析(一)

2022-04-18 13:25:22 浏览数 (1)

Flink任务常见的提交方式通过flink run命令方式提交,如果我们想自己通过API方式实现任务提交,那么就需要了解flink run执行过程,本篇主要透过源码分析其提交流程。(注:基于1.10.1分析)

提交入口

查看bin/flink脚本可以看到提交入口类为:org.apache.flink.client.cli.CliFrontend,传入的参数就是flink 命令后面的参数,查看main方法:

代码语言:javascript复制
public static void main(final String[] args) {
   EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
   // 1. $FLINK_HOME/conf
   final String configurationDirectory = getConfigurationDirectoryFromEnv();
   // 2. 加载flink-conf.yaml
   final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
   // 3. 初始化所有的提交模式的参数解析器
   final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
      configuration,
      configurationDirectory);
   try {
     //初始化执行入口
      final CliFrontend cli = new CliFrontend(
         configuration,
         customCommandLines);
      SecurityUtils.install(new SecurityConfiguration(cli.configuration));
      int retCode = SecurityUtils.getInstalledContext()

        //parseParameters 会根据不同的类型:run、info、list、modify等执行不同的流程
            .runSecured(() -> cli.parseParameters(args));
      System.exit(retCode);
   }
   catch (Throwable t) {
      final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
      LOG.error("Fatal error while running command line interface.", strippedThrowable);
      strippedThrowable.printStackTrace();
      System.exit(31);
   }
}

CustomCommandLine 表示的是一个命令行的参数解析的接口,其实现有FlinkYarnSessionCli、DefaultCLI,FlinkYarnSessionCli解析per-job或者session模式参数,DefaultCLI解析standalone模式参数。程序会根据传入的参数选项选择合适的参数解析器,通过其isActive方法其匹配,然后调用applyCommandLineOptionsToConfiguration解析参数。

RUN流程

代码语言:javascript复制
protected void run(String[] args) throws Exception {
   LOG.info("Running 'run' command.");
   //savepoint恢复参数
   final Options commandOptions = CliFrontendParser.getRunCommandOptions();
   //将参数封装在CommandLine中
   final CommandLine commandLine = getCommandLine(commandOptions, args, true);
   //实例一个ProgramOptions对象,包含了jar路径、用户程序入口类、用户程序参数、classpath等
   final ProgramOptions programOptions = new ProgramOptions(commandLine);
   // 帮助命令
   if (commandLine.hasOption(HELP_OPTION.getOpt())) {
      CliFrontendParser.printHelpForRun(customCommandLines);
      return;
   }

   if (!programOptions.isPython()) {
      // Java program should be specified a JAR file
      if (programOptions.getJarFilePath() == null) {
         throw new CliArgsException("Java program should be specified a JAR file.");
      }
   }
  //代表程序,包含jar、参数等信息
   final PackagedProgram program;
   try {
      LOG.info("Building program from JAR file");
      program = buildProgram(programOptions);
   }
   catch (FileNotFoundException e) {
      throw new CliArgsException("Could not build the program from JAR file.", e);
   }
  //程序所需要jar信息,主要是用户jar包
   final List<URL> jobJars = program.getJobJarAndDependencies();
  //获取有效的配置信息,在这里会根据不同的参数解析器获取有效的配置信息
   final Configuration effectiveConfiguration =
         getEffectiveConfiguration(commandLine, programOptions, jobJars);

   LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

   try {
      executeProgram(effectiveConfiguration, program);
   } finally {
      program.deleteExtractedLibraries();
   }
}

在getEffectiveConfiguration方法中,会根据参数选择不同的参数解析器,例如在per-job模式会使用 -m yarn-cluster,那么就会选择FlinkYarnSessionCli参数解析器,在这个过程中有一个重要的参数配置:execution.target,目标执行器,决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job、remote,这个参数的配置也是通过不同的提交模式来配置的。

执行Program流程

executeProgram 方法直接调用ClientUtils.executeProgram方法:

代码语言:javascript复制
public static void executeProgram(
      PipelineExecutorServiceLoader executorServiceLoader,
      Configuration configuration,
      PackagedProgram program) throws ProgramInvocationException {
   checkNotNull(executorServiceLoader);
   final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
   final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
   try {
      Thread.currentThread().setContextClassLoader(userCodeClassLoader);

      LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
     //用户创建程序执行的上下文
      ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
            executorServiceLoader,
            configuration,
            userCodeClassLoader);
     //会将factory赋给ExecutionEnvironment中变量
      ContextEnvironment.setAsContext(factory);

      try {
        //调用程序main方法
         program.invokeInteractiveModeForExecution();
      } finally {
         ContextEnvironment.unsetContext();
      }
   } finally {
      Thread.currentThread().setContextClassLoader(contextClassLoader);
   }
}

PipelineExecutorServiceLoader 用户Executor执行器的选择,参考Flink1.10基于工厂模式的任务提交与SPI机制;

ContextEnvironmentFactory用于创建程序执行的上下文ExecutionEnvironment,可以理解为其封装了程序与外界之间的交互方式,例如per-job模式还是standalone模式、需要的资源大小等等,同时也会根据其类型创建不同StreamExecutionEnvironment(看下文详解)。对于客户端提交方式创建的是ContextEnvironment类型的ExecutionEnvironment。

Main提交流程

program.invokeInteractiveModeForExecution方法用户调用用户程序的main方法,在main方法中会调用StreamExecutionEnvironment.getExecutionEnvironment 获取合适的StreamExecutionEnvironment:

代码语言:javascript复制
//StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {

  //threadLocalContextEnvironmentFactory、contextEnvironmentFactory默认都为空,所以会调用createStreamExecutionEnvironment方法
   return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
      .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
      .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}

private static StreamExecutionEnvironment createStreamExecutionEnvironment() {

   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   if (env instanceof ContextEnvironment) {
      return new StreamContextEnvironment((ContextEnvironment) env);
   } else if (env instanceof OptimizerPlanEnvironment) {
      return new StreamPlanEnvironment(env);
   } else {
      return createLocalEnvironment();
   }
}
代码语言:javascript复制
//ExecutionEnvironment.java
public static ExecutionEnvironment getExecutionEnvironment() {
   return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
      .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
       //在本地local的模式,创建LocalEnvironment
      .orElseGet(ExecutionEnvironment::createLocalEnvironment);
}

在ClientUtils.executeProgram 中分析到,会通过ContextEnvironment. setAsContext( factory)给threadLocalContextEnvironment Factory与contextEnvironmentFactory赋值,那么调用ContextEnvironmentFactory. createExecutionEnvironment 得到一个ContextEnvironment。

最终StreamExecutionEnvironment. getExecutionEnvironment 得到一个内部封装了ContextEnvironment 对象的StreamExecutionEnvironment对象。

Execute流程

待main方法执行用户代码流程之后会调用StreamExecutionEnvironment.execute方法,接着会调用executeAsync(StreamGraph)方法:

代码语言:javascript复制
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
   checkNotNull(streamGraph, "StreamGraph cannot be null.");
   checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
   //根绝提交模式选择匹配的factory
   final PipelineExecutorFactory executorFactory =
      executorServiceLoader.getExecutorFactory(configuration);

   checkNotNull(
      executorFactory,
      "Cannot find compatible factory for specified execution.target (=%s)",
      configuration.get(DeploymentOptions.TARGET));
  //选择合适的executor提交任务
   CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
      .getExecutor(configuration)
      .execute(streamGraph, configuration);

   try {
      JobClient jobClient = jobClientFuture.get();
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
      return jobClient;
   } catch (Throwable t) {
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
      ExceptionUtils.rethrow(t);

      // make javac happy, this code path will not be reached
      return null;
   }
}

这里就是上一篇讲到的根据SPI机制加载出所有PipelineExecutorFactory,然后选择匹配的factory,匹配的条件就是符合上文提到的execution.target参数的factory,对于yarn-per-job就是YarnJobClusterExecutorFactory,最终会获取到YarnJobClusterExecutor类型的Executor去向yarn提交作业。

总结

本文主要分析了flink run的开始到提交到集群前的流程,我认为可以简化为三步:

  • 选择合适的参数解析器解析命令参数(CustomCommandLine);
  • 选择合适的执行上下文环境(StreamExecutionEnvironment)
  • 选择合适的任务提交器(PipelineExecutor)

下一篇将会以yarn-per-job提交模式为例分析其具体提交过程。

0 人点赞