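An operator chain links operators that satisfy certain conditions and runs them inside the same task. It is one of the ways Flink optimizes a job: between operators in the same task, data transfer becomes a plain function call, which cuts out the data transfer between them. A common chain is source->map->filter, where all three operators can be chained together. This article examines how Flink decides internally whether operators can be chained, and how a chain executes once it has been built.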
The first recursion: JobGraph generation
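Flink distinguishes four graphs: StreamGraph, JobGraph, ExecutionGraph, and the physical execution graph. The first two, StreamGraph and JobGraph, are generated on the client, and the ExecutionGraph is generated in the JobMaster. The last one, the physical execution graph, is a conceptual graph rather than an actual data structure, and describes what runs in each TaskExecutor. What the Flink Web UI shows is the JobGraph, as below:
Compared with the StreamGraph, the JobGraph can be understood as an optimized StreamGraph in which operators that can be chained have been chained together. In the figure above, the source and filter operators have been chained. This step happens while the JobGraph is generated, and the implementation lives in StreamingJobGraphGenerator: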
    private JobGraph createJobGraph() {
        // ...
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
        setChaining(hashes, legacyHashes, chainedOperatorHashes);
        setPhysicalEdges();
        setSlotSharingAndCoLocation();
        configureCheckpointing();
        // ...
    }
The key part is the setChaining method. It calls createChain, which builds the JobVertex instances and performs the operator chaining at the same time. The createChain method:
    private List<StreamEdge> createChain(
            Integer startNodeId,
            Integer currentNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes,
            int chainIndex,
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        if (!builtVertices.contains(startNodeId)) {
            // out edges of the whole chain
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            // edges that can be chained
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // edges that cannot be chained
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                // isChainable decides whether the two ends of the edge can be chained
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
            for (StreamEdge chainable : chainableOutputs) {
                // chainable: keep the same start node and recurse into the next node
                transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                // not chainable: start a new traversal from the target node
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }
            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
            byte[] primaryHashBytes = hashes.get(currentNodeId);
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
            // if currentNodeId equals startNodeId, this node is the head of a chain and a JobVertex must be created;
            // otherwise it is an inner part of the chain and only a StreamConfig is needed
            StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
            if (currentNodeId.equals(startNodeId)) {
                config.setChainStart(); // head of the chain
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
                // connect the chain's out edges
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
            } else {
                // currentNodeId is an inner part of the chain
                Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);
                if (chainedConfs == null) {
                    chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
                }
                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
            config.setOperatorID(new OperatorID(primaryHashBytes));
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // return the out edges of the chain
            return transitiveOutEdges;
        } else {
            return new ArrayList<>();
        }
    }
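The whole process is recursive. createChain keeps looking for the out edges of a chain: if two adjacent operators (source and filter) can be chained, it continues the search from the next chainable operator (filter) until it reaches an operator that cannot be chained (process1). It does not return at that point. Instead it starts a new search from the non-chainable operator (process1) and continues downstream until it reaches the end (sink), and only then returns level by level. The construction therefore runs in reverse, sink->process2->process1->(source&filter), completing the operator chains and building the JobVertex instances along the way (stepping through with a debugger shows the details).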
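The reverse construction order is easier to see on a toy graph. The following is a minimal sketch in plain Java, not Flink code: the class `ChainRecursionSketch`, its string-based nodes, and the `chainable` flag passed to `edge` are all hypothetical stand-ins for StreamNode, StreamEdge, and isChainable. It assumes that only the source->filter edge is chainable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the createChain recursion. Every name below is hypothetical;
// none of these are Flink classes.
final class ChainRecursionSketch {
    // node -> downstream nodes
    private final Map<String, List<String>> outEdges = new HashMap<>();
    // edges assumed chainable, encoded as "from->to" (stands in for isChainable)
    private final Set<String> chainableEdges = new HashSet<>();
    private final Set<String> builtVertices = new HashSet<>();
    // chain head -> operators belonging to that chain
    private final Map<String, List<String>> chainMembers = new HashMap<>();
    // order in which the "job vertices" get created
    final List<String> creationOrder = new ArrayList<>();

    void edge(String from, String to, boolean chainable) {
        outEdges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        outEdges.computeIfAbsent(to, k -> new ArrayList<>());
        if (chainable) {
            chainableEdges.add(from + "->" + to);
        }
    }

    // Returns the edges that leave the chain starting at startNode.
    List<String> createChain(String startNode, String currentNode) {
        if (builtVertices.contains(startNode)) {
            return new ArrayList<>();
        }
        List<String> transitiveOutEdges = new ArrayList<>();
        List<String> chainable = new ArrayList<>();
        List<String> nonChainable = new ArrayList<>();
        for (String target : outEdges.get(currentNode)) {
            if (chainableEdges.contains(currentNode + "->" + target)) {
                chainable.add(target);
            } else {
                nonChainable.add(target);
            }
        }
        for (String target : chainable) {
            // same chain: keep the start node and recurse into the next operator
            transitiveOutEdges.addAll(createChain(startNode, target));
        }
        for (String target : nonChainable) {
            transitiveOutEdges.add(currentNode + "->" + target);
            // new chain: the target becomes its own start node
            createChain(target, target);
        }
        chainMembers.computeIfAbsent(startNode, k -> new ArrayList<>()).add(0, currentNode);
        if (currentNode.equals(startNode)) {
            // only the head of a chain creates a vertex, and only after all recursion returned
            builtVertices.add(startNode);
            creationOrder.add(String.join("&", chainMembers.get(startNode)));
        }
        return transitiveOutEdges;
    }

    static List<String> demo() {
        ChainRecursionSketch g = new ChainRecursionSketch();
        g.edge("source", "filter", true);
        g.edge("filter", "process1", false);
        g.edge("process1", "process2", false);
        g.edge("process2", "sink", false);
        g.createChain("source", "source");
        return g.creationOrder;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [sink, process2, process1, source&filter]
    }
}
```

Because a vertex is recorded only after every downstream call has returned, the sink is created first and the source&filter chain last, matching the order described above.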
How does Flink decide whether two adjacent operators (StreamNodes) can be chained? The isChainable method makes that decision:
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();
        return downStreamVertex.getInEdges().size() == 1 // the downstream node has exactly one input edge
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // both are in the same slot sharing group
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // the downstream operator allows chaining
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) // HEAD marks the start of a chain
            && (edge.getPartitioner() instanceof ForwardPartitioner) // records are forwarded directly
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // same parallelism
            && streamGraph.isChainingEnabled(); // chaining is enabled (the default)
    }
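To see how the conditions combine, here is a minimal sketch that mirrors the predicate on toy data. It is not Flink code: `ChainableCheckSketch`, `Node`, `Strategy`, and the boolean parameters are hypothetical stand-ins for StreamNode, ChainingStrategy, the partitioner check, and `isChainingEnabled()`. The null checks on the operators are left out for brevity.

```java
// Toy version of the isChainable conditions. All names are hypothetical;
// they only mirror the fields the real check reads.
final class ChainableCheckSketch {
    enum Strategy { ALWAYS, HEAD, NEVER }

    static final class Node {
        final int inEdgeCount;
        final String slotSharingGroup;
        final Strategy strategy;
        final int parallelism;

        Node(int inEdgeCount, String slotSharingGroup, Strategy strategy, int parallelism) {
            this.inEdgeCount = inEdgeCount;
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.parallelism = parallelism;
        }
    }

    static boolean isChainable(Node up, Node down, boolean forwardPartitioner, boolean chainingEnabled) {
        return down.inEdgeCount == 1                                    // single input edge downstream
            && up.slotSharingGroup.equals(down.slotSharingGroup)         // same slot sharing group
            && down.strategy == Strategy.ALWAYS                          // downstream may be chained to its input
            && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS) // upstream may have successors chained
            && forwardPartitioner                                        // records are forwarded, not redistributed
            && up.parallelism == down.parallelism                        // same parallelism
            && chainingEnabled;                                          // chaining not switched off for the job
    }

    public static void main(String[] args) {
        Node source = new Node(0, "default", Strategy.HEAD, 2);
        Node filter = new Node(1, "default", Strategy.ALWAYS, 2);
        Node rescaled = new Node(1, "default", Strategy.ALWAYS, 4);
        System.out.println(isChainable(source, filter, true, true));   // true
        System.out.println(isChainable(source, rescaled, true, true)); // false: parallelism differs
        System.out.println(isChainable(source, filter, false, true));  // false: not a forward edge
    }
}
```

Every condition must hold at once, so changing any single property of the edge or of either node, such as the parallelism or the partitioner, is enough to break the chain at that point.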
The second recursion: OperatorChain generation
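While an Execution is being deployed, that is, while the Task is starting in the TaskExecutor, an OperatorChain object is created. It holds all the operators that were chained together (source&filter). Internally it creates a WatermarkGaugeExposingOutput object named chainEntryPoint, an object that emits data. It emits in one of two ways:
1. A function call that pushes the record to the next chained operator (filter)
2. Output to the next operator that is not part of the chain (process1)
So how is chainEntryPoint created?
The OperatorChain is initialized in StreamTask: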
    operatorChain = new OperatorChain<>(this, recordWriters);
    headOperator = operatorChain.getHeadOperator();
Its constructor makes the following call:
    this.chainEntryPoint = createOutputCollector(
            containingTask,      // the current StreamTask
            configuration,       // StreamConfig of the first node in the chain
            chainedConfigs,      // StreamConfigs of all operators in the chain
            userCodeClassloader,
            streamOutputMap,
            allOps);
createOutputCollector builds the output of the current node:
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
        // network outputs of the current operator (filter -> process1)
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            @SuppressWarnings("unchecked")
            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }
        // chained outputs (source -> filter)
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
            // build the output that feeds the next chained operator (filter)
            WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
                containingTask,
                chainedOpConfig,
                chainedConfigs,
                userCodeClassloader,
                streamOutputs,
                allOperators,
                outputEdge.getOutputTag());
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }
        // finally return the output of the current node
        // ...
    }
The createChainedOperator method:
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators,
            OutputTag<IN> outputTag) {
        // call createOutputCollector to get the output of the current operator
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators);
        // get the current StreamOperator
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
        // pass it to setup as this operator's output
        chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);
        allOperators.add(chainedOperator);
        // wrap the chained operator (filter) in the output of the previous operator (source),
        // so that this output can call filter directly on the records source emits
        WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
        }
        else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
        }
        // ...
        return currentOperatorOutput;
    }
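As the code shows, building chainEntryPoint is also recursive. It keeps looking for the output of the next chained operator until nothing further downstream can be chained, at which point the network output is returned as the output of the last operator. The current operator is then used to construct the output of the previous operator, so the outputs are again built in reverse (filterOut->sourceOut). Simplified, the logic is: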
    createOut(currentOperator) {
        if (currentOperator.isNetworkOut) {
            currOut = networkOut();
        } else {
            currOut = createOut(nextOperator);
        }
        currentOperator.setOut(currOut);
        preOut = CopyingOut(currentOperator);
        return preOut;
    }
The resulting chainEntryPoint is the output of headOperator. Each output in turn calls the next operator directly, which is how the chain becomes a sequence of function calls.
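The recursion can be made concrete with a small runnable sketch. This is plain Java under simplifying assumptions, not the Flink implementation: `ChainedOutputSketch`, `Output`, `Operator`, `MapOperator`, and `FilterOperator` are hypothetical names, records are plain `int` values, each operator has a single output, and a list stands in for the network output. The head operator is represented by the loop that feeds records into the entry point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Toy model of how chained outputs are built back to front. All names are hypothetical.
final class ChainedOutputSketch {
    interface Output {
        void collect(int value);
    }

    // A one-input operator that emits into whatever output it was set up with.
    abstract static class Operator {
        protected Output output;

        void setup(Output output) {
            this.output = output;
        }

        abstract void processElement(int value);
    }

    static final class MapOperator extends Operator {
        private final IntUnaryOperator fn;

        MapOperator(IntUnaryOperator fn) {
            this.fn = fn;
        }

        @Override
        void processElement(int value) {
            output.collect(fn.applyAsInt(value));
        }
    }

    static final class FilterOperator extends Operator {
        private final IntPredicate keep;

        FilterOperator(IntPredicate keep) {
            this.keep = keep;
        }

        @Override
        void processElement(int value) {
            if (keep.test(value)) {
                output.collect(value);
            }
        }
    }

    // Returns the output that feeds chained.get(index); past the last operator
    // that is the network output.
    static Output createOutput(List<Operator> chained, int index, Output networkOut) {
        if (index == chained.size()) {
            return networkOut;
        }
        Operator current = chained.get(index);
        // recurse first, so the last operator in the chain is set up first
        current.setup(createOutput(chained, index + 1, networkOut));
        // the previous operator's output is a direct call into the current operator
        return current::processElement;
    }

    static List<Integer> demo() {
        List<Integer> sentToNetwork = new ArrayList<>();
        List<Operator> chained = Arrays.asList(
                new MapOperator(v -> v * 2),
                new FilterOperator(v -> v > 4));
        Output chainEntryPoint = createOutput(chained, 0, v -> sentToNetwork.add(v));
        // the head operator emits records 1..4 into the entry point
        for (int record = 1; record <= 4; record++) {
            chainEntryPoint.collect(record);
        }
        return sentToNetwork;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [6, 8]
    }
}
```

Each call to `chainEntryPoint.collect` runs the map and the filter on the caller's stack, and only records that survive the whole chain reach the stand-in network output.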
Summary
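As the analysis above shows, an operator chain passes data between operators that satisfy certain conditions through function calls, avoiding the intermediate data transfer.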