上一篇中我们分析了一个简单的flink wordcount程序由DataStream的transformation列表转换成StreamGraph的过程,紧接着上文的步骤,本文我们着重分析一下从streamGraph生成jobGraph的过程。
背景
上一篇中我们分析了StreamGraph的生成,StreamGraph的大致结构如下:
分析入口
对于stream模式的执行会在生成StreamGraph之后都会进入到StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph):
代码语言:javascript复制@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
--------------省略部分代码----------------------
这里主要有两步操作:
•通过executorServiceLoader获取executorFactory,它是PipelineExecutorFactory类型的,主要为streamGraph的执行提供executor,它的实现类主要有以下几种:
•通过executorFactory获取executor,然后使用executor执行streamGraph,这里我们先以LocalExecutor为例。LocalExecutor#execute方法代码如下:
代码语言:javascript复制 @Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
checkNotNull(pipeline);
checkNotNull(configuration);
// 有效的配置
Configuration effectiveConfig = new Configuration();
effectiveConfig.addAll(this.configuration);
effectiveConfig.addAll(configuration);
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
// 获取了jobGraph
final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph, userCodeClassloader);
}
可以看到,这个方法中会选进行jobGraph的生成,然后将jobGraph提交到miniCluster上去进行下一步的操作。LocalExecutor的getJobGraph方法将是本文分析的重点。
LocalExecutor#getJobGraph
这里我们就不再一步步地贴代码了,直接来看整个调用链路:
StreamingJobGraphGenerator中包含streamGraph的全部信息,并最终由它来进行jobGraph的生成,我们接下来重点分析一下。
StreamingJobGraphGenerator#createJobGraph()方法
生成jobGraph的方法代码如下:
代码语言:javascript复制private JobGraph createJobGraph() {
preValidate();
// streaming模式下都是EAGER模式
// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// 返回是否启用了近似本地恢复
jobGraph.enableApproximateLocalRecovery(streamGraph
.getCheckpointConfig()
.isApproximateLocalRecoveryEnabled());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
// hash值用来标识JobVertexID,以便在作业提交时识别节点(如果它们没有更改)。这里会按顺序遍历整个streamGraph,为所有StreamNode都生成一个唯一的hash值。
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(
streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
// 在org.apache.flink.streaming.api.graph.StreamGraphGenerator.legacyTransform中对应的FeedBackTransformation中会存在用户自定义hash值的情况,这里会根据特定的hasher来处理streamGraph中这种类型的节点的hash值
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
// 输入新版本的hashes值和旧版本的legacyHashes
// 设置算子链
setChaining(hashes, legacyHashes);
// 物理边,主要为Edge的目标顶点设置特理入边,并进入序列化。主要用有序的物理边及其typeNumber来控制多输入时InputGate的顺序;多输入的情况下typeNumber是在1的基础上以步长为1增加的
setPhysicalEdges();
// 设置slot sharing group和colocation
setSlotSharingAndCoLocation();
// 设置内存
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
// 配置checkpoint
configureCheckpointing();
// 配置savepoint
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
} catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig."
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
该方法比较长,我们拆开为以下几步来进行分析:
1.为每个StreamNode生成一个唯一的hash值;2.将能够chain在一起的StreamNode chain在一起,生成StreamConfig和JobVertex、JobEdge等信息;3.设置物理边信息;4.设置slot sharing group和co-location;5.设置内存、checkpoint、savepoint和其他用户自定义信息。
1. 为每个StreamNode生成一个唯一的hash值
目前分析的flink 1.12.0版本的源码中维护着两种hash值生成方案,一种是默认的hash值生成,另一种是用户自定义的。代码如下:
代码语言:javascript复制Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(
streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
// 在org.apache.flink.streaming.api.graph.StreamGraphGenerator.legacyTransform中对应的FeedBackTransformation中会存在用户自定义hash值的情况,这里会根据特定的hasher来处理streamGraph中这种类型的节点的hash值
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
•defaultStreamGraphHasher是StreamGraphHasherV2类型的,如果用户指定了transformationId,会以transformationId的值来生成该节点的hash值;否则会使用节点的输入输出信息及输入节点的hash值来进行hash值的生成。这里会生成一份nodeId与节点hash值的映射关系;•legacyStreamGraphHashers在当前版本中为StreamGraphUserHashHasher,它主要是根据StreamNode中的userHash值来作为结果map的value。
2.设置算子链
从source节点为起点进行递归遍历来获取最终的jobGraph。我们来看一下代码:
代码语言:javascript复制/**
* Sets up task chains from the source {@link StreamNode} instances.
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
// we separate out the sources that run as inputs to another operator (chained inputs)
// from the sources that needs to run as the main (head) operator.
final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(
hashes,
legacyHashes);
// 初始的值,内部是array copy,@see java.util.ArrayList.toArray()
final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());
// iterate over a copy of the values, because this map gets concurrently modified
for (OperatorChainInfo info : initialEntryPoints) {
createChain(
// 从source节点开始
info.getStartNodeId(),
1, // operators start at position 1 because 0 is for chained source inputs
info,
chainEntryPoints);
}
}
在buildChainedInputsAndGetHeadInputs方法内部会将source节点及ChainingStrategy为HEAD_WITH_SOURCES的节点生成OperatorChainInfo对象。然后进入到createChain方法,我们直接来看代码:
代码语言:javascript复制 private List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
final Map<Integer, OperatorChainInfo> chainEntryPoints) {
Integer startNodeId = chainInfo.getStartNodeId();
if (!builtVertices.contains(startNodeId)) {
// 记录不能chain在一起的OperatorChainInfo的起始节点,为了后面OperatorChainInfo的connect做准备
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
//遍历当前节点的出边,放到chainableOutputs和nonChainableOutputs列表中
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
//处理当前节点可以chain在一起的outputs
for (StreamEdge chainable : chainableOutputs) {
// 所有可以进行Chain的出边也都进行处理
transitiveOutEdges.addAll(
createChain(
chainable.getTargetId(),
chainIndex 1,
chainInfo,// 可以chain在一起的出边也会与它的source节点共用一个chainInfo
chainEntryPoints));
}
for (StreamEdge nonChainable : nonChainableOutputs) {// 不能chain在一起的在这里递归处理
transitiveOutEdges.add(nonChainable);
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainEntryPoints.computeIfAbsent(
nonChainable.getTargetId(),
// 创建新的chainInfo,它的startId为其目标节点的id。因为它本身已经放到了transitiveOutEdges中
(k) -> chainInfo.newChain(nonChainable.getTargetId())),
chainEntryPoints);
}
// 节点id和链名称的映射
chainedNames.put(
currentNodeId,
createChainedName(
currentNodeId,
chainableOutputs,
Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
// 当前节点的资源情况
chainedMinResources.put(
currentNodeId,
createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(
currentNodeId,
createChainedPreferredResources(currentNodeId, chainableOutputs));
// 当前的chainInfo,可能包括多个节点。把当前节点加入到chain中来(包括节点hashes和legacyHashes)
OperatorID currentOperatorId = chainInfo.addNodeToChain(
currentNodeId,
chainedNames.get(currentNodeId));
// 如果A->B可以chain在一起,最后的情况是B结点先运行到这里,然后A结点运行到这里。它们的startNodeId都是A节点的nodeId
if (currentNode.getInputFormat() != null) {
// 如果当前节点的inputFormat不为空,则创建formatContainer
getOrCreateFormatContainer(startNodeId).addInputFormat(
currentOperatorId,
currentNode.getInputFormat());// input类型的,放入chainedInputOutputFormats中,在创䢖JobVertex时会有所不同
}
if (currentNode.getOutputFormat() != null) {
// 如果有outPutFormat则指定outputFormat
getOrCreateFormatContainer(startNodeId).addOutputFormat(
currentOperatorId,
currentNode.getOutputFormat());// output类型的,放入chainedInputOutputFormats中,在创建Jobvertex时会有所不同
}
// StreamConfig是可序列化的,如果当前节点为chain的头节点的话,则创建JobVertex,否则创建StreamConfig
// 头节点运行到这里发生在同一个chain中的节点的后面;也就是说对每个链来说这一步是从后向前执行的
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
// 设置顶点配置,如果当前节点是startNode则会在上面一步中生成JobVertex并返回该vertex的StreamConfig,如果不是startNode则会在上一步中创建新的StreamConfig;头节点运行到这里发生在同一个chain中的节点的后面
setVertexConfig(
currentNodeId,
config,
chainableOutputs,
nonChainableOutputs,
chainInfo.getChainedSources());
if (currentNodeId.equals(startNodeId)) {
// 如果当前节点就是chain的头节点
config.setChainStart();
// 设置chainIndex
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
for (StreamEdge edge : transitiveOutEdges) {
// 这里会连接上一个算子链(可能也只有一个节点,也可能是多个节点chain在一起的,如果该链是最下游的节点则无需进行connect)与下游的不能chain在一起的出边对应的node
connect(
startNodeId,
edge);// 主要是生成jobEdge,而且整个jobVertex的生成是从后向前的,到这个节点时证明它的下游节点的jobVertex都已经创建成功了。这里会生成jobEdge并放到下游节点的inputs列表中去。
}
// 设置它的有序输出边
config.setOutEdgesInOrder(transitiveOutEdges);
// 添加能chain在一起的StreamNode的配置,会被序列化放到chainedConfigs; 如果两个节点被chain在一起,这里的chainedConfigs中会有两条记录(头节点是最后执行到这里的)
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(
startNodeId,
k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);// 设置在链中的索引,只有非头节点会生成chainedConfigs内容,头节点会通过StreamConfig#setTransitiveChainedTaskConfigs将这些配置放到它的StreamConfig中去
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
// 设置operatorID,里面能取到该节点的primaryHash值
config.setOperatorID(currentOperatorId);
// chainableOutputs对应的节点就是当前chain的最终节点了,一个节点可能既是头节点又是结尾节点(上下都没有chain的情况)
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// 每一层往外返回的都是outEdge,最终返回的会是所有的StreamEdge列表
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
大致逻辑如下:
由于streamGraph的StreamNode之间是通过StreamEdge来连接的,所以这里对每个节点的出边进行迭代,把能chain在一起的chain在一起,然后生成JobVertex、JobEdge和IntermediateDataSet。
对于能chain在一起的节点,会延用其source节点的OperatorChainInfo并递增chainIndex向下递归,不能chain在一起的节点会新建OperatorChainInfo往下递归。
整个StreamNode链,递归的结果是从后往前处理剩余非递归部分的逻辑的。只会在每个子链的头结点的部分生成JobVertex和JobEdge及IntermediateDataSet;如果不是每个子链的startNode,则证明目前处理的还是每个子链的下游节点部分,会将该节点需要处理的信息放到StreamConfig对象中。如果是每个子链的startNode,则证明它的下游节点已经处理完成了,会生成JobVertex并将JobVertex节点信息放到StreamConfig中返回回来。
当递归处理时处理到的节点为每个子链的头节点时,会进行连接操作,将该节点与其下游子链的头节点进行连接,生成JobEdge和IntermediateDataSet(这部分的逻辑在StreamingJobGraphGenerator#connect方法中),这里会指定连接方式是POINTWISE还是ALL_TO_ALL。同时将当前建立连接的出边信息放到physicalEdgesInOrder列表中,这里维护的是有序的物理边,从后往前有序,在下文物理边的设置中会使用到。
需要注意的是chain在一起的节点的StreamConfig是怎么处理的,迭代过程中有个判断,如果是头节点的话会进行如下设置:
代码语言:javascript复制 // 添加能chain在一起的StreamNode的配置,会被序列化放到chainedConfigs; 如果两个节点被chain在一起,这里的chainedConfigs中会有两条记录(头节点是最后执行到这里的)
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
try {
InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
} catch (IOException e) {
throw new StreamTaskException("Could not serialize configuration.", e);
}
}
chainedConfigs里面是存放所有能chain在一起的节点的容器,以startNodeId为key,非头节点的nodeId和StreamConfig的映射为value。这里会将这些信息存到头节点配置的chainedTaskConfig_中去。
这setChaining方法执行完之后,StreamingJobGraphGenerator中的属性大致如下图:
可以看到每个StreamNode都对应一个StreamConfig对象。节点4和节点5是chain在一起的,其中节点4为头节点。这一点在jobGraph中也有体现,在有chain在一起的JobVertex中有两个operatorId:
需要注意的是:这个operatorID主要是给checkpoint使用的。
到这里大家应该会有个疑问,那就是在生成OneInputStreamTask的时候,这个有两个operator chain在一起的这个JobVertexID最后是怎么识别另外一个算子操作的呢(在ExecutionGraph中也不涉及到这部分)?
这里我们简单地提一下,在StreamTask的beforeInvoke方法中会为每一个Task生成一个OperatorChain对象,在创建outputCollector时会根据chain的情况生成对应的operator放到allOperatorWrappers中。
关于这一点我们后续会用专门的篇幅来分析,这里顺带着提一下。这里会对当前task涉及到的operatorConfig生成对应的operator,将task的输入和输出进行一个串联,对于chain在一起的operator之间不会涉及数据的网络传输。
在setChaining操作之后生成的jobGraph主体结构如下图:
3. 设置物理边
直接来看代码:
代码语言:javascript复制private void setPhysicalEdges() {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
// 有序的StreamEdge
for (StreamEdge edge : physicalEdgesInOrder) {// 遍历有序的物理边集合
int target = edge.getTargetId();
List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(
target,
k -> new ArrayList<>());// 如果不存在返回新创建的list
inEdges.add(edge);// 将边添加进去,形成targetId->edge集合之间的映射
}
// 为什么是从前往后找,为什么要找targetId? 因为下面要设置JobVertex顶点的streamConfig的InPhysicalEdges
for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
int vertex = inEdges.getKey();// 目标顶点id
List<StreamEdge> edgeList = inEdges.getValue();// 相对于目标顶点来说这是入边
// 物理边设置的还是StreamEdge
// 对于多输入的task有用,主要用于确定输入的顺序,在jobEdge中是没办法确定这种顺序的
vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);// 对目标顶点设置物理入边,序列化操作
}
}
这里会遍历physicalEdgesInOrder列表,它是一个有序的物理边集合,在wordcount实例中,它运行到这里时的实际值如下图:
顺序是从后往前。setPhysicalEdges方法的作用无非是向Flat Map节点的StreamConfig和Keyed Aggregation节点的StreamConfig中设置入边列表,并进行序列化操作。
4. 设置slot sharing group和co-location
这一部分会先进行region的划分(涉及到连通器算法),然后设置slot sharingGroup信息。再根据slot sharingGroup的信息对设置了coLocationGroupKey的节点进行coLocationGroups的划分。主要作用还是体现在slot资源调度上,关于这点的详细信息可以参考博客:http://chenyuzhao.me/2017/02/09/flink-scheduler/
5. 其他设置
进行内存、checkpoint、savepoint、用户文件等的设置,最终形成的jobGraph结构如下:
总结
通过与streamGraph的对比我们可以发现,jobGraph是对streamGraph进行了一定优化处理后的结果,如一些operator的chaining操作,slot sharing group与coLocationGroupKey的设置,JobVertex之间连接拓朴的变化(在JobEdge后面添加了IntermediateDataSet)等。
对于从StreamGraph到JobGraph的变化,主要总结如下(来自Jark's Blog的一段介绍[1],里面对为何flink要用多层图来进行任务处理也有详细解释):
•StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。•StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。•StreamEdge:表示连接两个StreamNode的边。•JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。•JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。•IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。•JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
References
[1]
Jark's Blog的一段介绍: http://wuchong.me/blog/2016/05/03/flink-internals-overview/