Flink: Understanding Operator Chaining Through Two Recursions

2022-04-18 13:22:45

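Operator chaining links operators that meet certain conditions so that they run inside the same task. It is one of Flink's job optimizations: data passed between operators in the same task becomes a plain function call, which removes the data-transfer step between them. A common chain is source->map->filter, where all three operators can be chained together. This article dissects how Flink decides whether operators can be chained, and how a chain executes once it is built.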
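To make the difference concrete, here is a minimal plain-Java sketch, not Flink code. The same source->map->filter pipeline runs once as a direct chain of function calls and once with an intermediate queue between every pair of stages. The queue is only a hypothetical stand-in for the buffering and network transfer between separate tasks, and the names `ChainVsNetwork`, `runChained` and `runUnchained` are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class ChainVsNetwork {

    // Chained: map -> filter -> sink are composed into one call path,
    // so each record travels through plain method calls.
    static List<Integer> runChained(int[] input) {
        List<Integer> sink = new ArrayList<>();
        Consumer<Integer> filterOp = x -> {
            if (x > 15) {
                sink.add(x);
            }
        };
        Consumer<Integer> mapOp = x -> filterOp.accept(x * 10);
        for (int v : input) {
            mapOp.accept(v); // the "source" pushes each record straight into map
        }
        return sink;
    }

    // Unchained: every hop goes through an intermediate queue, a stand-in for
    // the serialize / buffer / network transfer between separate tasks.
    static List<Integer> runUnchained(int[] input) {
        Queue<Integer> sourceToMap = new ArrayDeque<>();
        for (int v : input) {
            sourceToMap.add(v);
        }
        Queue<Integer> mapToFilter = new ArrayDeque<>();
        while (!sourceToMap.isEmpty()) {
            mapToFilter.add(sourceToMap.poll() * 10);
        }
        List<Integer> sink = new ArrayList<>();
        while (!mapToFilter.isEmpty()) {
            int x = mapToFilter.poll();
            if (x > 15) {
                sink.add(x);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3};
        System.out.println(runChained(input));   // [20, 30]
        System.out.println(runUnchained(input)); // [20, 30]
    }
}
```

Both variants produce the same records; the chained one simply never materializes them between stages.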

The first recursion: generating the JobGraph

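Flink distinguishes four kinds of graphs: StreamGraph, JobGraph, ExecutionGraph and the physical execution graph. The first two, StreamGraph and JobGraph, are generated on the client; the ExecutionGraph is generated in the JobMaster; the last one, the physical execution graph, is a virtual graph rather than an actual data structure, and it runs in each TaskExecutor. What we see in the Flink Web UI is the JobGraph, as shown below: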

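Compared with the StreamGraph, the JobGraph can be regarded as an optimized StreamGraph in which the operators that can be chained are chained together; in the figure above, the source and filter operators are chained. This step is performed while the JobGraph is generated, and it is implemented in StreamingJobGraphGenerator: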

private JobGraph createJobGraph() {
        .....
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
        setChaining(hashes, legacyHashes, chainedOperatorHashes);
        setPhysicalEdges();
        setSlotSharingAndCoLocation();
        configureCheckpointing();
     ....
    }

The key is the setChaining method, which calls createChain. createChain builds the JobVertex objects and performs operator chaining at the same time:

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

    if (!builtVertices.contains(startNodeId)) {
        // out-edges of the whole chain
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // out-edges that can be chained
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        // out-edges that cannot be chained
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
            // isChainable decides whether the two ends of this edge can be chained
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            // chainable: keep the same chain start and visit the next node
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            // restart the traversal with the non-chainable node as a new chain start
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        // if currentNodeId equals startNodeId, this node starts a chain and a JobVertex is created;
        // otherwise it is part of a chain and only needs a StreamConfig
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart(); // start of the chain
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
            // connect the chain's out-edges
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            // currentNodeId is part of a chain
            Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

            if (chainedConfs == null) {
                chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
            }
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(new OperatorID(primaryHashBytes));

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        // return the chain's out-edges
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

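The whole process is recursive. createChain keeps searching for the out-edges of a chain. If two adjacent operators (source and filter) can be chained, it continues the search from the next chainable operator (filter) until it reaches an operator that cannot be chained (process1). It does not return at that point; instead it starts a new search with that non-chainable operator (process1) as the starting point, and only when it reaches the end (sink) does it begin returning level by level. The construction therefore actually runs in reverse, sink->process2->process1->(source&filter), completing operator chaining and building the JobVertex objects along the way (you can step through it in a debugger to see the details).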
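The recursion can be reproduced on a toy graph. The sketch below is a hypothetical, heavily simplified plain-Java model of createChain (it uses records, so Java 16+): nodes are integer ids, each edge carries a precomputed `chainable` flag in place of isChainable, and appending to `vertexCreationOrder` stands in for createJobVertex. Hashes, StreamConfig and resources are left out. With the topology used in this article (source->filter chainable, every later edge not chainable), it records the vertices in the reverse order described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

public class CreateChainSketch {

    // Toy StreamGraph edge; "chainable" replaces the isChainable(...) check.
    record Edge(int source, int target, boolean chainable) {}

    final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    final Map<Integer, String> names = new HashMap<>();
    final Set<Integer> builtVertices = new HashSet<>();
    final Map<Integer, List<Integer>> chains = new HashMap<>();  // chain head -> member ids
    final List<String> vertexCreationOrder = new ArrayList<>();  // stands in for createJobVertex calls

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, chainable));
    }

    // Same recursion shape as createChain: returns the chain's transitive out-edges.
    List<Edge> createChain(int startNodeId, int currentNodeId) {
        if (builtVertices.contains(startNodeId)) {
            return new ArrayList<>();
        }
        List<Edge> transitiveOutEdges = new ArrayList<>();
        List<Edge> chainableOutputs = new ArrayList<>();
        List<Edge> nonChainableOutputs = new ArrayList<>();
        for (Edge e : outEdges.getOrDefault(currentNodeId, List.of())) {
            if (e.chainable()) {
                chainableOutputs.add(e);
            } else {
                nonChainableOutputs.add(e);
            }
        }
        for (Edge e : chainableOutputs) {
            // same chain: keep startNodeId, advance currentNodeId
            transitiveOutEdges.addAll(createChain(startNodeId, e.target()));
        }
        for (Edge e : nonChainableOutputs) {
            transitiveOutEdges.add(e);
            // a new chain starts at the non-chainable target
            createChain(e.target(), e.target());
        }
        // post-order: downstream members were recorded first, so prepend
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(0, currentNodeId);
        if (currentNodeId == startNodeId) {
            builtVertices.add(startNodeId);
            vertexCreationOrder.add(chainName(startNodeId));
        }
        return transitiveOutEdges;
    }

    String chainName(int head) {
        StringJoiner joiner = new StringJoiner(" -> ");
        for (int id : chains.get(head)) {
            joiner.add(names.get(id));
        }
        return joiner.toString();
    }

    static List<String> demo() {
        CreateChainSketch g = new CreateChainSketch();
        String[] ops = {"source", "filter", "process1", "process2", "sink"};
        for (int i = 0; i < ops.length; i++) {
            g.names.put(i + 1, ops[i]);
        }
        g.addEdge(1, 2, true);   // source -> filter: assumed chainable
        g.addEdge(2, 3, false);  // filter -> process1: assumed not chainable
        g.addEdge(3, 4, false);  // process1 -> process2: assumed not chainable
        g.addEdge(4, 5, false);  // process2 -> sink: assumed not chainable
        g.createChain(1, 1);
        return g.vertexCreationOrder;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [sink, process2, process1, source -> filter]
    }
}
```

Just as in the real method, the vertex for a chain head is only recorded after every node reachable from it has been handled, which is what produces the sink-first order.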

How is it decided whether two adjacent operators (StreamNodes) can be chained? By the isChainable method:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    return downStreamVertex.getInEdges().size() == 1 // the downstream node has exactly one input
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // same slot sharing group
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // downstream allows chaining
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) // HEAD: may only start a chain
            && (edge.getPartitioner() instanceof ForwardPartitioner) // forward (one-to-one) partitioning
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // same parallelism
            && streamGraph.isChainingEnabled(); // chaining enabled (the default)
}
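For quick experimentation, the same conjunction can be mirrored on a toy model. In this plain-Java sketch (Java 16+ records), `Node`, `Edge` and the `Partitioner` enum are hypothetical stand-ins for StreamNode, StreamEdge and Flink's partitioner classes; `ChainingStrategy` reuses the constant names ALWAYS, NEVER and HEAD; the null-operator checks are dropped because every toy node has an operator. It is not the Flink API.

```java
public class IsChainableSketch {

    public enum ChainingStrategy { ALWAYS, NEVER, HEAD }

    // Hypothetical stand-in for the partitioner classes (ForwardPartitioner etc.).
    public enum Partitioner { FORWARD, REBALANCE, HASH }

    public record Node(int inEdgeCount, ChainingStrategy strategy, String slotSharingGroup, int parallelism) {}

    public record Edge(Node upstream, Node downstream, Partitioner partitioner) {}

    // Same conjunction of conditions as isChainable above.
    public static boolean isChainable(Edge edge, boolean chainingEnabled) {
        Node up = edge.upstream();
        Node down = edge.downstream();
        return down.inEdgeCount() == 1                                 // single input
            && up.slotSharingGroup().equals(down.slotSharingGroup())  // same slot sharing group
            && down.strategy() == ChainingStrategy.ALWAYS             // downstream may be chained
            && (up.strategy() == ChainingStrategy.HEAD
                || up.strategy() == ChainingStrategy.ALWAYS)          // upstream may start or extend a chain
            && edge.partitioner() == Partitioner.FORWARD              // one-to-one forwarding
            && up.parallelism() == down.parallelism()                 // same parallelism
            && chainingEnabled;                                       // chaining not disabled globally
    }

    public static void main(String[] args) {
        Node source = new Node(0, ChainingStrategy.HEAD, "default", 2);
        Node filter = new Node(1, ChainingStrategy.ALWAYS, "default", 2);
        Node process = new Node(1, ChainingStrategy.ALWAYS, "default", 4);
        System.out.println(isChainable(new Edge(source, filter, Partitioner.FORWARD), true));    // true
        System.out.println(isChainable(new Edge(filter, process, Partitioner.REBALANCE), true)); // false
        System.out.println(isChainable(new Edge(source, filter, Partitioner.FORWARD), false));   // false
    }
}
```

Changing a single field, such as the partitioner or the parallelism of one side, is enough to break the chain, which matches the all-conditions-must-hold structure of isChainable.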

The second recursion: building the OperatorChain

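When an Execution is deployed, that is, while a Task is being started in a TaskExecutor, an OperatorChain object is created. It contains all the operators chained together (source&filter), and internally it creates a WatermarkGaugeExposingOutput object named chainEntryPoint, which is responsible for emitting data. Output takes one of two forms: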

1. A function call that pushes the data into the next operator in the chain (filter)

2. Output to the next operator that is not chained (process1)

So how is chainEntryPoint built?

OperatorChain is initialized in StreamTask:

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

Its constructor calls:

this.chainEntryPoint = createOutputCollector(
        containingTask, // the current StreamTask
        configuration, // StreamConfig of the first node in the chain
        chainedConfigs, // StreamConfigs of all nodes in this chain
        userCodeClassloader,
        streamOutputMap,
        allOps);

createOutputCollector builds the output ("out") of the current node:

private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

    // network outputs of the current operator (filter->process1)
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // chained outputs (source->filter)
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        // build the out of the next chained operator (filter)
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
                containingTask,
                chainedOpConfig,
                chainedConfigs,
                userCodeClassloader,
                streamOutputs,
                allOperators,
                outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // finally, return the current node's out
    .....
}

The createChainedOperator method:

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators,
        OutputTag<IN> outputTag) {
    // call createOutputCollector to build the current operator's out (the recursive step)
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators);

    // get the current StreamOperator
    OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    // pass that out to setup as the current operator's out
    chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);

    allOperators.add(chainedOperator);

    // wrap the chained operator (filter) as the out of the previous operator (source),
    // so that this out can call filter directly to process source's output
    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        // object reuse: records are handed over without copying
        currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
    }
    else {
        // otherwise each record is first copied with the input serializer
        TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
    }
    ....
    return currentOperatorOutput;
}
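The two branches at the end differ only in whether records are copied before being handed over. The following plain-Java sketch (hypothetical class names; Flink's real ChainingOutput and CopyingChainingOutput do more, such as metrics and watermark handling) shows why the copy matters: the upstream operator reuses one mutable object, and the downstream operator keeps references to what it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class ChainingOutputSketch {

    // Mutable record type so that object sharing becomes visible.
    static final class Event {
        int value;
        Event(int value) { this.value = value; }
    }

    interface Output<T> { void collect(T record); }

    // Like ChainingOutput: passes the very same object to the next operator.
    static final class DirectOutput<T> implements Output<T> {
        private final Consumer<T> next;
        DirectOutput(Consumer<T> next) { this.next = next; }
        public void collect(T record) { next.accept(record); }
    }

    // Like CopyingChainingOutput: copies first (Flink copies via the input TypeSerializer).
    static final class CopyingOutput<T> implements Output<T> {
        private final Consumer<T> next;
        private final UnaryOperator<T> copier;
        CopyingOutput(Consumer<T> next, UnaryOperator<T> copier) {
            this.next = next;
            this.copier = copier;
        }
        public void collect(T record) { next.accept(copier.apply(record)); }
    }

    // The upstream operator reuses one Event instance; the downstream one keeps references.
    static List<Integer> run(boolean objectReuse) {
        List<Event> kept = new ArrayList<>();
        Consumer<Event> downstream = kept::add;
        Output<Event> out;
        if (objectReuse) {
            out = new DirectOutput<>(downstream);
        } else {
            out = new CopyingOutput<>(downstream, e -> new Event(e.value));
        }
        Event reused = new Event(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;
            out.collect(reused);
        }
        List<Integer> values = new ArrayList<>();
        for (Event e : kept) {
            values.add(e.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [3, 3, 3]
        System.out.println(run(false)); // [1, 2, 3]
    }
}
```

With object reuse enabled, all three kept references point to the same instance, so only the last value survives; the copying path preserves each record. This is why object reuse is only safe when user functions do not hold on to their input objects.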

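The construction of chainEntryPoint is therefore also recursive. It keeps looking for the out of the next chained operator until nothing downstream can be chained, at which point the network out becomes the last operator's out; each operator is then used to build the out of the operator before it. This is again a reverse construction (filterOut->sourceOut). Simplified, the logic is: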

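createOut(currentOperator) {
    if (currentOperator.isNetworkOut) {
        // last operator in the chain: its out is the network output
        currOut = networkOut();
    } else {
        // recurse first: the next chained operator determines our out
        currOut = createOut(nextOperator);
    }
    currentOperator.setOut(currOut);
    // wrap the current operator as the previous operator's out
    preOut = CopyingOut(currentOperator);
    return preOut;
}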

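The resulting chainEntryPoint is the headOperator's out. Each out calls the next operator, which in turn emits through its own out, and this is how the chain of function calls is realized.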
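Here is a minimal plain-Java sketch of that reverse construction, assuming a linear chain source->map->filter. `Output`, `Operator`, `ChainingOutput` and `createOutput` are hypothetical simplifications, not Flink classes (in this toy model a filter drops a record by returning null), and a `List` stands in for the network RecordWriterOutput at the tail. `createOutput` recurses to the tail first and then wraps each operator as the output of its predecessor, so the returned object plays the role of chainEntryPoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class OperatorChainSketch {

    interface Output { void collect(int record); }

    static final class Operator {
        final String name;
        final Function<Integer, Integer> fn; // returns null to drop a record (toy convention)
        Output out;                          // injected like chainedOperator.setup(..., output)

        Operator(String name, Function<Integer, Integer> fn) {
            this.name = name;
            this.fn = fn;
        }

        void processElement(int record) {
            Integer result = fn.apply(record);
            if (result != null) {
                out.collect(result);
            }
        }
    }

    // Like ChainingOutput: "emitting" a record is a direct call into the next operator.
    static final class ChainingOutput implements Output {
        private final Operator next;
        ChainingOutput(Operator next) { this.next = next; }
        public void collect(int record) { next.processElement(record); }
    }

    // Builds the out of chain.get(index), recursing to the tail of the chain first.
    static Output createOutput(List<Operator> chain, int index, Output networkOut, List<String> buildLog) {
        if (index == chain.size() - 1) {
            return networkOut; // tail of the chain: records leave through the network output
        }
        Operator next = chain.get(index + 1);
        next.out = createOutput(chain, index + 1, networkOut, buildLog);
        buildLog.add(next.name);
        return new ChainingOutput(next);
    }

    static List<Integer> run(List<String> buildLog, int... input) {
        List<Integer> network = new ArrayList<>(); // stands in for RecordWriterOutput
        Operator source = new Operator("source", x -> x);
        Operator map = new Operator("map", x -> x * 10);
        Operator filter = new Operator("filter", x -> x > 15 ? x : null);
        List<Operator> chain = List.of(source, map, filter);

        Output chainEntryPoint = createOutput(chain, 0, network::add, buildLog);
        source.out = chainEntryPoint; // the head operator emits into the entry point
        for (int v : input) {
            source.processElement(v);
        }
        return network;
    }

    public static void main(String[] args) {
        List<String> buildLog = new ArrayList<>();
        System.out.println(run(buildLog, 1, 2, 3)); // [20, 30]
        System.out.println(buildLog);               // [filter, map]
    }
}
```

The build log shows that the wrapper around filter is created before the one around map, the same tail-to-head order as filterOut->sourceOut; at runtime, a single source.processElement call then flows through map and filter as nested method calls.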

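Summary

As the analysis above shows, operator chaining lets operators that meet certain conditions pass data to each other through function calls, which avoids the intermediate data-transfer step.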
