从头分析flink源码第三篇之jobGraph的生成

2021-07-01 15:34:14 浏览数 (1)

上一篇中我们分析了一个简单的flink wordcount程序由DataStream的transformation列表转换成StreamGraph的过程,紧接着上文的步骤,本文我们着重分析一下从streamGraph生成jobGraph的过程。

背景

上一篇中我们分析了StreamGraph的生成,StreamGraph的大致结构如下:

分析入口

对于stream模式的执行会在生成StreamGraph之后都会进入到StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):

代码语言:javascript复制
@Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
        final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
        checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));
        CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration)
            .execute(streamGraph, configuration, userClassloader);
        --------------省略部分代码----------------------

这里主要有两步操作:

•通过executorServiceLoader获取executorFactory,它是PipelineExecutorFactory类型的,主要为streamGraph的执行提供executor,它的实现类主要有以下几种:

•通过executorFactory获取executor,然后使用executor执行streamGraph,这里我们先以LocalExecutor为例。LocalExecutor#execute方法代码如下:

代码语言:javascript复制
  @Override
      public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
          checkNotNull(pipeline);
          checkNotNull(configuration);
          // 有效的配置
          Configuration effectiveConfig = new Configuration();
          effectiveConfig.addAll(this.configuration);
          effectiveConfig.addAll(configuration);
          // we only support attached execution with the local executor.
          checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
          // 获取了jobGraph
          final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
          return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
      }

可以看到,这个方法中会选进行jobGraph的生成,然后将jobGraph提交到miniCluster上去进行下一步的操作。LocalExecutor的getJobGraph方法将是本文分析的重点。

LocalExecutor#getJobGraph

这里我们就不再一步步地贴代码了,直接来看整个调用链路:

StreamingJobGraphGenerator中包含streamGraph的全部信息,并最终由它来进行jobGraph的生成,我们接下来重点分析一下。

StreamingJobGraphGenerator#createJobGraph()方法

生成jobGraph的方法代码如下:

代码语言:javascript复制
private JobGraph createJobGraph() {
        preValidate();
        // streaming模式下都是EAGER模式
        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());
        // 返回是否启用了近似本地恢复
        jobGraph.enableApproximateLocalRecovery(streamGraph
            .getCheckpointConfig()
            .isApproximateLocalRecoveryEnabled());
        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        // hash值用来标识JobVertexID,以便在作业提交时识别节点(如果它们没有更改)。这里会按顺序遍历整个streamGraph,为所有StreamNode都生成一个唯一的hash值。
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(
            streamGraph);
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        // 在org.apache.flink.streaming.api.graph.StreamGraphGenerator.legacyTransform中对应的FeedBackTransformation中会存在用户自定义hash值的情况,这里会根据特定的hasher来处理streamGraph中这种类型的节点的hash值
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }
        // 输入新版本的hashes值和旧版本的legacyHashes
        // 设置算子链
        setChaining(hashes, legacyHashes);
        // 物理边,主要为Edge的目标顶点设置特理入边,并进入序列化。主要用有序的物理边及其typeNumber来控制多输入时InputGate的顺序;多输入的情况下typeNumber是在1的基础上以步长为1增加的
        setPhysicalEdges();
        // 设置slot sharing group和colocation
        setSlotSharingAndCoLocation();
        // 设置内存
        setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
        // 配置checkpoint
        configureCheckpointing();
        // 配置savepoint
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
        JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig."  
                "This indicates that non-serializable types (like custom serializers) were registered");
        }
        return jobGraph;
    }

该方法比较长,我们拆开为以下几步来进行分析:

1.为每个StreamNode生成一个唯一的hash值;2.将能够chain在一起的StreamNode chain在一起,生成StreamConfig和JobVertex、JobEdge等信息;3.设置物理边信息;4.设置slot sharing group和co-location;5.设置内存、checkpoint、savepoint和其他用户自定义信息。

1. 为每个StreamNode生成一个唯一的hash值

目前分析的flink 1.12.0版本的源码中维护着两种hash值生成方案,一种是默认的hash值生成,另一种是用户自定义的。代码如下:

