Flink Task Submission and Execution: The Full Flow in One Article

2020-07-14 11:20:01

Preface

As we know, each operator chain is submitted as a whole, as a single task.

Main text

@Override
	// Execution submits the task here
	public CompletableFuture<Acknowledge> submitTask(
			TaskDeploymentDescriptor tdd,
			JobMasterId jobMasterId,
			Time timeout) {

		try {
			......
			// Intermediate partition state checker to query the JobManager about the state
			// * of the producer of a result partition.
			PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

			// local state restore
			final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
				jobId,
				tdd.getAllocationId(),
				taskInformation.getJobVertexId(),
				tdd.getSubtaskIndex());

			final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

			final TaskStateManager taskStateManager = new TaskStateManagerImpl(
				jobId,
				tdd.getExecutionAttemptId(),
				localStateStore,
				taskRestore,
				checkpointResponder);

			Task task = new Task(
				jobInformation,
				taskInformation,
				tdd.getExecutionAttemptId(),
				tdd.getAllocationId(),
				tdd.getSubtaskIndex(),
				tdd.getAttemptNumber(),
				tdd.getProducedPartitions(),
				tdd.getInputGates(),
				tdd.getTargetSlotNumber(),
				taskExecutorServices.getMemoryManager(),
				taskExecutorServices.getIOManager(),
				taskExecutorServices.getNetworkEnvironment(),
				taskExecutorServices.getBroadcastVariableManager(),
				taskStateManager,
				taskManagerActions,
				inputSplitProvider,
				checkpointResponder,
				aggregateManager,
				blobCacheService,
				libraryCache,
				fileCache,
				taskManagerConfiguration,
				taskMetricGroup,
				resultPartitionConsumableNotifier,
				partitionStateChecker,
				getRpcService().getExecutor());

			log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

			boolean taskAdded;

			try {
				taskAdded = taskSlotTable.addTask(task);
			} catch (SlotNotFoundException | SlotNotActiveException e) {
				throw new TaskSubmissionException("Could not submit task.", e);
			}

			if (taskAdded) {
				// start the task
				task.startTaskThread();

				return CompletableFuture.completedFuture(Acknowledge.get());
			} else {
				final String message = "TaskManager already contains a task for id " +
					task.getExecutionId() + '.';

				log.debug(message);
				throw new TaskSubmissionException(message);
			}
		} catch (TaskSubmissionException e) {
			return FutureUtils.completedExceptionally(e);
		}
	}
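
The tail of submitTask follows a small, reusable pattern: register the task, start its thread only if the registration succeeded, and report the outcome through an already-completed future. The sketch below illustrates that pattern with plain JDK classes. It is not Flink code: SubmitSketch, the String task id, and the "ACK" result are hypothetical stand-ins for TaskSlotTable, the execution attempt id, and Acknowledge.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the slot table plus submitTask(); not Flink's API.
class SubmitSketch {
    private final Map<String, Thread> tasks = new ConcurrentHashMap<>();

    // Returns a future that is already completed: normally on success,
    // exceptionally if a task with the same id is already registered.
    CompletableFuture<String> submitTask(String executionId, Runnable body) {
        Thread thread = new Thread(body, "task-" + executionId);
        // putIfAbsent returns null only when no task was registered under this id
        boolean taskAdded = tasks.putIfAbsent(executionId, thread) == null;
        if (taskAdded) {
            // start the task thread only after it has been registered
            thread.start();
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "TaskManager already contains a task for id " + executionId + '.'));
        return failed;
    }
}
```

Submitting the same id twice yields one normally completed future and one exceptionally completed future, so the caller learns about the duplicate without the submitting thread ever throwing.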

submitTask creates a Task object and starts it. Let's look at what the Task does when it starts.

/**
	 * The core work method that bootstraps the task and executes its code.
	 */
	// corresponds to the execution of a subtask (each vertex, i.e. operator chain, runs as a subtask)
	@Override
	public void run() {
		
		// ----------------------------
		//  Initial State transition
		// ----------------------------
		while (true) {
			ExecutionState current = this.executionState;
			if (current == ExecutionState.CREATED) {
				if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
					// success, we can start our work
					break;
				}
			} else if (current == ExecutionState.FAILED) {
				// we were immediately failed. tell the TaskManager that we reached our final state
				notifyFinalState();
				if (metrics != null) {
					metrics.close();
				}
				return;
			}
			else if (current == ExecutionState.CANCELING) {
				if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
					// we were immediately canceled. tell the TaskManager that we reached our final state
					notifyFinalState();
					if (metrics != null) {
						metrics.close();
					}
					return;
				}
			} else {
				if (metrics != null) {
					metrics.close();
				}
				throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
			}
		}

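		/*
		Next, the user-code class loader is created and the user code is loaded.
		Then the task is registered with the network stack (Flink operators depend on it to exchange data at runtime), which allocates buffers to hold the data.
		Then the specified cache files are read.
		Then the many variables passed in when the task was created are used to build an execution Environment.
		Finally, a task that is not running for the first time (for example, one restarted after a failure) has its state restored.
		 */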
		// all resource acquisitions and registrations from here on
		// need to be undone in the end
		Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
		AbstractInvokable invokable = null;
		
		try {
			// ----------------------------
			//  Task Bootstrap - We periodically
			//  check for canceling as a shortcut
			// ----------------------------
			
			// activate safety net for task thread
			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
			FileSystemSafetyNet.initializeSafetyNetForThread();
			
			blobService.getPermanentBlobService().registerJob(jobId);
			
			// first of all, get a user-code classloader
			// this may involve downloading the job's JAR files and/or classes
			LOG.info("Loading JAR files for task {}.", this);
			
			userCodeClassLoader = createUserCodeClassloader();
			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
			
			if (executionConfig.getTaskCancellationInterval() >= 0) {
				// override task cancellation interval from Flink config if set in ExecutionConfig
				taskCancellationInterval = executionConfig.getTaskCancellationInterval();
			}
			
			if (executionConfig.getTaskCancellationTimeout() >= 0) {
				// override task cancellation timeout from Flink config if set in ExecutionConfig
				taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
			}
			
			if (isCanceledOrFailed()) {
				throw new CancelTaskException();
			}
			
			// ----------------------------------------------------------------
			// register the task with the network stack
			// this operation may fail if the system does not have enough
			// memory to run the necessary data exchanges
			// the registration must also strictly be undone
			// ----------------------------------------------------------------
			
			LOG.info("Registering task at network: {}.", this);
			
			// registerTask requests one BufferPool for each ResultPartition of the task
			// and one BufferPool for each InputGate of the task
			network.registerTask(this);
			
			// add metrics for buffers
			this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
			
			// register detailed network metrics, if configured
			if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
				// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
				MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
				MetricGroup outputGroup = networkGroup.addGroup("Output");
				MetricGroup inputGroup = networkGroup.addGroup("Input");
				
				// output metrics
				for (int i = 0; i < producedPartitions.length; i++) {
					ResultPartitionMetrics.registerQueueLengthMetrics(
						outputGroup.addGroup(i), producedPartitions[i]);
				}
				
				for (int i = 0; i < inputGates.length; i++) {
					InputGateMetrics.registerQueueLengthMetrics(
						inputGroup.addGroup(i), inputGates[i]);
				}
			}
			
			// next, kick off the background copying of files for the distributed cache
			// fetch the files registered in the distributed cache (config files, jars, etc.) into local temporary files
			try {
				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
					DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
					LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
					Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
					distributedCacheEntries.put(entry.getKey(), cp);
				}
			} catch (Exception e) {
				throw new Exception(
					String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), e);
			}
			
			if (isCanceledOrFailed()) {
				throw new CancelTaskException();
			}
			
			// ----------------------------------------------------------------
			//  call the user code initialization methods
			// ----------------------------------------------------------------
			
			TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
			
			Environment env = new RuntimeEnvironment(
				jobId,
				vertexId,
				executionId,
				executionConfig,
				taskInfo,
				jobConfiguration,
				taskConfiguration,
				userCodeClassLoader,
				memoryManager,
				ioManager,
				broadcastVariableManager,
				taskStateManager,
				aggregateManager,
				accumulatorRegistry,
				kvStateRegistry,
				inputSplitProvider,
				distributedCacheEntries,
				producedPartitions,
				inputGates,
				network.getTaskEventDispatcher(),
				checkpointResponder,
				taskManagerConfig,
				metrics,
				this);
			
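			/*
			 The information about the invokable is generated when the JobGraph is parsed; here it becomes an actual executable object
			 */
			// now load and instantiate the task's invokable code
			// nameOfInvokableClass ---> jobVertex.getInvokableClassName()
			// the object is created via reflection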
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
			
			// ----------------------------------------------------------------
			//  actual task core work
			// ----------------------------------------------------------------
			
			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;
			
			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}
			
			// notify everyone that we switched to running
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
			
			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);
			
			/*
			This method is the real entry point for user code. For example, the logic of a new MapFunction() that we write is ultimately executed here
			 */
			// run the invokable
			invokable.invoke();
			
			......
	}
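
The retry loop at the start of run() makes sense if transitionState is an atomic compare-and-set that can lose a race against a concurrent cancel or fail call. A minimal sketch under that assumption, using an AtomicReference; StateTransitionSketch, its reduced State enum, and the simplified cancel() are illustrative names, not Flink classes:

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: a reduced state machine, assuming transitions are compare-and-set.
class StateTransitionSketch {
    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    // Atomically move from 'expected' to 'next'; false means another thread changed the state first.
    boolean transitionState(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    // Simplified cancel request from another thread; only the CREATED case is handled here.
    boolean cancel() {
        return transitionState(State.CREATED, State.CANCELING);
    }

    State current() {
        return state.get();
    }

    // Mirrors the initial loop in run(): returns true if the task may start its work.
    boolean beginExecution() {
        while (true) {
            State current = state.get();
            if (current == State.CREATED) {
                if (transitionState(State.CREATED, State.DEPLOYING)) {
                    return true;   // success, work can start
                }
                // lost the race: loop again and re-read the state
            } else if (current == State.FAILED) {
                return false;      // failed before starting
            } else if (current == State.CANCELING) {
                if (transitionState(State.CANCELING, State.CANCELED)) {
                    return false;  // canceled before starting
                }
            } else {
                throw new IllegalStateException("Invalid state for beginning of operation: " + current);
            }
        }
    }
}
```

A fresh instance moves to DEPLOYING and returns true. If cancel() wins first, beginExecution() completes the cancellation and returns false, which corresponds to the early-return branches in the loop above.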

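Task.run() first registers the job with blobService and the task with the network stack, adds metrics, and fetches the jars and other files through the distributed cache. It then calls invoke(), which is where the task really starts executing. Taking StreamTask as the example: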

// called when the task runs; this is the entry point through which a subtask executes user code
	@Override
	public final void invoke() throws Exception {

		boolean disposed = false;
		try {
			// -------- Initialize ---------
			LOG.debug("Initializing {}.", getName());

			asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", FatalExitExceptionHandler.INSTANCE));

			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

			synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
				getExecutionConfig().isFailTaskOnCheckpointError(),
				getEnvironment());

			asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

			// obtain the stateBackend and checkpointStorage
			// precedence: application-defined > config > default MemoryStateBackend
			stateBackend = createStateBackend();
			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

			// if the clock is not already set, then assign a default TimeServiceProvider
			if (timerService == null) {
				ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
					"Time Trigger for " + getName(), getUserCodeClassLoader());

				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
			}

			// build the task's operator chain
			operatorChain = new OperatorChain<>(this, recordWriters);
			headOperator = operatorChain.getHeadOperator();

			// task specific initialization
			// initialization, e.g. creating the StreamInputProcessor, so that run() can execute right away
			init();

			// save the work of reloading state, etc, if the task is already canceled
			if (canceled) {
				throw new CancelTaskException();
			}

			// -------- Invoke --------
			LOG.debug("Invoking {}", getName());

			// we need to make sure that any triggers scheduled in open() cannot be
			// executed before all operators are opened
			synchronized (lock) {

				// both the following operations are protected by the lock
				// so that we avoid race conditions in the case that initializeState()
				// registers a timer, that fires before the open() is called.
				// if there is a checkpoint, restore from its state; otherwise treat the operators as brand new
				initializeState();
				// for rich operators, run their open()
				openAllOperators();
			}

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			// a non-source task ends up in: while (running && inputProcessor.processInput()) {
			// a source task calls: headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
			// this ties in with the earlier articles on barrier handling, the full message flow, and Kafka messages
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			LOG.debug("Finished task {}", getName());

			// make sure no further checkpoint and notification actions happen.
			// we make sure that no other thread is currently in the locked scope before
			// we close the operators by trying to acquire the checkpoint scope lock
			// we also need to make sure that no triggers fire concurrently with the close logic
			// at the same time, this makes sure that during any "regular" exit where still
			synchronized (lock) {
				// this is part of the main logic, so if this fails, the task is considered failed
				closeAllOperators();

				// make sure no new timers can come
				timerService.quiesce();

				// only set the StreamTask to not running after all operators have been closed!
				// See FLINK-7430
				isRunning = false;
			}

			// make sure all timers finish
			timerService.awaitPendingAfterQuiesce();

			LOG.debug("Closed operators for task {}", getName());

			// make sure all buffered data is flushed
			operatorChain.flushOutputs();

			// make an attempt to dispose the operators such that failures in the dispose call
			// still let the computation fail
			tryDisposeAllOperators();
			disposed = true;
		}
		finally {
			// canceling a job ends up here to release resources such as buffers and to call function.close(); this is why a job should be canceled rather than killed
			
			// clean up everything we initialized
			isRunning = false;

			// Now that we are outside the user code, we do not want to be interrupted further
			// upon cancellation. The shutdown logic below needs to make sure it does not issue calls
			// that block and stall shutdown.
			// Additionally, the cancellation watch dog will issue a hard-cancel (kill the TaskManager
			// process) as a backup in case some shutdown procedure blocks outside our control.
			setShouldInterruptOnCancel(false);

			// clear any previously issued interrupt for a more graceful shutdown
			Thread.interrupted();

			// stop all timers and threads
			tryShutdownTimerService();

			// stop all asynchronous checkpoint threads
			try {
				cancelables.close();
				shutdownAsyncThreads();
			}
			catch (Throwable t) {
				// catch and log the exception to not replace the original exception
				LOG.error("Could not shut down async checkpoint threads", t);
			}

			// we must! perform this cleanup
			try {
				cleanup();
			}
			catch (Throwable t) {
				// catch and log the exception to not replace the original exception
				LOG.error("Error during cleanup of stream task", t);
			}

			// if the operators were not disposed before, do a hard dispose
			if (!disposed) {
				disposeAllOperators();
			}

			// release the output resources. this method should never fail.
			if (operatorChain != null) {
				// beware: without synchronization, #performCheckpoint() may run in
				//         parallel and this call is not thread-safe
				synchronized (lock) {
					operatorChain.releaseOutputs();
				}
			}
		}
	}
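
Stripped of its details, invoke() is a template method: a fixed sequence of phases wrapped in try/finally, with a disposed flag that tells the finally block whether the clean shutdown path completed. The sketch below records the phases in a list instead of doing real work. LifecycleSketch and the trace list are hypothetical, and the phase names only echo the method names above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative skeleton of the invoke() lifecycle; subclasses supply only run().
abstract class LifecycleSketch {
    final List<String> trace = new ArrayList<>();
    private final Object lock = new Object();

    // The task-specific work, like OneInputStreamTask.run() in the article.
    protected abstract void run() throws Exception;

    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            trace.add("init");
            // state restore and open happen under the lock so that no timer
            // can fire between the two steps
            synchronized (lock) {
                trace.add("initializeState");
                trace.add("openAllOperators");
            }
            run();
            synchronized (lock) {
                trace.add("closeAllOperators");
            }
            trace.add("dispose");
            disposed = true;
        } finally {
            // always runs, whether run() returned normally or threw
            trace.add("cleanup");
            if (!disposed) {
                // the clean path did not finish, so fall back to a hard dispose
                trace.add("hardDispose");
            }
        }
    }
}
```

With a run() that returns normally the trace ends in dispose followed by cleanup. With a run() that throws, the close and dispose phases are skipped and the finally block performs cleanup and the hard dispose before the exception propagates.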

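After init(), invoke() restores state and opens the operators, which is when the open method of any Rich Function runs. It then calls run(), and this is where data actually starts being consumed. Taking flatMap as the example, run() first lands in OneInputStreamTask.run: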

	@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
		// process the incoming messages
		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}
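
The loop relies on a simple contract: processInput() handles at most one record per call and returns false once no more input will arrive, while the running flag lets a cancel request stop the loop between records. A rough sketch of that contract, assuming an in-memory queue in place of the network input; InputLoopSketch and its Consumer-based operator are illustrative only:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative only: an in-memory queue stands in for the task's network input.
class InputLoopSketch<T> {
    private final Queue<T> input = new ArrayDeque<>();
    private final Consumer<T> operator;
    private volatile boolean running = true;

    InputLoopSketch(Iterable<T> records, Consumer<T> operator) {
        for (T record : records) {
            input.add(record);
        }
        this.operator = operator;
    }

    // Handles at most one record; returns false once the input is exhausted.
    boolean processInput() {
        T record = input.poll();
        if (record == null) {
            return false;
        }
        operator.accept(record);
        return true;
    }

    // May be called from another thread; the loop notices before the next record.
    void cancel() {
        running = false;
    }

    void run() {
        while (running && processInput()) {
            // all the work happens in processInput()
        }
    }
}
```

Because the flag is checked before every call, a cancel issued while a record is being processed takes effect as soon as that record is done.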

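For the details of this logic, see "Flink Message Consumption: The Full Flow in One Article", "Flink Checkpoint Barrier: The Full Flow in One Article", and "Understanding How Flink Handles Barriers: The Whole Process in One Article". As we know, when a record is passed downstream: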

// this is where the user's code is about to be executed
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							// set the current key on the KeyContext via the KeySelector
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}

Tracing further, we arrive at StreamFlatMap.processElement:

@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
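		// if the record has a timestamp, pass it to the collector; otherwise the collector's timestamp is erased
		// (a missing timestamp reads back as Long.MIN_VALUE rather than Integer.MIN_VALUE)
		collector.setTimestamp(element);
		// the code you write yourself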
		userFunction.flatMap(element.getValue(), collector);
	}
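
To see the delegation end to end, here is a simplified model of an operator that sets the timestamp on its collector and then calls the user's flatMap. All types are hypothetical reductions defined inside the sketch (Record, Collector, FlatMapFunction, TimestampedCollector); they borrow Flink's names but none of its code, and a null timestamp stands in for "no timestamp".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an operator delegating to a user flatMap; not Flink code.
class FlatMapOperatorSketch {
    interface Collector<T> {
        void collect(T value);
    }

    interface FlatMapFunction<I, O> {
        void flatMap(I value, Collector<O> out) throws Exception;
    }

    // A value plus an optional timestamp (null means the record has none).
    static final class Record<T> {
        final T value;
        final Long timestamp;

        Record(T value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stamps every emitted value with the timestamp of the input being processed.
    static final class TimestampedCollector<T> implements Collector<T> {
        final List<Record<T>> output = new ArrayList<>();
        private Long timestamp;

        void setTimestamp(Record<?> base) {
            this.timestamp = base.timestamp;
        }

        @Override
        public void collect(T value) {
            output.add(new Record<>(value, timestamp));
        }
    }

    // Plays the role of processElement, applied to each input record in turn.
    static <I, O> List<Record<O>> processAll(
            List<Record<I>> input, FlatMapFunction<I, O> userFunction) throws Exception {
        TimestampedCollector<O> collector = new TimestampedCollector<>();
        for (Record<I> element : input) {
            collector.setTimestamp(element);
            userFunction.flatMap(element.value, collector);
        }
        return collector.output;
    }
}
```

A user function that splits lines into words turns one timestamped input record into several output records carrying the same timestamp, while records without a timestamp produce outputs without one.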

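The other operators work the same way. For a Kafka source task, headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()) is called instead, and it then consumes the data in Kafka. This ties together the earlier articles "Flink Job Submission: The Full Flow in One Article", "Flink Consuming Kafka, for Busy People", "Flink Message Consumption: The Full Flow in One Article", and "Flink Checkpoint Barrier: The Full Flow in One Article". With that, apart from restore, the analysis of Flink's overall flow is more or less complete.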
