源码分析系列推荐:
【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失
【Flink】第十五篇:Redis Connector 数据保序思考
【Flink】第十六篇:源码角度分析 sink 端的数据一致性
【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑
【Flink】第二十五篇:源码角度分析作业提交逻辑
继上篇 【Flink】第二十五篇:源码角度分析作业提交逻辑 我们分析了Flink在执行execute提交作业前,将用户编写的业务UDF逻辑封装成List<Transformation>数据结构,然后,在执行execute提交作业中,又用递归算法将其绘制成DAG数据结构,并且进行了四层的DAG转换,最终,转换为可调度的ExecutionGraph。
本文接着分析Task被调度到TaskManager上后,Task是如何处理输入数据和输出数据。
依旧以socket window wordcount程序为例,
代码语言:javascript复制public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env
.socketTextStream("127.0.0.1", 5555)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : sentence.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
这次笔者尝试优化之前文章的行文逻辑,将结论进行切分,然后一段结论再结合一段源码分析,本文的主要线索以wrodcount程序处理一次输入数据的过程为线索,在探索这个线索的过程中,以期能达到抽丝剥茧的方式为读者呈现。主要内容有两点:
- 从TaskManager的subtask线程如何执行调用到了用户的自定义UDF业务逻辑代码
- 输入分区和输出分区的对应关系
直接上flatmap算子的调用栈,如下
可以看到,栈底是Thread,这个Thread应该是MiniCluster启动的subtask的执行线程,在往上就是flink抽象的运行时角色的实例了,例如,Task,StreamTask,自底向上逐渐由面向Thread的层面过渡到面向flink的udf用户逻辑层面。
我们直接从Task调起StreamTask的入口看起,Task将接收到的输入数据给了StreamTask的processInput,
StreamTask又将其交给inputProcessor(StreamInputProcessor)处理。
而在StreamTask与StreamInputProcessor之间使用了Mailbox线程模型,它是一个单线程的模型,在此只做简单介绍,
先来看下这个改造/改进最初的动机,在之前 Flink 的线程模型中,会有多个潜在的线程去并发访问其内部的状态,比如 event-processing 和 checkpoint triggering,它们都是通过一个全局锁(checkpoint lock)来保证线程安全,这种实现方案带来的问题是:
1. 锁对象会在多个类中传递,代码的可读性比较差
2. 在使用时,如果没有获取锁,可能会造成很多问题,使得问题难以定位
3. 锁对象还暴露给了面向用户的 API
基于上面的这些问题,关于线程模型,提出了一个全新的解决方案 —— MailBox 模型,它可以让 StreamTask 中所有状态的改变都会像在单线程中实现得一样简单。方案借鉴了 Actor 模型的 MailBox 设计理念,它会让这些 action 操作(需要获取 checkpoint lock 的操作)先加入到一个 阻塞队列,然后主线程再从队列取相应的 mail task 去执行。
最后真正执行的是 MailboxProcessor 中的 runMailboxLoop() 方法,也就是上面说的 MailBox 主线程,StreamTask 运行的核心流程也是在这个方法中,其实现如下:
上面的方法中,最关键的有两个地方:
1. processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false
2. runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的
我们沿着StreamTask的线索继续探索,在processInput中,StreamTask将消息交给了StreamInputProcessor,而StreamInputProcessor是对StreamTask中读取数据的行为抽象,具体由StreamTaskInput完成,如下就是StreamInputProcessor调用StreamTaskInput的emitNext处理输入数据,
而StreamTaskInput是StreamTask输入数据的抽象,将输入数据反序列后交给StreamTaskNetWorkOutput。同时,StreamTaskInput有两个主要子类:
1. StreamTaskNetworkInput:使用InputGate读取数据
2. StreamTaskSourceInput:使用SourceFunction读取数据
那么,接着来看StreamTaskNetworkInput是如何处理StreamTask传递进来的输入数据的,
这里把流数据元素的抽象StreamElement划分为了四类,与我们在之前介绍的一致:
- Record
- Watermark
- LatencyMarker
- StreamStatus
在此我们先顺着调用栈的线索,进入OneInputStreamTask,
OneInputStreamTask持有了OneInputStreamOperator对输入进行处理,而OneInputStreamOperator我们在之前已经介绍过,它其实就是用户的UDF业务逻辑的封装,在这里因为我们进入的是FlatMap的调用栈,所以,运行时的实例是StreamFlatMap,所以继续进入这个类的处理元素的方法,
在这里,我们就和wordcount自定义的FlatMapFunction对接上了,他调用了userFunction的flatMap接口运行wordcount中的分词逻辑,即最终执行了如下wordcount代码,
代码语言:javascript复制public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : sentence.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
}
讲完本文的第一个内容再来看看第二个:输入分区和输出分区的对应关系
以上是一个典型的物理执行计划,
在数据输出方面,主要包含两个核心抽象:
- ResultPartition:是一个Task的输出的抽象,包含若干ResultSubPartition。
- ResultSubPartition:下游请求数据是请求ResultSubPartition,而不是ResultPartition,负责实际上存储Buffer
ResultPartition数量决定因素主要是上游并行度。
ResultSubPartition数量决定因素主要是:下游并行度 上游数据分发模式
另外,关于Buffer,我们在【Flink】第八篇:Flink 内存管理 中已经介绍过Flink的内存模型。在Flink中Java对象的有效信息被序列化,在内存中连续存储,保存在预分配的内存块上,内存块叫作MemorySegment,即内存分配的最小单元。很多运算可以直接操作序列化的二进制数据,而不需要反序列化。MemorySegment可以在堆上:Java byte数组;也可以在堆外:ByteBuffer。Task算子处理完数据后,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。其实现类是NetworkBuffer。一个NetworkBuffer包装了一个MemorySegment。
在数据输入方面,主要包含两个核心抽象:
- InputGate:是一个Task的输入数据的抽象,包含若干InputChannel,主要包含SignleInputGate和UnionInputGate两个实现类
- InputChannel:实际负责数据消费的是InputChannel,主要包含LocalInputChannel,即数据本地性;RmoteInputChannel,即跨网络数据交换Flink选择了Netty
一个InputChannel对应上游一个ResultSubPartition。