Flink 是如何将你写的代码生成 StreamGraph 的 (上篇)

2021-03-13 22:25:13 浏览数 (1)

一、絮叨两句

新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ?

反正我立了,我今年给自己立的 FLAG 是大致阅读大数据几个框架的源码。

为什么要“大致”阅读,因为这些牛逼的框架都是层层封装,搞懂核心原理已经是很不易,更别谈熟读源码了。

但是目标还是要有的,我也不要当一条咸鱼。

之前几篇源码阅读的文章,不知道大家有没有亲自动手打开 Idea 去试一试,这里我再贴一下文章链接,大家可以再回顾一下。

阅读 Flink 源码前必知必会 - SPI 和 ClassLoader

阅读 Flink 源码前必会的知识 - 命令行解析库 Apache Commons Cli

Flink 源码阅读环境准备,并调试 Flink-Clients 模块

Flink Client 实现原理与源码解析(保姆级教学)

本次,我们来聊一聊,我们自己写的代码是如何变成 StreamGraph 的。

二、引出问题

开始之前,不妨稍微回顾一下......

一般我们执行一个 Flink 程序,都是使用命令行 flink run(flink 界面上执行的时候,也是在调用 flink run 命令来执行的)来执行,然后shell 会使用 java 命令,执行到 CliFrontend 类的 main 方法。

main 方法里面,首先会解析用户的输入参数,解析 flink-conf.yml 配置文件,解压出用户 jar 包里的依赖,以及其他的信息,都封装到 PackagedProgram 对象中。然后切换当前线程的类加载器为 UserCodeClassLoader,这个类加载器自定义了一些策略(Child-First 或者 Parent-First),使用这个类加载器去反射执行用户代码的 main 方法。

然后今天的故事就从这里开始。

首先我们贴一段 Flink 自带的 Example 里的代码(稍稍简化了代码,去掉了无关的逻辑):

代码语言:javascript复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);

DataStream<Tuple2<String, Integer>> counts =
    text.flatMap(new Tokenizer())
    .keyBy(value -> value.f0).sum(1);

counts.print();

它是如何变成这张图的:

这张图是一个有向无环图,组成有向无环图的就是顶点信息,以及边的信息。

这些信息被封装在 StreamGraph 类之中,这个类中有三个非常重要的属性:

代码语言:javascript复制
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;

可以看到这几个属性记录了这个 Graph 中有几个节点,几个是 sources,几个是 sinks。

其中 StreamNode 是对节点的封装,节点上有几个重要的属性如下:

代码语言:javascript复制
private final String operatorName;
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();

operatorName 表示节点的名字,inEdges 表示这个节点上游的边,outEdges 表示这个节点下游的边。

然后,StreamEdge 是对边的封装,只有输入节点 id 和目标节点 id:

代码语言:javascript复制
private final int sourceId;
private final int targetId;

这三个类的这几个属性就描述了刚刚的那张图。

三、记住一个非常重要的属性

它就是 StreamExecutionEnvironment 类的 transformations 属性:

代码语言:javascript复制
protected final List<Transformation<?>> transformations = new ArrayList<>();

什么是 Transformation,Transformation 就是 Flink 对我们写的算子的额外信息的封装,比如算子的名字,id,输出类型,输入,并行度等等这些信息。

有些算子最终会调用 this.tranformations.add() 加入到列表里来,而有的不会。

四、从 env.fromElements() 开始

env.fromElements(),这是一个算子,这个算子定义了 source 信息,这个算子对应的 transformation 是 LegacySourceTransformation,里面记录了算子的id,名字,输出类型,并行度,有界还是无界等等信息。

最后这个方法返回的是一个 DataStreamSource 对象,这个对象的基类是 DataStream。DataStream 里有一个 transformation 属性。

也就是说 env.fromElements() 返回了一个 DataStream 对象,并且把它自身的 transformation 信息放到这个 DataStream 实例的属性里面了。

env.fromElements 这个算子是没有加入到 上面的 transformations 列表中去的。

五、FlatMap 算子源码分析

紧接着,上面的 env.fromElements 的返回值:DataStream 实例,调用了它自己的 flatMap 方法,flatMap 最终又调用了 doTransform 方法。

FlatMap 算子也是要构造一个 transformation 的,FlatMap 对应的 transformation 是 OneInputTransformation,这个类里有一个属性是 input,也就是 FlatMap 算子的输入信息。我们看一下它的构造方法

代码语言:javascript复制
public OneInputTransformation(
      Transformation<IN> input,
      String name,
      StreamOperatorFactory<OUT> operatorFactory,
      TypeInformation<OUT> outputType,
      int parallelism) {
   super(name, outputType, parallelism);
   this.input = input;
   this.operatorFactory = operatorFactory;
}

再看一下调用信息

代码语言:javascript复制
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
      this.transformation,
      operatorName,
      operatorFactory,
      outTypeInfo,
      environment.getParallelism());

也就是说,FlatMap 的 transformation 信息中,有一个 input 属性,其值是 env.fromElements 的 transformation。

通俗点讲就是,FlatMap 的 transformation 中记录了它的输入是 env.fromElements() 。

最后返回了 SingleOutputStreamOperator 对象,这里面封装了 FlatMap 的 transformation 信息。

我们可以 debug 到这里来看看它的返回值:

然后需要关注的事情是,它最终调用了这个方法:

代码语言:javascript复制
getExecutionEnvironment().addOperator(resultTransform);
代码语言:javascript复制
public void addOperator(Transformation<?> transformation) {
   Preconditions.checkNotNull(transformation, "transformation must not be null.");
   this.transformations.add(transformation);
}

也就是加入到了 transformations 列表中去。

FlatMap 最后返回了一个 SingleOutputStreamOperator 类,这个类也是 DataStream 的子类。

所以,看到这基本能够理解,我们写的代码,其实本质都是 Flink 封装后对外暴露的简单易用的 api,Flink 在背后做了大部分事情。

六、KeyBy 算子源码分析

keyBy 也是 DataStream 的一个方法,它 new 了一个 KeyedStream,并且把 this 传入了构造函数中,this 是什么?this 就是刚刚 FlatMap 的返回值,还记得吗?里面记录了 FlatMap 的 transformation。

keyBy 对应的 transformation 是 PartitionTransformation,里面也有 input 属性,直接把 this.getTransformation() 传给了 input 了。

我们来 debug 看一下返回值:

有点像套娃,一层又一层的。

需要注意的是,KeyBy 只是一个虚拟的节点,它并没有加入到 transformations 列表中来。

七、sum 算子的源码分析

这个我们就不细看了,套路都差不多了,直接 debug 看一下返回值:

sum 算子有调用这个方法:

代码语言:javascript复制
getExecutionEnvironment().addOperator(reduce);

加入到了 transformations 属性中来。

八、sink 算子的源码分析

和 sum 一样,我们直接 debug 一下最终的结果:

可见 sink 中,也套娃式的记录了所有的 input。

最后,sink 也调用了

代码语言:javascript复制
getExecutionEnvironment().addOperator(sink.getTransformation());

九、生成 StreamGraph

这个生成的过程,就是递归遍历 transformations 列表中的每一个值及其输入,根据不同的情况,使用不同的逻辑来构建 StreamGraph。

(这个我们下次讲,哈哈!裤子都脱了,给我看这个?不要着急,慢慢来)

0 人点赞