源码分析系列推荐:
【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失
【Flink】第十二篇:记kudu-connector写CDC数据的-D数据时,报主键不存在的异常
【Flink】第十五篇:Redis Connector 数据保序思考
【Flink】第十六篇:源码角度分析 sink 端的数据一致性
【Flink】第十七篇:记一次牛轰轰的OOM故障排查
【Flink】第十九篇:从一个批量写HBase性能问题到一个Flink issue的距离
从本篇,笔者会从Flink Client开始,抽丝剥茧,循序渐进分析Flink的源码。受限于个人水平,必然会有很多错误,请私信我,以便对错误之处进行修改或说明。
本文从一个简单的WordCount程序入手,以DAG额绘制逻辑为线索,探索在执行execute提交作业之前的源码主线逻辑。
源码分析容易绕晕,所以,先将结论及分析线索进行一个说明,以期在读者脑海中铺设一个相似的语境,
核心抽象
主要涉及四个核心抽象:
- DataStream:面向开发者。用户调用DataStream API的算子方法,将业务逻辑封装为Function传入算子。从用户角度,形式上来讲,是对DataStream进行链式调用,每次调用都是一次业务逻辑语义的表达,即将DataStream进行一次转换。这种面相DataStream的转换操作符合用户角度的调用习惯和思维方式,从用户角度来看DAG中的每个节点是一种DataStream。
- Function:表达业务逻辑。用户面向DataStream表达的原始业务逻辑的封装。
- Transformation:表达上下游关系,组织成流水线,面向内核。用户调用DataStream API进行数据处理的一系列逻辑,最终会转换为Transformation流水线。从Flink角度来看,Flink面向的DAG中各个节点是Transformation。
- Operator:关注数据物理来源、序列化、数据转发、容错。Task包含一个或者多个算子,一个算子就是一个计算步骤,具体计算由算子中包含的Function来执行。
- 关系:Transformation持有Operator,Operator持有Function。每个DataStream包含一个Transformation。调用DataStream API的算子处理流水线,最终会转换为Transformation流水线。
思维导图如下,
下面逐一简单介绍这四个核心抽象,以便于后面的源码分析。
DataStream
子类继承关系:
只有两个成员变量:
每个DataStream都有一个Transformation对象,表示该DataStream从上游DataStream使用该Transformation而来
Function
按照层次划分Function:
由于Function的实现子类很多,就不一一列举了。简单介绍一下各个层次下的Function的特点:
1. 无状态Function用来做无状态计算,使用比较简单。和RichFunction是一一对应的。如MapFunction和RichMapFunction。
2. RichFunction有俩方面增强:
- 增加了open、close方法管理Function的生命周期。
- 增加了getRuntimeContext
3. ProccessFunction可以访问三方面的构件块:
- 事件(数据流元素)
- 状态(容错和一致性)
- 定时器(事件时间和处理时间)
Transformatio
数据转换(Transformation)衔接DataStream API和Flink内核。DataStream面向开发者,Transformation面向Flink内核,调用DataStream API的流水线最终会转换为Transformation流水线。Flink Client把Transformation流水线交给Environment,调用execute,在execute进一步将Transformation流水线转换为StreamGraph,接着再转换为JobGraph。
Transformation主要分为两类:
1. 物理Transformation:转换为实际运行的算子(Operator)
- ReduceTransformation
- BroadcastStateTransformation
- SinkTransformation
- TwoInputTransformation
- LegacySourceTransformation
- LegacySinkTransformation
- TimestampsAndWatermarksTransformation
- OneInputTransformation
- AbstractMultipleInputTransformation
- SourceTransformation
2. 虚拟Transformation:不会转换为实际运行的算子(Operator)
- PartitionTransformation
- UnionTransformation
- SideOutputTransformation
- FeedbackTransformation
- CoFeedbackTransformation
Operator
算子(StreamOperator)关注数据物理来源、序列化、数据转发、容错。Task包含一个或者多个算子,一个算子就是一个计算步骤,具体计算由算子中包含的Function来执行。
StreamOperator的直接子类有:
- MultipleInputStreamOperator
- TwoInputStreamOperator
- AbstractStreamOperatorV2
- OneInputStreamOperator
- AbstractStreamOperator
源码分析
从一个WordCount开始分析:
代码语言:javascript复制public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.socketTextStream("127.0.0.1", 5555)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : sentence.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
以上是一个WordCount Demo:
- 从Socket为Source Stream
- flatMap切分输入行为若干单词
- 按照单词进行keyBy
- 以5秒为窗口,进行在处理时间属性上的滚动开窗
- 进行sum聚合求和
- print输出聚合值到控制台
1. socketTextStream
从socketTextStream方法进入到了Environment中,经过几层简单的socketTextStream重载方法到了addSource方法:
这里将重载过程增加的默认参数,例如,delimiter,maxRetry一起打包生成一个SocketTextStreamFunction实例(Function)。并调用addSource进行添加。
addSource再次经过一系列的对方法参数的富化重载,最终到了最内层的addSource重载:
这个函数里主要逻辑:
- 入参检查
- 从SourceFunction类型抽取输出类型,这里实例是SocketTextStreamFunction,输出类型抽取的结果是String.class
- 由这个Function生成一个Operator实例(StreamSource)
- 由Operator实例生成一个DataStream类型的实例(DataStreamSource),并返回
这里貌似看起来和Transformation没啥关系,其实,在生成DataStreamSource的构造函数里我们可以看到端倪:
这里将operator封装成了Transformation,并调用父类构造器,最终在DataStream的构造器里这样进行了成员变量的初始化:
所以,经过以上对socketTextStream的函数调用栈分析,结论是最终返回了一个DataSteam实例,并且实例中持有两个重要的实例:tansformation、environment。tansformation是对SocketTextStreamFunction的封装,environment是用于持有上下文环境。
对于这个阶段的时序图如下,
2. flatMap
接着上面,从Demo中进行第二个链式调用的方法是flatMap,源码中同样是对flatMap进行了几次富参数化的重载,重载过程中同样是对数据类型进行了抽取:
以及对默认的转换名进行了添加,以及对Function封装为Operator(StreamFlatMap)
进入最内层的doTransform,
以上doTransform的主要逻辑如下,
- 由上游DataStream持有transformation抽出上游输出的类型,在这里是String.class,即为一行行的socket文本
- 由operator、上游transformation和上游输出类型以及并行度生成Transformation实例(OneInputTransformation)
- 由生成的Transformation实例和environment实例生成本次转换后的输出流:SingleOutputStreamOperator(DataStream),并最终返回这个DataStream
- 由当前DataStream得到持有的environment,将本次的Transformation实例添加到environment
我们来看看这个很重要的addOperator,
其实就是将本次的Transformation添加到一个被environment持有的List里面,
至此,我们总结一下在Demo中的第二个链式调用的操作flatMap里,Flink都做了些什么:由上游的DataStream得出上游的输出类型以及上游调用过的Transformation,再结合本次的Transformation,来生成本次的DataStream,当然同样要将environment给本次的DataStream。
还有一个重要的操作是将本次的Transformation添加到了environment的一个List结构的transformations里。
本次的调用时序图如下,
3. keyBy
接着上游的SingleOutputStreamOperator流,keyBy对齐进行了分组,我们进入keyBy一探究竟。keyBy里面看似很简单,直接返回一个KeyedStream,
我们进入KeyedStream的构造方法,发现是一系列构造函数的重载,调用栈如下,
在重载过程中富化了一系列入参,例如,分区函数StreamPartitioner,又有分区函数StreamPartitioner和用户分装的选择器KeySelector,生成transformation实例:PartitionTransformation,最终调用了父类DataStream的构造器。同样和之前一样也是将transformation和environment传给了构造的DataStream实例。
但是整个过程并没有将transformation添加到transformations的List里,因为keyBy只是一个虚操作,在前面我们已经强调过,PartitionTransformation属于虚拟Transformation,而不是物理Transformation,只有物理Transformation才会转换为真正的执行节点交给Flink去进行绘制DAG。
后面的window、sum、print就不再一一分析了,基本思想已经被以上的三种调用覆盖了:socketTextStream、flatMap、keyBy。
经过一系列的链式调用,最终用户调用execute方法提交作业,这里提交的最重要的数据结构就是List:tansformations。至于execute里面又对tansformations做了些什么,且听下回分解。。。。