版本
flink 1.14.4
方法
// PIPELINE_FIXED_JOB_ID is the internal option "$internal.pipeline.job-id".
// Its value is later parsed with JobID.fromHexString(...), so it has to be a
// 32-character hexadecimal string; a placeholder such as "xxx" is rejected.
Configuration configuration = new Configuration();
configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, "0123456789abcdef0123456789abcdef");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
源码分析
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
/**
 * Executes the given stream graph.
 *
 * <p>Submission is delegated to {@code executeAsync(streamGraph)}, which returns the
 * {@code JobClient} for the submitted job. The remainder of the method is elided in
 * this excerpt.
 */
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    JobClient jobClient = this.executeAsync(streamGraph);
    ...
}
/**
 * Hands the stream graph to a pipeline executor and obtains the resulting JobClient.
 *
 * <p>Excerpt only: the statements after the JobClient is obtained, and the rest of the
 * try statement, are elided.
 */
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
// Pick the executor factory that matches this environment's configuration.
PipelineExecutorFactory executorFactory = this.executorServiceLoader.getExecutorFactory(this.configuration);
// The environment's configuration is passed along to the executor together with the graph,
// so any option set on it (such as a fixed job id) is visible to the executor.
CompletableFuture jobClientFuture = executorFactory.getExecutor(this.configuration).execute(streamGraph, this.configuration, this.userClassloader);
try {
// Block until the executor has produced the JobClient.
JobClient jobClient = (JobClient)jobClientFuture.get();
...
}
org.apache.flink.client.program.PerJobMiniClusterFactory
代码语言:javascript复制 public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
MiniClusterConfiguration miniClusterConfig = this.getMiniClusterConfig(jobGraph.getMaximumParallelism());
MiniCluster miniCluster = (MiniCluster)this.miniClusterFactory.apply(miniClusterConfig);
miniCluster.start();
return miniCluster.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction((submissionResult) -> {
ClientUtils.waitUntilJobInitializationFinished(() -> {
return (JobStatus)miniCluster.getJobStatus(submissionResult.getJobID()).get();
}, () -> {
return (JobResult)miniCluster.requestJobResult(submissionResult.getJobID()).get();
}, userCodeClassloader);
return submissionResult;
})).thenApply((result) -> {
return new MiniClusterJobClient(result.getJobID(), miniCluster, userCodeClassloader, JobFinalizationBehavior.SHUTDOWN_CLUSTER);
}).whenComplete((ignored, throwable) -> {
if (throwable != null) {
shutDownCluster(miniCluster);
}
}).thenApply(Function.identity());
}
org.apache.flink.client.deployment.executors.LocalExecutor
代码语言:javascript复制 public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
Configuration effectiveConfig = new Configuration();
effectiveConfig.addAll(this.configuration);
effectiveConfig.addAll(configuration);
JobGraph jobGraph = this.getJobGraph(pipeline, effectiveConfig);
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, this.miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}
/**
 * Builds the job graph for the pipeline via {@code PipelineExecutorUtils.getJobGraph}.
 *
 * <p>When the pipeline is a {@code Plan}, its default parallelism is first set to
 * (slots per task manager) x (number of task managers). The slot count falls back to the
 * plan's maximum parallelism and the task manager count falls back to 1 when the
 * corresponding option is not configured.
 *
 * @param pipeline the pipeline to translate
 * @param configuration the configuration to read options from and to translate with
 * @return the translated job graph
 * @throws MalformedURLException propagated from the job graph translation
 */
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
    if (!(pipeline instanceof Plan)) {
        return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    }

    Plan typedPlan = (Plan) pipeline;
    int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, typedPlan.getMaximumParallelism());
    int taskManagers = configuration.getInteger("local.number-taskmanager", 1);
    typedPlan.setDefaultParallelism(slots * taskManagers);

    return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
}
- 创建 JobGraph 时,检查配置中是否存在配置项 **$internal.pipeline.job-id**;如果存在,则将其值作为 JobID 设置到 JobGraph 上。对应源码:org.apache.flink.client.deployment.executors.PipelineExecutorUtils
/**
 * Translates a pipeline into a {@code JobGraph} and applies configuration-driven settings.
 *
 * <p>After translation with the configured parallelism, the job id is overridden when
 * {@code PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID} is present in the configuration
 * (the value is parsed with {@code JobID.fromHexString}). Jars, classpaths and savepoint
 * restore settings from the configuration are then attached to the graph.
 *
 * @param pipeline the pipeline to translate; must not be null
 * @param configuration the configuration to read settings from; must not be null
 * @return the configured job graph
 * @throws MalformedURLException declared by the method; source not visible in this excerpt
 */
public static JobGraph getJobGraph(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration) throws MalformedURLException {
    Preconditions.checkNotNull(pipeline);
    Preconditions.checkNotNull(configuration);

    ExecutionConfigAccessor execConfig = ExecutionConfigAccessor.fromConfiguration(configuration);
    JobGraph graph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, execConfig.getParallelism());

    // A fixed job id, if configured, replaces the id of the translated graph.
    configuration
            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
            .map(JobID::fromHexString)
            .ifPresent(graph::setJobID);

    graph.addJars(execConfig.getJars());
    graph.setClasspaths(execConfig.getClasspaths());
    graph.setSavepointRestoreSettings(execConfig.getSavepointRestoreSettings());
    return graph;
}