WholeStageCodeGeneration,全阶段代码生成,简称WSCG。
在理解WSCG之前,我们需要弄清楚为啥需要WSCG。
1、为啥需要WSCG? 1.1、火山迭代模型
火山模型(迭代器模型), 是1994年 Goetz Graefe 在他的论文 《Volcano, An Extensible and Parallel Query Evaluation System》中提出的概念。
该模型中的每个操作都由 3 种方法组成:
open() -用于算子的初始化操作,一般也会调用子节点的该方法来初始化整棵树;
next() - 根据算子类型进行具体的实现,首先会调用子节点的Next()方法,获取子节点的数据,进行特定的处理后(该算子的具体实现),向上返回给父节点;
close() - 关闭算子的生命周期,清理状态;
火山迭代模型虽然简单却很强大,非常灵活而具有扩展性,比如单个算子的执行逻辑完全不需要考虑其上下游是什么,也不需要考虑自身是否是并行在执行,这些逻辑都被放到了外部,而自身的策略也是注入式的,可以由外层灵活修改,整个迭代树只负责整体处理流程。
能看出,核心就在于Next()方法的实现,通过不断的调用子节点Next()方法,来实现从下往上的数据传递,就像一个流水线一样执行完整棵树。
其实在spark中用的就这种模式,比如sparksql生成的物理执行计划节点中,会实现next()函数:
1.2、火山迭代模型的缺点
1)虚函数调用
在火山迭代模型中,处理一次数据最少需要调用一次next()函数。
这些函数的调用是由编译器通过虚函数调度实现的。
虚函数慢的原因:
- 虚函数通常通过虚函数表来实现,在虚表中存储函数指针,实际调用时需要间接访问,这需要多一点时间。
- 虚函数其实最主要的性能开销在于它阻碍了编译器内联函数和各种函数级别的优化,导致性能开销较大。比如,在普通函数中log(10)会被优化掉,它就只会被计算一次,而如果使用虚函数,log(10)不会被编译器优化,它就会被计算多次。如果代码中使用了更多的虚函数,编译器能优化的代码就越少,性能就越低。
2)缓存感知(内存与 CPU 寄存器)
在火山迭代模型中,每次算子将数据传递给另一个算子时,都需要将算子放入内存。
在 WSCG 版本中,编译器实际上将中间数据放置在 CPU 寄存器中,有效地利用一些 L1、L2、L3 不同级别的 CPU 缓存。CPU 缓存一旦能够命中,它的读写的速度要比内存(或者说主存)至少要高一个量级。
ns 代表纳秒,即一秒的十亿分之一秒(1 / 1 00 000 000)
ps代表一皮秒,是一秒的万亿分之一(1 / 1 000 000 000 000)
火山迭代模型的数据(main memory)离寄存器(register)非常远,其访问时间会显著增加。
3)向量化(循环展开和 SIMD)
编译器和 CPU 对简单的循环效率很高,但对复杂函数调用的优化极少。
向量化优化借助的是 CPU 的 SIMD 指令,即通过单条指令控制多组数据的运算。
Spark 的向量化主要是靠ColumnarBatch (列批),它实际上是按照批次把每一列按照向量存储的方式一列一列存起来。
ColumnarToRowExec类中WSCG的代码:
实际上是一个用 FOR 循环一列一列地访问信息,这个过程正好吻合了 Java 向量化的处理过程:当这段代码被调用足够多次时,JVM 会发现自身对这个地方优化很有价值,其中有一个优化就是把它进行向量化——用即时编译器(JIT)把它编译成本地机器代码之后,这个机器代码再接着去调用底层的 SIMD 的指令。
1.3、WSCG的优势
- 避免虚函数调用。
- 将中间数据放在CPU 寄存器。
- 利用SIMD批量处理数据(java向量化)。现代 CPU 功能循环展开和使用 SIMD。通过向量化技术,引擎将加快对复杂操作代码生成运行的速度。对于许多数据处理的核心算子,新引擎的运行速度要提升一个数量级。
- 其他优化,比如它想通过代码编织的方式,通过字符串拼接,拼接出 Java 代码,能够减少一些基本类型的自动装箱,基本类型的自动装箱本来是 Java 或者是 Scala 语言自身的一个语法糖,但是这样的语法糖其实对于一些编译过程是不太友好的,对这些东西需要进行一些优化。除此之外,它其实还做了很多其他优化点:比如说有算子融合、缩减栈深等等
2、CollapseCodegenStages规则过程
代码语言:javascript复制select a,count(b) from testdata2 group by a
== Physical Plan ==
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
- HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
- Scan[obj#2]
--------------------规则1------------------
org.apache.spark.sql.execution.exchange.EnsureRequirements
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [plan_id=17]
- HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
- Scan[obj#2]
--------------------规则2------------------
org.apache.spark.sql.execution.CollapseCodegenStages
*(2) HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [plan_id=23]
- *(1) HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
- Scan[obj#2]
== executedPlan ==
*(2) HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [plan_id=23]
- *(1) HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3]
- Scan[obj#2]
2.1、执行计划说明
- `*`代表当前节点可以WSCG
- 括号中的数字,代表的是codegen stage id,即代码生成的阶段id
codegen stage的划分:相邻节点(父子节点)且节点支持codegen ,会被划分到同一个 codegen stage,直到不支持codegen的节点。比如:HashAggregate 、SerializeFromObject 节点支持,Exchange 节点不支持,那么HashAggregate 、SerializeFromObject就会被划分到同一个stage中
- 括号里的数字,代表字段id, id相同的代表同一个字段,相同的codegen stage id 代码会被折叠在一起,减少函数的调用
- 支持Codegen的SparkPlan上添加一个WholeStageCodegenExec,不支持Codegen的SparkPlan则会添加一个InputAdapter
2.2、哪些节点支持codegen
代码语言:javascript复制 /** Prefix used in the current operator's variable names. */
private def variablePrefix: String = this match {
case _: HashAggregateExec => "hashAgg" //基于哈希的聚合算子,当数据超过内存大小时,也可以回退到排序。
case _: SortAggregateExec => "sortAgg" //基于排序的聚合算子
case _: BroadcastHashJoinExec => "bhj" //执行两个子关系的 INNER HASH JOIN。构造此算子的输出 RDD 时,将异步启动 Spark 作业,以计算广播关系的值。然后将这些数据放入 Spark 广播变量中。流式关系不会被 Shuffle。
case _: ShuffledHashJoinExec => "shj" //首先使用 JOIN KEY Shuffle 数据,执行两个子关系的 HASH JOIN
case _: SortMergeJoinExec => "smj" //执行两个子关系的 SORT MERGE JOIN
case _: BroadcastNestedLoopJoinExec => "bnlj" //以广播的方式执行两个子关系的嵌套循环 JOIN。
case _: RDDScanExec => "rdd" //用来扫描来自 InternalRow 类型 RDD 数据的物理计划节点
case _: DataSourceScanExec => "scan" //用来扫描数据源的物理计划节点。
case _: InMemoryTableScanExec => "memoryScan" //用来处理内存数据库关系InMemoryRelation中表的扫描操作。
case _: WholeStageCodegenExec => "wholestagecodegen" //将支持 codegen 的计划子树编译成单个 Java 方法。起到了一个汇总的作用,它会将支持 codegen 的计划子树,也就是其他 9 种 CodegenSupport的实现类,编译成一个 Java 方法。
case _ => nodeName.toLowerCase(Locale.ROOT)
}
3.3branch 比3.2branch多支持了SortAggregateExec
2.3、规则生成代码流程
执行计划中隐藏了WholeStageCodegenExec和InputAdapter节点,完整的执行计划如下:
2.4、查看生成代码的方法
以上