In Flink 1.10 Job Submission Process Analysis (Part 1) we traced the flow from flink run up to the point where the job is about to be submitted to the cluster. Flink uses a different PipelineExecutor for each submission mode; this article follows the yarn-per-job mode and analyzes how a job is submitted to a YARN cluster. (Note: the analysis is based on Flink 1.10.1.)
YarnJobClusterExecutor
Continuing from the previous article: the job is ultimately handed to a PipelineExecutor to execute. Which PipelineExecutor is chosen depends on the submission mode, i.e. on the execution.target option; for yarn-per-job, an executor of type YarnJobClusterExecutor is selected.
public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {

    public static final String NAME = "yarn-per-job";

    public YarnJobClusterExecutor() {
        super(new YarnClusterClientFactory());
    }
}
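The NAME constant is what the execution.target option is matched against. As a minimal illustration (DeploymentOptions.TARGET is the flink-core ConfigOption behind execution.target; the rest is a plain sketch, not taken from the Flink sources), selecting this executor programmatically is just a matter of setting that option:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

Configuration configuration = new Configuration();
// "yarn-per-job" equals YarnJobClusterExecutor.NAME; the yarn-per-job executor
// factory declares itself compatible when execution.target carries this value
configuration.setString(DeploymentOptions.TARGET, "yarn-per-job");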
Its implementation is simple; the important part is the YarnClusterClientFactory in its constructor, which is used to create the YarnClusterDescriptor containing the information needed for a YARN submission: the YarnClient, the YARN configuration, the target YARN queue, and so on. YarnJobClusterExecutor extends AbstractJobClusterExecutor, the abstract job-cluster executor, and execute is in fact carried out by AbstractJobClusterExecutor:
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);

    // here: the YarnClusterClientFactory
    private final ClientFactory clusterClientFactory;

    public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
        this.clusterClientFactory = checkNotNull(clusterClientFactory);
    }

    // executes the job submission
    // pipeline represents the StreamGraph
    public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
        // translate the StreamGraph into a JobGraph
        final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);

        // create the deployment descriptor: YarnClusterDescriptor
        try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
            // wrap the configuration in an ExecutionConfigAccessor
            final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

            // describes the resources the job needs: memory sizes, parallelism
            final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

            // submit the job
            final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                    .deployJobCluster(clusterSpecification, jobGraph,
                            // whether to use detached mode
                            configAccessor.getDetachedMode());
            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
        }
    }
}
ClusterSpecification describes the resource sizes a job needs on the cluster; to understand how they are determined, it is worth reading the official docs on Flink 1.10's memory management. A rough sketch of such a specification follows below; the job is then finally handed to the YarnClusterDescriptor to deploy.
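As a sketch (the builder calls follow the ClusterSpecification API as of 1.10.x; the numbers are arbitrary placeholders, in reality getClusterSpecification derives them from the memory options in the configuration):

import org.apache.flink.client.deployment.ClusterSpecification;

// placeholder values for illustration only
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)       // memory for the JobMaster (AppMaster) container
        .setTaskManagerMemoryMB(1728)  // memory per TaskManager container
        .setSlotsPerTaskManager(2)     // slots per TaskManager
        .createClusterSpecification();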
The deploy process
The deploy step covers the interaction with YARN. clusterDescriptor.deployJobCluster calls the internal deployInternal method:
private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // ..... performs some checks here: does the YARN queue exist, configuration checks,
    // validation of the requested resource sizes, and so on
    ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);
    //....
}
The most important call is startAppMaster, which starts an AppMaster process on YARN. Here yarnClusterEntrypoint is the entry class of that process, i.e. the startup entry class of the JobMaster: org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint. The class name also shows up in the process list on the cluster machines, so whenever you see this process you know it is a JobMaster.
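For reference, this is (paraphrased from the 1.10.1 sources, slightly abridged) how deployJobCluster hands that entry class down to deployInternal:

@Override
public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
    try {
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                getYarnJobClusterEntrypoint(), // resolves to YarnJobClusterEntrypoint.class.getName()
                jobGraph,
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}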
startAppMaster is fairly long, so let's break it down piece by piece:
private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    // ------------------ Initialize the file systems -------------------------
    org.apache.flink.core.fs.FileSystem.initialize(
            configuration,
            PluginUtils.createPluginManagerFromRootFolder(configuration));

    // get homeDir: the directory that jars and log configs are uploaded to, usually on HDFS;
    // its path is like /user/hadoop (hadoop being the current user)
    final FileSystem fs = FileSystem.get(yarnConfiguration);
    final Path homeDir = fs.getHomeDirectory();

    // the submission context describing the application to YARN
    ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

    // files that will be uploaded to HDFS and added to the classpath
    Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
    // files that will only be uploaded to HDFS, but not added to the classpath
    Set<File> shipOnlyFiles = new HashSet<>();
    for (File file : shipFiles) {
        systemShipFiles.add(file.getAbsoluteFile());
    }

    final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
    if (logConfigFilePath != null) {
        systemShipFiles.add(new File(logConfigFilePath));
    }

    // add the files under FLINK_HOME/lib to systemShipFiles; files passed via -yt end up here too
    addLibFoldersToShipFiles(systemShipFiles);
    // add the files under FLINK_HOME/plugins to shipOnlyFiles
    addPluginsFoldersToShipFiles(shipOnlyFiles);

    final ApplicationId appId = appContext.getApplicationId();

    // ZooKeeper HA related configuration
    String zkNamespace = getZookeeperNamespace();
    // no user specified cli argument for namespace?
    if (zkNamespace == null || zkNamespace.isEmpty()) {
        // namespace defined in config? else use applicationId as default.
        zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
        setZookeeperNamespace(zkNamespace);
    }
    configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

    if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
        // activate re-execution of failed applications
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
        activateHighAvailabilitySupport(appContext);
    } else {
        // set number of application retries to 1 in the default case
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        1));
    }

    // userJarFiles: the user code jars
    final Set<File> userJarFiles = (jobGraph == null)
            // not per-job submission
            ? Collections.emptySet()
            // add user code jars from the provided JobGraph
            : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());

    // upload distributed-cache files to HDFS, typically used to share files across tasks
    if (jobGraph != null) {
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
            org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
            // only upload local files
            if (!path.getFileSystem().isDistributedFS()) {
                Path localPath = new Path(path.getPath());
                Tuple2<Path, Long> remoteFileInfo =
                        Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, entry.getKey());
                jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
            }
        }
        jobGraph.writeUserArtifactEntriesToConfiguration();
    }

    // resource files needed to start the AppMaster; they will be downloaded from HDFS
    final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
    // security settings for accessing HDFS
    final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
    // resource files needed to start the TaskExecutors
    StringBuilder envShipFileList = new StringBuilder();

    // several uploadAndRegisterFiles calls upload systemShipFiles, shipOnlyFiles and the user jars to HDFS

    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
        systemClassPaths.addAll(userClassPaths);
    }

    // normalize classpath by sorting
    Collections.sort(systemClassPaths); // sort the system classpath entries
    Collections.sort(userClassPaths);   // sort the user classpath entries

    // classPathBuilder holds the classpath information
    StringBuilder classPathBuilder = new StringBuilder();
    /*
     * build the classpath: ship-file jars, user jars, log4j and yaml config files
     */

    final Path yarnFilesDir = getYarnFilesDir(appId);
    FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
    fs.setPermission(yarnFilesDir, permission); // set permission for path.

    /*
     * a batch of security-related setup omitted here
     */

    // the java command to execute, starting YarnJobClusterEntrypoint
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
            yarnClusterEntrypoint,
            hasLogback,
            hasLog4j,
            hasKrb5,
            clusterSpecification.getMasterMemoryMB());

    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        Utils.setTokensFor(amContainer, paths, yarnConfiguration);
    }

    amContainer.setLocalResources(localResources);
    fs.close();

    // Setup CLASSPATH and environment variables for ApplicationMaster
    final Map<String, String> appMasterEnv = new HashMap<>();
    /*
     * put the environment variables into appMasterEnv; they are used when starting
     * YarnJobClusterEntrypoint, e.g. the classpath, the hadoop user, the appId, ...
     */
    amContainer.setEnvironment(appMasterEnv);

    // plus a batch of settings for the submission queue, the YARN application name, etc.

    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
    LOG.info("Submitting application master " + appId);
    // submit the application
    yarnClient.submitApplication(appContext);

    /*
     * poll the application status
     */
}
This flow is rather long; to summarize, the main steps are:
- Upload shipFiles, plugins, the user jar, log files, flink-conf.yaml, job.graph and other files to HDFS
- Build the required classpath, the ZooKeeper HA configuration, the security configuration, the JobMaster launch command, and so on
- Submit the application to YARN
Once the application has started successfully on YARN, you will find a file called launch_container.sh in the JobMaster's working directory; it contains all the environment variable settings and the launch command that were prepared in startAppMaster.
Summary
This article walked through the yarn-per-job submission flow. Together with the previous two articles, you should now have a good grasp of how to submit a job through the API. In my view there are two key points: first, parse and assemble the configuration properly; second, choose a suitable PipelineExecutor to submit the job.
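To make that concrete, here is a minimal sketch of a per-job submission through these classes directly (hedged: the conf directory path is a placeholder, the detached flag is hardcoded, error handling is omitted, and jobGraph is assumed to have been built beforehand, e.g. with ExecutorUtils.getJobGraph as shown above):

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public final class YarnPerJobSubmitSketch {

    public static void submit(JobGraph jobGraph) throws Exception {
        // load flink-conf.yaml from a placeholder conf dir and pick the per-job executor target
        Configuration flinkConfig = GlobalConfiguration.loadConfiguration("/path/to/flink/conf");
        flinkConfig.setString(DeploymentOptions.TARGET, "yarn-per-job");

        YarnClusterClientFactory clientFactory = new YarnClusterClientFactory();
        try (ClusterDescriptor<ApplicationId> clusterDescriptor =
                clientFactory.createClusterDescriptor(flinkConfig)) {
            // resource sizes are derived from the memory options in flinkConfig
            ClusterSpecification spec = clientFactory.getClusterSpecification(flinkConfig);
            // detached = true: return right after submission instead of waiting for the job
            ClusterClientProvider<ApplicationId> provider =
                    clusterDescriptor.deployJobCluster(spec, jobGraph, true);
            // provider.getClusterClient() could later be used to track the job
            System.out.println("Submitted job " + jobGraph.getJobID());
        }
    }
}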