This afternoon's source-code class was mainly a walkthrough of the homework left from the previous two sessions. Apart from a few logical-plan optimizer rules, the focus was planAggregateWithOneDistinct (how the physical plan is generated when the query contains a single count distinct).
In interviews you will sooner or later be asked about optimizing count distinct. Offline jobs today are basically HiveSQL and SparkSQL, so what optimizations does SparkSQL actually apply to count distinct?
In fact, how SparkSQL executes count distinct can be explained from two angles:
- with one count distinct
- more than one count distinct
This article mainly covers the "with one count distinct" case. If that is exactly what you want to understand, give it a like and bookmark it.
This article is based on Spark 3.2.
Outline of this article
1. The modes of an Aggregate function
2. The stages of generating the WithOneDistinct physical plan
3. How execution works when there is no non-distinct aggregate function besides count distinct
4. How execution works when there are other non-distinct aggregate functions besides count distinct
5. Debugging the key point
6. Summary
1. The modes of an Aggregate function
Partial: aggregates local data. It updates the aggregation buffer from the raw input rows; once all input has been processed, it returns a partial aggregation result.
PartialMerge: merges the aggregation buffers (partial results) returned by Partial. This is still not the final result; it must still go through Final (used on the count distinct path).
Final: merges the aggregation buffers and returns the final result.
Complete: performs no partial aggregation; used for aggregate functions that do not support Partial mode (for example percentile_approx).
Route for a non-distinct aggregate function: Partial --> Final
Route for a distinct aggregate function: Partial --> PartialMerge --> Partial --> Final (these modes correspond to the four physical stages described below)
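To see the non-distinct route in an actual plan, here is a minimal sketch you can paste into spark-shell (the local session, the toy testdata2 view and its column names are all assumptions for illustration): a plain count(b) compiles to two HashAggregates around one Exchange, with partial_count before the shuffle and count after it.

import org.apache.spark.sql.SparkSession

// A throwaway local session and a tiny testdata2-style view (names are illustrative only).
val spark = SparkSession.builder().master("local[2]").appName("agg-mode-demo").getOrCreate()
import spark.implicits._

Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
  .toDF("a", "b")
  .createOrReplaceTempView("testdata2")

// Non-distinct aggregate, i.e. the Partial --> Final route:
// expect HashAggregate(partial_count) -> Exchange hashpartitioning(a) -> HashAggregate(count).
spark.sql("select a, count(b) from testdata2 group by a").explain()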
2. The stages of generating the WithOneDistinct physical plan
- partialAggregate
- partialMergeAggregate
- partialDistinctAggregate
- finalAggregate
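Staying in the same spark-shell session and testdata2 view as the sketch above, explaining the single-count-distinct query should surface exactly these four stages, separated by two shuffles; the full plan dumps follow in the next section.

// Bottom-up: partialAggregate(keys a,b) -> Exchange(a,b) -> partialMergeAggregate(keys a,b)
//   -> partialDistinctAggregate(keys a, partial_count(distinct b)) -> Exchange(a) -> finalAggregate
spark.sql("select a, count(distinct b) from testdata2 group by a").explain()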
3. Execution when there is no non-distinct aggregate function besides count distinct
SQL:
select a, count(distinct b) from testdata2 group by a
Optimized Logical Plan-->Physical Plan-->executedPlan:
== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#11L]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
            +- Scan[obj#2]

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
      +- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
         +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
            +- Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
               +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                     +- Scan[obj#2]
How the four stages run:
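To make the division of labour concrete, here is a Spark-free sketch that simulates the four stages on plain Scala collections (the data and the partitioning are made up for illustration): the first two stages only de-duplicate (a, b), so from stage three onward counting rows per a is the same as counting distinct b.

// Two input "partitions" of (a, b) rows, mimicking testdata2.
val input: Seq[Seq[(Int, Int)]] = Seq(
  Seq((1, 1), (1, 2), (2, 1), (1, 1)),
  Seq((1, 2), (2, 2), (2, 2), (3, 1)))

// Stage 1, partialAggregate: group by (a, b) inside each partition -> local de-duplication.
val stage1 = input.map(_.distinct)

// Exchange hashpartitioning(a, b): identical (a, b) pairs land in the same partition.
val shuffle1 = stage1.flatten.groupBy(_.hashCode & 1).values.toSeq

// Stage 2, partialMergeAggregate: group by (a, b) again -> each (a, b) pair is now globally unique.
val stage2 = shuffle1.map(_.distinct)

// Stage 3, partialDistinctAggregate: group by a; counting rows here equals counting distinct b.
val stage3 = stage2.map(_.groupBy(_._1).map { case (a, rows) => (a, rows.size.toLong) }.toSeq)

// Exchange hashpartitioning(a) + Stage 4, finalAggregate: sum the partial counts per a.
val result = stage3.flatten.groupBy(_._1).map { case (a, cs) => (a, cs.map(_._2).sum) }

println(result.toSeq.sortBy(_._1)) // (1,2), (2,2), (3,1) -- count(distinct b) per a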
4. Execution when there are other non-distinct aggregate functions besides count distinct
SQL:
select a, count(distinct b), max(b) from testdata2 group by a
Optimized Logical Plan-->Physical Plan-->executedPlan:
== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#12L, max(b#4) AS max(b)#13]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
+- HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
   +- HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
      +- HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
            +- Scan[obj#2]

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
   +- Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
      +- HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
         +- HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
            +- Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
               +- HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
                  +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                     +- Scan[obj#2]
How the four stages run:
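For comparison, the same plan shape can be reproduced through the DataFrame API (again reusing the assumed spark session and testdata2 view from the first sketch). Note how max is carried through every stage while the distinct count only shows up in the last two:

import org.apache.spark.sql.functions.{countDistinct, max}

// max:               partial_max -> merge_max -> merge_max -> max (all four stages)
// count(distinct b): partial_count(distinct b) -> count(distinct b) (last two stages only)
spark.table("testdata2")
  .groupBy("a")
  .agg(countDistinct("b"), max("b"))
  .explain()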
5. Debugging the key point
distinctAggregateExpressions --> the aggregate expressions that carry distinct
distinctAggregateAttributes --> the attribute references of the aggregate functions that carry distinct
// Excerpt from org.apache.spark.sql.execution.aggregate.AggUtils#planAggregateWithOneDistinct
val (distinctAggregateExpressions, distinctAggregateAttributes) =
  rewrittenDistinctFunctions.zipWithIndex.map { case (func, i) =>
    // We rewrite the aggregate function to a non-distinct aggregation because
    // its input will have distinct arguments.
    // We just keep the isDistinct setting to true, so when users look at the query plan,
    // they still can see distinct aggregations.
    val expr = AggregateExpression(func, Partial, isDistinct = true)
    // Use original AggregationFunction to lookup attributes, which is used to build
    // aggregateFunctionToAttribute
    val attr = functionsWithDistinct(i).resultAttribute
    (expr, attr)
  }.unzip
Debug result:
6. Summary
How do we optimize count(distinct) in Hive by hand? Group by first, then count.
For the with-one-count(distinct) case, SparkSQL already performs this optimization for you, which is an improvement over Hive.
select a, count(distinct b) from testdata2 group by a

is equivalent to

select a, count(b) from (
  select a, b from testdata2 group by a, b
) tmp group by a
HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject
            +- Scan[obj#2]
----------------------------------------------------------------------------
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
+- HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
   +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3])
      +- HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
         +- SerializeFromObject
            +- Scan[obj#2]
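A quick sanity check of the equivalence (once more reusing the assumed spark session and testdata2 view from the first sketch): both queries return exactly the same rows, and their plan dumps above share the same two-shuffle, four-HashAggregate shape.

val direct = spark.sql("select a, count(distinct b) as c from testdata2 group by a")
val rewritten = spark.sql(
  """select a, count(b) as c from (
    |  select a, b from testdata2 group by a, b
    |) tmp group by a""".stripMargin)

// Same result set for any input; SparkSQL essentially does this rewrite for you.
assert(direct.collect().toSet == rewritten.collect().toSet)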
One thing everyone needs to wake up to: we all need a way to keep improving ourselves.
Improving business skills, soft skills, technical skills, and so on.
Reading source code carefully is an effective way to build up your technical internal strength~~