代码语言:javascript复制
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(
            streamGraph);
        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        // 在org.apache.flink.streaming.api.graph.StreamGraphGenerator.legacyTransform中对应的FeedBackTransformation中会存在用户自定义hash值的情况,这里会根据特定的hasher来处理streamGraph中这种类型的节点的hash值
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

•defaultStreamGraphHasher是StreamGraphHasherV2类型的,如果用户指定了transformationId,会以transformationId的值来生成该节点的hash值;否则会使用节点的输入输出信息及输入节点的hash值来进行hash值的生成。这里会生成一份nodeId与节点hash值的映射关系;•legacyStreamGraphHashers在当前版本中为StreamGraphUserHashHasher,它主要是根据StreamNode中的userHash值来作为结果map的value。

2.设置算子链

从source节点为起点进行递归遍历来获取最终的jobGraph。我们来看一下代码:

代码语言:javascript复制
/**
     * Sets up task chains from the source {@link StreamNode} instances.
     *
     * <p>This will recursively create all {@link JobVertex} instances.
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // we separate out the sources that run as inputs to another operator (chained inputs)
        // from the sources that needs to run as the main (head) operator.
        final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(
            hashes,
            legacyHashes);
        // 初始的值,内部是array copy,@see java.util.ArrayList.toArray()
        final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());

        // iterate over a copy of the values, because this map gets concurrently modified
        for (OperatorChainInfo info : initialEntryPoints) {
            createChain(
                // 从source节点开始
                info.getStartNodeId(),
                1,  // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
        }
    }

在buildChainedInputsAndGetHeadInputs方法内部会将source节点及ChainingStrategy为HEAD_WITH_SOURCES的节点生成OperatorChainInfo对象。然后进入到createChain方法,我们直接来看代码:

代码语言:javascript复制
    private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
        Integer startNodeId = chainInfo.getStartNodeId();
        if (!builtVertices.contains(startNodeId)) {
            // 记录不能chain在一起的OperatorChainInfo的起始节点,为了后面OperatorChainInfo的connect做准备
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
            //遍历当前节点的出边,放到chainableOutputs和nonChainableOutputs列表中
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
            //处理当前节点可以chain在一起的outputs
            for (StreamEdge chainable : chainableOutputs) {
                // 所有可以进行Chain的出边也都进行处理
                transitiveOutEdges.addAll(
                    createChain(
                        chainable.getTargetId(),
                        chainIndex   1,
                        chainInfo,// 可以chain在一起的出边也会与它的source节点共用一个chainInfo
                        chainEntryPoints));
            }
            for (StreamEdge nonChainable : nonChainableOutputs) {// 不能chain在一起的在这里递归处理
                transitiveOutEdges.add(nonChainable);
                createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                        nonChainable.getTargetId(),
                        // 创建新的chainInfo,它的startId为其目标节点的id。因为它本身已经放到了transitiveOutEdges中
                        (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
            }
            // 节点id和链名称的映射
            chainedNames.put(
                currentNodeId,
                createChainedName(
                    currentNodeId,
                    chainableOutputs,
                    Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
            // 当前节点的资源情况
            chainedMinResources.put(
                currentNodeId,
                createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));
            // 当前的chainInfo,可能包括多个节点。把当前节点加入到chain中来(包括节点hashes和legacyHashes)
            OperatorID currentOperatorId = chainInfo.addNodeToChain(
                currentNodeId,
                chainedNames.get(currentNodeId));
            // 如果A->B可以chain在一起,最后的情况是B结点先运行到这里,然后A结点运行到这里。它们的startNodeId都是A节点的nodeId
            if (currentNode.getInputFormat() != null) {
                // 如果当前节点的inputFormat不为空,则创建formatContainer
                getOrCreateFormatContainer(startNodeId).addInputFormat(
                    currentOperatorId,
                    currentNode.getInputFormat());// input类型的,放入chainedInputOutputFormats中,在创䢖JobVertex时会有所不同
            }
            if (currentNode.getOutputFormat() != null) {
                // 如果有outPutFormat则指定outputFormat
                getOrCreateFormatContainer(startNodeId).addOutputFormat(
                    currentOperatorId,
                    currentNode.getOutputFormat());// output类型的,放入chainedInputOutputFormats中,在创建Jobvertex时会有所不同
            }
            // StreamConfig是可序列化的,如果当前节点为chain的头节点的话,则创建JobVertex,否则创建StreamConfig
            // 头节点运行到这里发生在同一个chain中的节点的后面;也就是说对每个链来说这一步是从后向前执行的
            StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, chainInfo)
                : new StreamConfig(new Configuration());
            // 设置顶点配置,如果当前节点是startNode则会在上面一步中生成JobVertex并返回该vertex的StreamConfig,如果不是startNode则会在上一步中创建新的StreamConfig;头节点运行到这里发生在同一个chain中的节点的后面
            setVertexConfig(
                currentNodeId,
                config,
                chainableOutputs,
                nonChainableOutputs,
                chainInfo.getChainedSources());
            if (currentNodeId.equals(startNodeId)) {
                // 如果当前节点就是chain的头节点
                config.setChainStart();
                // 设置chainIndex
                config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                for (StreamEdge edge : transitiveOutEdges) {
                    // 这里会连接上一个算子链(可能也只有一个节点,也可能是多个节点chain在一起的,如果该链是最下游的节点则无需进行connect)与下游的不能chain在一起的出边对应的node
                    connect(
                        startNodeId,
                        edge);// 主要是生成jobEdge,而且整个jobVertex的生成是从后向前的,到这个节点时证明它的下游节点的jobVertex都已经创建成功了。这里会生成jobEdge并放到下游节点的inputs列表中去。
                }
                // 设置它的有序输出边
                config.setOutEdgesInOrder(transitiveOutEdges);
                // 添加能chain在一起的StreamNode的配置,会被序列化放到chainedConfigs; 如果两个节点被chain在一起,这里的chainedConfigs中会有两条记录(头节点是最后执行到这里的)
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
            } else {
                chainedConfigs.computeIfAbsent(
                    startNodeId,
                    k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);// 设置在链中的索引,只有非头节点会生成chainedConfigs内容,头节点会通过StreamConfig#setTransitiveChainedTaskConfigs将这些配置放到它的StreamConfig中去
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
            // 设置operatorID,里面能取到该节点的primaryHash值
            config.setOperatorID(currentOperatorId);
            // chainableOutputs对应的节点就是当前chain的最终节点了,一个节点可能既是头节点又是结尾节点(上下都没有chain的情况)
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // 每一层往外返回的都是outEdge,最终返回的会是所有的StreamEdge列表
            return transitiveOutEdges;
        } else {
            return new ArrayList<>();
        }
    }

大致逻辑如下:

由于streamGraph的StreamNode之间是通过StreamEdge来连接的,所以这里对每个节点的出边进行迭代,把能chain在一起的chain在一起,然后生成JobVertex、JobEdge和IntermediateDataSet。

对于能chain在一起的节点,会延用其source节点的OperatorChainInfo并递增chainIndex向下递归,不能chain在一起的节点会新建OperatorChainInfo往下递归。

整个StreamNode链,递归的结果是从后往前处理剩余非递归部分的逻辑的。只会在每个子链的头结点的部分生成JobVertex和JobEdge及IntermediateDataSet;如果不是每个子链的startNode,则证明目前处理的还是每个子链的下游节点部分,会将该节点需要处理的信息放到StreamConfig对象中。如果是每个子链的startNode,则证明它的下游节点已经处理完成了,会生成JobVertex并将JobVertex节点信息放到StreamConfig中返回回来。

当递归处理时处理到的节点为每个子链的头节点时,会进行连接操作,将该节点与其下游子链的头节点进行连接,生成JobEdge和IntermediateDataSet(这部分的逻辑在StreamingJobGraphGenerator#connect方法中),这里会指定连接方式是POINTWISE还是ALL_TO_ALL。同时将当前建立连接的出边信息放到physicalEdgesInOrder列表中,这里维护的是有序的物理边,从后往前有序,在下文物理边的设置中会使用到。

需要注意的是chain在一起的节点的StreamConfig是怎么处理的,迭代过程中有个判断,如果是头节点的话会进行如下设置:

代码语言:javascript复制
    // 添加能chain在一起的StreamNode的配置,会被序列化放到chainedConfigs; 如果两个节点被chain在一起,这里的chainedConfigs中会有两条记录(头节点是最后执行到这里的)
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

    public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
        try {
            InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
        } catch (IOException e) {
            throw new StreamTaskException("Could not serialize configuration.", e);
        }
    }

chainedConfigs里面是存放所有能chain在一起的节点的容器,以startNodeId为key,非头节点的nodeId和StreamConfig的映射为value。这里会将这些信息存到头节点配置的chainedTaskConfig_中去。

这setChaining方法执行完之后,StreamingJobGraphGenerator中的属性大致如下图:

可以看到每个StreamNode都对应一个StreamConfig对象。节点4和节点5是chain在一起的,其中节点4为头节点。这一点在jobGraph中也有体现,在有chain在一起的JobVertex中有两个operatorId:

需要注意的是:这个operatorID主要是给checkpoint使用的。

到这里大家应该会有个疑问,那就是在生成OneInputStreamTask的时候,这个有两个operator chain在一起的这个JobVertexID最后是怎么识别另外一个算子操作的呢(在ExecutionGraph中也不涉及到这部分)?

这里我们简单地提一下,在StreamTask的beforeInvoke方法中会为每一个Task生成一个OperatorChain对象,在创建outputCollector时会根据chain的情况生成对应的operator放到allOperatorWrappers中。

关于这一点我们后续会用专门的篇幅来分析,这里顺带着提一下。这里会对当前task涉及到的operatorConfig生成对应的operator,将task的输入和输出进行一个串联,对于chain在一起的operator之间不会涉及数据的网络传输。

在setChaining操作之后生成的jobGraph主体结构如下图:

3. 设置物理边

直接来看代码:

代码语言:javascript复制
private void setPhysicalEdges() {
        Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
        // 有序的StreamEdge
        for (StreamEdge edge : physicalEdgesInOrder) {// 遍历有序的物理边集合
            int target = edge.getTargetId();
            List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(
                target,
                k -> new ArrayList<>());// 如果不存在返回新创建的list
            inEdges.add(edge);// 将边添加进去,形成targetId->edge集合之间的映射
        }
        // 为什么是从前往后找,为什么要找targetId?  因为下面要设置JobVertex顶点的streamConfig的InPhysicalEdges
        for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
            int vertex = inEdges.getKey();// 目标顶点id
            List<StreamEdge> edgeList = inEdges.getValue();// 相对于目标顶点来说这是入边
            // 物理边设置的还是StreamEdge
            // 对于多输入的task有用,主要用于确定输入的顺序,在jobEdge中是没办法确定这种顺序的
            vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);// 对目标顶点设置物理入边,序列化操作
        }
    }

这里会遍历physicalEdgesInOrder列表,它是一个有序的物理边集合,在wordcount实例中,它运行到这里时的实际值如下图:

顺序是从后往前。setPhysicalEdges方法的作用无非是向Flat Map节点的StreamConfig和Keyed Aggregation节点的StreamConfig中设置入边列表,并进行序列化操作。

4. 设置slot sharing group和co-location

这一部分会先进行region的划分(涉及到连通器算法),然后设置slot sharingGroup信息。再根据slot sharingGroup的信息对设置了coLocationGroupKey的节点进行coLocationGroups的划分。主要作用还是体现在slot资源调度上,关于这点的详细信息可以参考博客:http://chenyuzhao.me/2017/02/09/flink-scheduler/

5. 其他设置

进行内存、checkpoint、savepoint、用户文件等的设置,最终形成的jobGraph结构如下:

总结

通过与streamGraph的对比我们可以发现,jobGraph是对streamGraph进行了一定优化处理后的结果,如一些operator的chaining操作,slot sharing group与coLocationGroupKey的设置,JobVertex之间连接拓朴的变化(在JobEdge后面添加了IntermediateDataSet)等。

对于从StreamGraph到JobGraph的变化,主要总结如下(来自Jark's Blog的一段介绍[1],里面对为何flink要用多层图来进行任务处理也有详细解释):

•StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。•StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。•StreamEdge:表示连接两个StreamNode的边。•JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。•JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。•IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。•JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。

References

[1] Jark's Blog的一段介绍: http://wuchong.me/blog/2016/05/03/flink-internals-overview/

0 人点赞