一、絮叨两句
新的一年又来了,不知道大家有没有立几个每年都完不成的 FLAG ?
反正我立了,我今年给自己立的 FLAG 是大致阅读大数据几个框架的源码。
为什么要“大致”阅读,因为这些牛逼的框架都是层层封装,搞懂核心原理已经是很不易,更别谈熟读源码了。
但是目标还是要有的,我也不要当一条咸鱼。
之前几篇源码阅读的文章,不知道大家有没有亲自动手打开 Idea 去试一试,这里我再贴一下文章链接,大家可以再回顾一下。
阅读 Flink 源码前必知必会 - SPI 和 ClassLoader
阅读 Flink 源码前必会的知识 - 命令行解析库 Apache Commons Cli
Flink 源码阅读环境准备,并调试 Flink-Clients 模块
Flink Client 实现原理与源码解析(保姆级教学)
本次,我们来聊一聊,我们自己写的代码是如何变成 StreamGraph 的。
二、引出问题
开始之前,不妨稍微回顾一下......
一般我们执行一个 Flink 程序,都是使用命令行 flink run(flink 界面上执行的时候,也是在调用 flink run 命令来执行的)来执行,然后shell 会使用 java 命令,执行到 CliFrontend 类的 main 方法。
main 方法里面,首先会解析用户的输入参数,解析 flink-conf.yml 配置文件,解压出用户 jar 包里的依赖,以及其他的信息,都封装到 PackagedProgram 对象中。然后切换当前线程的类加载器为 UserCodeClassLoader,这个类加载器自定义了一些策略(Child-First 或者 Parent-First),使用这个类加载器去反射执行用户代码的 main 方法。
然后今天的故事就从这里开始。
首先我们贴一段 Flink 自带的 Example 里的代码(稍稍简化了代码,去掉了无关的逻辑):
代码语言:javascript复制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.keyBy(value -> value.f0).sum(1);
counts.print();
它是如何变成这张图的:
这张图是一个有向无环图,组成有向无环图的就是顶点信息,以及边的信息。
这些信息被封装在 StreamGraph 类之中,这个类中有三个非常重要的属性:
代码语言:javascript复制private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;
可以看到这几个属性记录了这个 Graph 中有几个节点,几个是 sources,几个是 sinks。
其中 StreamNode 是对节点的封装,节点上有几个重要的属性如下:
代码语言:javascript复制private final String operatorName;
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
operatorName 表示节点的名字,inEdges 表示这个节点上游的边,outEdges 表示这个节点下游的边。
然后,StreamEdge 是对边的封装,只有输入节点 id 和目标节点 id:
代码语言:javascript复制private final int sourceId;
private final int targetId;
这三个类的这几个属性就描述了刚刚的那张图。
三、记住一个非常重要的属性
它就是 StreamExecutionEnvironment 类的 transformations 属性:
代码语言:javascript复制protected final List<Transformation<?>> transformations = new ArrayList<>();
什么是 Transformation,Transformation 就是 Flink 对我们写的算子的额外信息的封装,比如算子的名字,id,输出类型,输入,并行度等等这些信息。
有些算子最终会调用 this.tranformations.add() 加入到列表里来,而有的不会。
四、从 env.fromElements() 开始
env.fromElements(),这是一个算子,这个算子定义了 source 信息,这个算子对应的 transformation 是 LegacySourceTransformation,里面记录了算子的id,名字,输出类型,并行度,有界还是无界等等信息。
最后这个方法返回的是一个 DataStreamSource 对象,这个对象的基类是 DataStream。DataStream 里有一个 transformation 属性。
也就是说 env.fromElements() 返回了一个 DataStream 对象,并且把它自身的 transformation 信息放到这个 DataStream 实例的属性里面了。
env.fromElements 这个算子是没有加入到 上面的 transformations 列表中去的。
五、FlatMap 算子源码分析
紧接着,上面的 env.fromElements 的返回值:DataStream 实例,调用了它自己的 flatMap 方法,flatMap 最终又调用了 doTransform 方法。
FlatMap 算子也是要构造一个 transformation 的,FlatMap 对应的 transformation 是 OneInputTransformation,这个类里有一个属性是 input,也就是 FlatMap 算子的输入信息。我们看一下它的构造方法
代码语言:javascript复制public OneInputTransformation(
Transformation<IN> input,
String name,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<OUT> outputType,
int parallelism) {
super(name, outputType, parallelism);
this.input = input;
this.operatorFactory = operatorFactory;
}
再看一下调用信息
代码语言:javascript复制OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
也就是说,FlatMap 的 transformation 信息中,有一个 input 属性,其值是 env.fromElements 的 transformation。
通俗点讲就是,FlatMap 的 transformation 中记录了它的输入是 env.fromElements() 。
最后返回了 SingleOutputStreamOperator 对象,这里面封装了 FlatMap 的 transformation 信息。
我们可以 debug 到这里来看看它的返回值:
然后需要关注的事情是,它最终调用了这个方法:
代码语言:javascript复制getExecutionEnvironment().addOperator(resultTransform);
代码语言:javascript复制public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
也就是加入到了 transformations 列表中去。
FlatMap 最后返回了一个 SingleOutputStreamOperator 类,这个类也是 DataStream 的子类。
所以,看到这基本能够理解,我们写的代码,其实本质都是 Flink 封装后对外暴露的简单易用的 api,Flink 在背后做了大部分事情。
六、KeyBy 算子源码分析
keyBy 也是 DataStream 的一个方法,它 new 了一个 KeyedStream,并且把 this 传入了构造函数中,this 是什么?this 就是刚刚 FlatMap 的返回值,还记得吗?里面记录了 FlatMap 的 transformation。
keyBy 对应的 transformation 是 PartitionTransformation,里面也有 input 属性,直接把 this.getTransformation() 传给了 input 了。
我们来 debug 看一下返回值:
有点像套娃,一层又一层的。
需要注意的是,KeyBy 只是一个虚拟的节点,它并没有加入到 transformations 列表中来。
七、sum 算子的源码分析
这个我们就不细看了,套路都差不多了,直接 debug 看一下返回值:
sum 算子有调用这个方法:
代码语言:javascript复制getExecutionEnvironment().addOperator(reduce);
加入到了 transformations 属性中来。
八、sink 算子的源码分析
和 sum 一样,我们直接 debug 一下最终的结果:
可见 sink 中,也套娃式的记录了所有的 input。
最后,sink 也调用了
代码语言:javascript复制getExecutionEnvironment().addOperator(sink.getTransformation());
九、生成 StreamGraph
这个生成的过程,就是递归遍历 transformations 列表中的每一个值及其输入,根据不同的情况,使用不同的逻辑来构建 StreamGraph。
(这个我们下次讲,哈哈!裤子都脱了,给我看这个?不要着急,慢慢来)