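Preface
Earlier posts covered the full flow of how Flink consumes messages, Flink Window internals for busy readers, and the full flow of Flink Checkpoint Barriers, among others. Now it is time to go back to the very beginning: how a Flink job is submitted.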
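Main text
Flink has two submission modes, local and remote (each corresponds to a different environment; see "What exactly is a Flink Context?" for details). This post follows local mode; the two modes are largely similar.
Calling env.execute in local mode actually invokes LocalStreamEnvironment.execute: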
/**
 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
 * specified name.
 *
 * @param jobName
 *            name of the job
 * @return The result of the job execution, containing elapsed time and accumulators.
 */
@Override
// Local-mode implementation behind env.execute
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    //TODO 111
    // build the StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    // build the JobGraph
    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);
    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);
    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }
    int numSlotsPerTaskManager = configuration.getInteger(
        TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();
    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }
    MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        // start the cluster: bring up its services and run leader election
        // (the JobMaster itself is created later, when the job is submitted)
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
        // submit the job, which is eventually handed to a JobMaster, and block until it finishes
        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
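This builds the StreamGraph and the JobGraph; the ExecutionGraph comes later. A single diagram covers most of what there is to know about how these graphs relate.
When miniCluster.start() is called: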
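To make the relationship between the three graphs concrete, here is a toy sketch in plain Java with no Flink dependency. Everything in it is hypothetical: the `Op` class, the method names, and especially the chaining rule (neighbouring operators with equal parallelism are merged), which is a simplification of Flink's real chaining conditions. It only illustrates the idea that the JobGraph chains operators into vertices and the ExecutionGraph expands each vertex into parallel subtasks.

```java
import java.util.ArrayList;
import java.util.List;

public class GraphSketch {
    /** Hypothetical operator description: a name plus its parallelism. */
    static final class Op {
        final String name;
        final int parallelism;

        Op(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** StreamGraph -> JobGraph: chain neighbouring operators that share a parallelism. */
    static List<List<Op>> toJobVertices(List<Op> pipeline) {
        List<List<Op>> vertices = new ArrayList<>();
        for (Op op : pipeline) {
            boolean chain = !vertices.isEmpty()
                && vertices.get(vertices.size() - 1).get(0).parallelism == op.parallelism;
            if (!chain) {
                vertices.add(new ArrayList<>());   // open a new job vertex
            }
            vertices.get(vertices.size() - 1).add(op);
        }
        return vertices;
    }

    /** JobGraph -> ExecutionGraph: one subtask per parallel instance of each vertex. */
    static List<String> toSubtasks(List<List<Op>> vertices) {
        List<String> subtasks = new ArrayList<>();
        for (List<Op> vertex : vertices) {
            List<String> names = new ArrayList<>();
            for (Op op : vertex) {
                names.add(op.name);
            }
            int parallelism = vertex.get(0).parallelism;
            for (int i = 1; i <= parallelism; i++) {
                subtasks.add(String.join(" -> ", names) + " (" + i + "/" + parallelism + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Op> streamGraph = List.of(new Op("Source", 2), new Op("Map", 2), new Op("Sink", 1));
        List<List<Op>> jobGraph = toJobVertices(streamGraph);
        System.out.println(jobGraph.size() + " job vertices");
        toSubtasks(jobGraph).forEach(System.out::println);
    }
}
```

With the three operators in `main`, Source and Map end up in one chained vertex with two subtasks, and Sink becomes its own vertex with a single subtask.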
// start cluster
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");
        ......
        ioExecutor = Executors.newFixedThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("mini-cluster-io"));
        // create the HA services
        haServices = createHighAvailabilityServices(configuration, ioExecutor);
        // start the BlobServer
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
        blobCacheService = new BlobCacheService(
            configuration,
            haServices.createBlobStore(),
            new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
        );
        // task executor
        startTaskManagers();
        ......
        resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
        dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
        webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
        ......
    }
}
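This creates the HA services, starts the BlobServer, creates the BlobCacheService, and starts resourceManagerLeaderRetriever, dispatcherLeaderRetriever and webMonitorLeaderRetrievalService. The part to focus on is startTaskManagers, whose per-TaskManager work is done by startTaskExecutor: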
@VisibleForTesting
void startTaskExecutor() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
            configuration,
            new ResourceID(UUID.randomUUID().toString()),
            taskManagerRpcServiceFactory.createRpcService(),
            haServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            useLocalCommunication(),
            taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}
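TaskManagerRunner.startTaskManager creates a TaskExecutor, which is then started. The TaskExecutor is what ultimately handles submitTask, cancelTask, stopTask, task execution, confirmCheckpoint, requestSlot, freeSlot, and so on.
With the required components up and running, the next step is to submit the jobGraph via miniCluster.executeJobBlocking(jobGraph). Tracing the code leads to submitJob: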
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    // we have to allow queued scheduling in Flip-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);
    final CompletableFuture<InetSocketAddress> blobServerAddressFuture =
        createBlobServerAddress(dispatcherGatewayFuture);
    // cache jars and files
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            // this is the actual submit: it is delegated to the dispatcher
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());
    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
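The `thenCombine(...)` followed by `thenCompose(Function.identity())` in submitJob is a general CompletableFuture idiom, and a small stand-alone sketch may make it easier to read. The `Gateway` interface and the string payloads below are hypothetical stand-ins, not Flink types; only the future composition mirrors the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SubmitSketch {
    /** Hypothetical stand-in for the dispatcher gateway; submit itself is asynchronous. */
    interface Gateway {
        CompletableFuture<String> submit(String job);
    }

    static CompletableFuture<String> submitJob(
            String job,
            CompletableFuture<Void> uploadFuture,
            CompletableFuture<Gateway> gatewayFuture) {
        // thenCombine waits for BOTH inputs. Because the combiner returns a future,
        // the result is a nested CompletableFuture<CompletableFuture<String>>.
        CompletableFuture<CompletableFuture<String>> nested =
            uploadFuture.thenCombine(gatewayFuture, (ignored, gateway) -> gateway.submit(job));
        // thenCompose(Function.identity()) flattens the nesting into a single future.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> upload = CompletableFuture.<Void>completedFuture(null);
        Gateway gateway = job -> CompletableFuture.completedFuture("ACK:" + job);
        String result = submitJob("wordcount", upload, CompletableFuture.completedFuture(gateway)).join();
        System.out.println(result);
    }
}
```

The design point is that the submission is only issued once both the file upload has finished and the gateway is known, without blocking any thread while waiting for either.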
The dispatcher takes over submission of the jobGraph, which eventually reaches runJob:
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
    Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
    // create the JobManagerRunner
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
    jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
    // start job manager
    return jobManagerRunnerFuture
        .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
        .thenApply(FunctionUtils.nullFn())
        .whenCompleteAsync(
            (ignored, throwable) -> {
                if (throwable != null) {
                    jobManagerRunnerFutures.remove(jobGraph.getJobID());
                }
            },
            getMainThreadExecutor());
}
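The bookkeeping in runJob (register the runner future under the job id, and drop the entry again if startup fails) can be sketched with the standard library alone. This is a simplified illustration under assumptions: the class name and the `String` payloads are hypothetical, and the cleanup uses `whenComplete` on the completing thread rather than `whenCompleteAsync` on a main-thread executor as in the code above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RunnerRegistrySketch {
    /** Job id -> future of the (hypothetical) runner for that job. */
    final Map<String, CompletableFuture<String>> runnerFutures = new ConcurrentHashMap<>();

    CompletableFuture<Void> runJob(String jobId, CompletableFuture<String> runnerFuture) {
        if (runnerFutures.containsKey(jobId)) {
            throw new IllegalStateException("Job " + jobId + " is already registered");
        }
        runnerFutures.put(jobId, runnerFuture);
        return runnerFuture
            // callers only care whether startup succeeded, not about the runner itself
            .thenApply(runner -> (Void) null)
            // on failure, forget the job so that it can be submitted again
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    runnerFutures.remove(jobId);
                }
            });
    }

    public static void main(String[] args) {
        RunnerRegistrySketch dispatcher = new RunnerRegistrySketch();
        dispatcher.runJob("job-1", CompletableFuture.completedFuture("runner-1"));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("startup failed"));
        dispatcher.runJob("job-2", failed);

        System.out.println(dispatcher.runnerFutures.keySet());
    }
}
```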
At this point the jobManagerRunner is created, and creating the jobManagerRunner also creates the jobMaster:
public JobMaster(
        RpcService rpcService,
        JobMasterConfiguration jobMasterConfiguration,
        ResourceID resourceId,
        JobGraph jobGraph,
        HighAvailabilityServices highAvailabilityService,
        SlotPoolFactory slotPoolFactory,
        SchedulerFactory schedulerFactory,
        JobManagerSharedServices jobManagerSharedServices,
        HeartbeatServices heartbeatServices,
        JobManagerJobMetricGroupFactory jobMetricGroupFactory,
        OnCompletionActions jobCompletionActions,
        FatalErrorHandler fatalErrorHandler,
        ClassLoader userCodeLoader) throws Exception {
    super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
    this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
    this.resourceId = checkNotNull(resourceId);
    this.jobGraph = checkNotNull(jobGraph);
    this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
    this.highAvailabilityServices = checkNotNull(highAvailabilityService);
    this.blobWriter = jobManagerSharedServices.getBlobWriter();
    this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
    this.jobCompletionActions = checkNotNull(jobCompletionActions);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.userCodeLoader = checkNotNull(userCodeLoader);
    this.heartbeatServices = checkNotNull(heartbeatServices);
    this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
    final String jobName = jobGraph.getName();
    final JobID jid = jobGraph.getJobID();
    log.info("Initializing job {} ({}).", jobName, jid);
    final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
        jobGraph.getSerializedExecutionConfig()
            .deserializeValue(userCodeLoader)
            .getRestartStrategy();
    // resolve the restart strategy
    this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
        jobManagerSharedServices.getRestartStrategyFactory(),
        jobGraph.isCheckpointingEnabled());
    .....
    //TODO 111
    // creating the ExecutionGraph may also restore it from a checkpoint (or savepoint)
    this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
    ......
}
The key point is that the ExecutionGraph is created while the jobMaster is being constructed. Next the jobManagerRunner is started, which eventually starts the jobMaster:
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
    log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
        jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
    try {
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
        return FutureUtils.completedExceptionally(
            new FlinkException(
                String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                e));
    }
    final CompletableFuture<Acknowledge> startFuture;
    try {
        // start the JobMaster with a JobMasterId built from the leader session id
        startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
    } catch (Exception e) {
        return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
    }
    final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
    return startFuture.thenAcceptAsync(
        (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
        executor);
}
Once the jobMaster has been started, the job formally begins to execute:
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();
    // formally start executing the job
    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
Job execution then starts in startJobExecution:
// start job execution
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();
    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }
    setNewFencingToken(newJobMasterId);
    startJobMasterServices();
    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
    // reset (if necessary) and schedule the ExecutionGraph
    resetAndScheduleExecutionGraph();
    return Acknowledge.get();
}
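The fencing-token check at the top of startJobExecution makes the call idempotent per leader session: a repeated start with the same id is acknowledged without scheduling again. Here is a minimal sketch of that guard, assuming a plain `UUID` as the token and a counter in place of the real service startup and scheduling. Unlike the code above, which returns an Acknowledge on both paths, the sketch returns a boolean so the two paths are visible.

```java
import java.util.Objects;
import java.util.UUID;

public class FencingSketch {
    private UUID fencingToken;   // null until the first start
    private int scheduleCount;   // how many times scheduling was actually triggered

    /** Returns true if this call started execution, false if it repeated a known token. */
    boolean startJobExecution(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");
        if (Objects.equals(fencingToken, newToken)) {
            return false;        // already started under this leader session
        }
        fencingToken = newToken;
        scheduleCount++;         // stands in for starting services and scheduling the graph
        return true;
    }

    int scheduleCount() {
        return scheduleCount;
    }

    public static void main(String[] args) {
        FencingSketch master = new FencingSketch();
        UUID session = UUID.randomUUID();
        System.out.println(master.startJobExecution(session));            // first start
        System.out.println(master.startJobExecution(session));            // repeated start, ignored
        System.out.println(master.startJobExecution(UUID.randomUUID()));  // new leader session
        System.out.println(master.scheduleCount());
    }
}
```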
Then the executionGraph is scheduled:
// schedule the executions
public void scheduleForExecution() throws JobException {
    assertRunningInJobMasterMainThread();
    final long currentGlobalModVersion = globalModVersion;
    // move the job status from CREATED to RUNNING
    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
        final CompletableFuture<Void> newSchedulingFuture;
        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
                newSchedulingFuture = scheduleLazy(slotProvider);
                break;
            case EAGER:
                // 300000 ms default
                // start scheduling
                newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                break;
            default:
                throw new JobException("Schedule mode is invalid.");
        }
        if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
            schedulingFuture = newSchedulingFuture;
            newSchedulingFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null && !(throwable instanceof CancellationException)) {
                        // only fail if the scheduling future was not canceled
                        failGlobal(ExceptionUtils.stripCompletionException(throwable));
                    }
                });
        } else {
            newSchedulingFuture.cancel(false);
        }
    }
    else {
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }
}
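The guard at the top of scheduleForExecution only lets a job be scheduled once, from the CREATED state. A compare-and-set on the state is one way to express such a guard; the sketch below assumes an `AtomicReference` for this and uses hypothetical, reduced `JobStatus` and `ScheduleMode` enums, so it shows the pattern rather than Flink's actual implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StateTransitionSketch {
    enum JobStatus { CREATED, RUNNING }
    enum ScheduleMode { LAZY_FROM_SOURCES, EAGER }

    private final AtomicReference<JobStatus> state = new AtomicReference<>(JobStatus.CREATED);

    /** Atomically moves from expected to next; fails if the state has changed meanwhile. */
    boolean transitionState(JobStatus expected, JobStatus next) {
        return state.compareAndSet(expected, next);
    }

    /** Returns a label for the strategy that would be used; scheduling itself is omitted. */
    String scheduleForExecution(ScheduleMode mode) {
        if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
        switch (mode) {
            case LAZY_FROM_SOURCES:
                return "lazy";   // start with the sources, deploy consumers as data becomes available
            case EAGER:
                return "eager";  // request slots for all tasks and deploy them together
            default:
                throw new IllegalArgumentException("Schedule mode is invalid.");
        }
    }

    public static void main(String[] args) {
        StateTransitionSketch graph = new StateTransitionSketch();
        System.out.println(graph.scheduleForExecution(ScheduleMode.EAGER));
        try {
            graph.scheduleForExecution(ScheduleMode.EAGER);
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
    }
}
```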
After scheduling, deployment begins:
/**
 * Deploys the execution to the previously assigned resource.
 *
 * @throws JobException if the execution cannot be deployed to the assigned resource
 */
// called for each execution in turn, from the sources through to the sinks
public void deploy() throws JobException {
    assertRunningInJobMasterMainThread();
    final LogicalSlot slot = assignedResource;
    ..... // elided; the opening "try {" of the block closed below is part of the elided code
        // The TaskDeploymentDescriptor holds everything a task needs in order to run, e.g. the
        // serialized operators, the definitions of the input InputGates and output ResultPartitions,
        // and how many subtasks the task is executed as.
        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskRestore,
            attemptNumber);
        // null taskRestore to let it be GC'ed
        taskRestore = null;
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
            vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
        // We run the submission in the future executor so that the serialization of large TDDs does not block
        // the main thread and sync back to the main thread once submission is completed.
        // Submit the task; sources are submitted first.
        // From the TaskManager's point of view, running a task means turning the received
        // TaskDeploymentDescriptor into a Task and executing it.
        CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
            .thenCompose(Function.identity())
            .whenCompleteAsync(
                (ack, failure) -> {
                    // only respond to the failure case
                    if (failure != null) {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                jobMasterMainThreadExecutor);
    }
    catch (Throwable t) {
        markFailed(t);
        ExceptionUtils.rethrow(t);
    }
}
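The threading pattern in deploy (issue the submission on a separate executor, then return to the main thread to handle the result) can be tried out in isolation. The sketch below is an illustration under assumptions: `Gateway`, the string descriptor and the outcome strings are hypothetical, and the failure is unwrapped from a possible `CompletionException` before the timeout check, which is a choice made for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class DeploySketch {
    /** Hypothetical stand-in for the TaskManager gateway: submission is an asynchronous RPC. */
    interface Gateway {
        CompletableFuture<String> submitTask(String descriptor);
    }

    static CompletableFuture<String> deploy(
            String descriptor, Gateway gateway, Executor ioExecutor, Executor mainThread) {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        CompletableFuture
            // issue the (potentially expensive) submission off the main thread
            .supplyAsync(() -> gateway.submitTask(descriptor), ioExecutor)
            // flatten the future-of-a-future returned by the RPC
            .thenCompose(Function.identity())
            // hop back to the main thread before reacting to the result
            .whenCompleteAsync((ack, failure) -> {
                if (failure == null) {
                    outcome.complete("DEPLOYED");
                    return;
                }
                Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause() : failure;
                outcome.complete(cause instanceof TimeoutException
                    ? "FAILED: TaskManager not responding"
                    : "FAILED: " + cause.getMessage());
            }, mainThread);
        return outcome;
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService main = Executors.newSingleThreadExecutor();
        try {
            Gateway healthy = descriptor -> CompletableFuture.completedFuture("ack");
            System.out.println(deploy("tdd", healthy, io, main).join());
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }
}
```

Keeping all reactions on a single main-thread executor means the state touched in the callback needs no extra locking, which is the same reason the original code passes jobMasterMainThreadExecutor to whenCompleteAsync.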
Deployment may involve requesting resources. After that the task is submitted, and from there on the task starts executing.
Summary
Remote mode: YARN