Spark SQL source code series | One article to understand how "with one count distinct" executes


This afternoon's source-code session mainly went over the homework left in the previous two sessions. Besides a few logical-plan optimizer rules, the focus was planAggregateWithOneDistinct: how the physical plan is generated when there is exactly one count distinct.

In interviews you will sooner or later be asked about optimizing count distinct. Offline jobs nowadays are basically HiveSQL and Spark SQL, so what optimizations does Spark SQL apply to count distinct?

In fact, the execution of count distinct in Spark SQL breaks down into two cases:

  • with one count distinct
  • more than one count distinct

This article focuses on "with one count distinct". If this is exactly what you have been wanting to understand, give it a like and bookmark it.

This article is based on Spark 3.2.

Outline

1. The modes of an Aggregate function
2. The stages of generating the WithOneDistinct physical plan
3. Execution when, besides count distinct, there is no other non-distinct aggregate function
4. Execution when, besides count distinct, there are other non-distinct aggregate functions
5. Debugging the key points
6. Summary

1. The modes of an Aggregate function

Partial: aggregation over local data. It updates the aggregation buffer as input rows are read; once all input has been processed, it returns the partial (local) aggregation result.

PartialMerge: merges the aggregation buffers produced by Partial (the local results). The output is still not the final answer; it must still go through Final (this mode shows up for count distinct).

Final: merges the aggregation buffers and returns the final result.

Complete: performs no partial aggregation; used for aggregate functions that do not support Partial mode (for example percentile_approx).

Route of a non-distinct aggregate function: Partial --> Final

Route of a distinct aggregate function: Partial --> PartialMerge --> Partial --> Final
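
To make the distinct route concrete, here is a minimal sketch in plain Scala (not Spark's real classes; every name below is invented for illustration) of what the four modes do for select a, count(distinct b) from testdata2 group by a:

object ModeDemo {
  // Stage 1 (Partial): each task de-duplicates the (a, b) pairs it sees locally.
  def partial(rows: Seq[(Int, Int)]): Set[(Int, Int)] = rows.toSet

  // Stage 2 (PartialMerge): merge the local buffers after the shuffle on (a, b).
  def partialMerge(buffers: Seq[Set[(Int, Int)]]): Set[(Int, Int)] =
    buffers.reduce(_ ++ _)

  // Stage 3 (Partial again, for the rewritten distinct function): b is now
  // globally unique per (a, b), so a plain count per a is a partial distinct count.
  def partialDistinct(deduped: Set[(Int, Int)]): Map[Int, Long] =
    deduped.groupBy(_._1).map { case (a, g) => a -> g.size.toLong }

  // Stage 4 (Final): merge the partial counts after the shuffle on a.
  def finalMerge(partials: Seq[Map[Int, Long]]): Map[Int, Long] =
    partials.flatten.groupBy(_._1).map { case (a, cs) => a -> cs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val task1 = Seq((1, 1), (1, 1), (1, 2))
    val task2 = Seq((1, 2), (2, 1))
    val deduped = partialMerge(Seq(partial(task1), partial(task2)))
    println(finalMerge(Seq(partialDistinct(deduped)))) // Map(1 -> 2, 2 -> 1)
  }
}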

2. The stages of generating the WithOneDistinct physical plan

planAggregateWithOneDistinct builds the physical plan in four stages (a sketch of the wiring follows this list):

  • partialAggregate
  • partialMergeAggregate
  • partialDistinctAggregate
  • finalAggregate
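
The sketch below (plain Scala; the Stage class and its fields are invented here purely to summarize the wiring, they are not Spark's planner API) lines the four stages up against the keys, aggregate modes, and shuffles you will see in the executedPlans later in this article, using the section-4 query select a, count(distinct b), max(b) from testdata2 group by a:

sealed trait Mode
case object Partial extends Mode
case object PartialMerge extends Mode
case object Final extends Mode

final case class Stage(
    name: String,
    groupingKeys: Seq[String],          // keys of the HashAggregate
    regularAggMode: Option[Mode],       // mode of max(b) at this stage
    distinctAggMode: Option[Mode],      // mode of count(distinct b) at this stage
    shuffleBefore: Option[Seq[String]]) // Exchange keys, if an Exchange precedes it

object FourStages {
  val stages = Seq(
    Stage("partialAggregate",         Seq("a", "b"), Some(Partial),      None,          None),
    Stage("partialMergeAggregate",    Seq("a", "b"), Some(PartialMerge), None,          Some(Seq("a", "b"))),
    Stage("partialDistinctAggregate", Seq("a"),      Some(PartialMerge), Some(Partial), None),
    Stage("finalAggregate",           Seq("a"),      Some(Final),        Some(Final),   Some(Seq("a")))
  )
  def main(args: Array[String]): Unit = stages.foreach(println)
}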

3. Execution when there is no other non-distinct aggregate function

SQL:
select a,count(distinct b )  from testdata2 group by a
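
As a hypothetical way to reproduce the plans below (testdata2 is rebuilt by hand here; in Spark's own test suite it comes from SQLTestData), assuming a local Spark 3.2 environment:

import org.apache.spark.sql.SparkSession

object ExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._
    // A stand-in for Spark's testdata2: (a, b) pairs with a in 1..3, b in 1..2
    Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
      .toDF("a", "b")
      .createOrReplaceTempView("testdata2")
    // "extended" prints the logical plans together with the physical plan
    spark.sql("select a, count(distinct b) from testdata2 group by a").explain("extended")
    spark.stop()
  }
}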

Optimized Logical Plan-->Physical Plan-->executedPlan:

== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#11L]
 - SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
    - ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
 - HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
    - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
       - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
          - SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
             - Scan[obj#2]     

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
 - HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
    - Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
       - HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
          - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
             - Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
                - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
                   - SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                      - Scan[obj#2]

How the four stages run, reading the executedPlan bottom-up: the first HashAggregate groups by (a#3, b#4) with no aggregate functions, de-duplicating b within each partition; the Exchange then repartitions by (a#3, b#4) so the second HashAggregate can finish the de-duplication globally. The third HashAggregate groups by a#3 alone and evaluates partial_count(distinct b#4); since b is already unique per a at this point, this is just a count. After the Exchange on a#3, the final HashAggregate merges the partial counts and returns count(DISTINCT b).

4. Execution when there are other non-distinct aggregate functions

SQL:
select a,count(distinct b),max(b) from testdata2 group by a
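
With the same hypothetical setup as the sketch in section 3, this plan can be reproduced with:

spark.sql("select a, count(distinct b), max(b) from testdata2 group by a").explain("extended")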

Optimized Logical Plan-->Physical Plan-->executedPlan:

== Optimized Logical Plan ==
Aggregate [a#3], [a#3, count(distinct b#4) AS count(DISTINCT b)#12L, max(b#4) AS max(b)#13]
 - SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
    - ExternalRDD [obj#2]

== Physical Plan ==
HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
 - HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
    - HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
       - HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
          - SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
             - Scan[obj#2]

== executedPlan ==
AdaptiveSparkPlan isFinalPlan=false
 - HashAggregate(keys=[a#3], functions=[max(b#4), count(distinct b#4)], output=[a#3, count(DISTINCT b)#12L, max(b)#13])
    - Exchange hashpartitioning(a#3, 5), ENSURE_REQUIREMENTS, [id=#28]
       - HashAggregate(keys=[a#3], functions=[merge_max(b#4), partial_count(distinct b#4)], output=[a#3, max#18, count#21L])
          - HashAggregate(keys=[a#3, b#4], functions=[merge_max(b#4)], output=[a#3, b#4, max#18])
             - Exchange hashpartitioning(a#3, b#4, 5), ENSURE_REQUIREMENTS, [id=#24]
                - HashAggregate(keys=[a#3, b#4], functions=[partial_max(b#4)], output=[a#3, b#4, max#18])
                   - SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
                      - Scan[obj#2]

How the four stages run: the skeleton is the same as in section 3, with max(b) carried along at every stage. The first HashAggregate computes partial_max(b#4) while grouping by (a#3, b#4); after the Exchange on (a#3, b#4), the second stage merges those buffers with merge_max. The third stage regroups by a#3 only, continuing to merge max (merge_max) while starting partial_count(distinct b#4) over the now-unique b values. After the Exchange on a#3, the final stage computes the Final max(b) and count(distinct b).

5. Debugging the key points

distinctAggregateExpressions --> the aggregate expressions that carry distinct

distinctAggregateAttributes --> the result attributes of the aggregate functions that carry distinct

val (distinctAggregateExpressions, distinctAggregateAttributes) =
  rewrittenDistinctFunctions.zipWithIndex.map { case (func, i) =>
    // We rewrite the aggregate function to a non-distinct aggregation because
    // its input will have distinct arguments.
    // We just keep the isDistinct setting to true, so when users look at the query plan,
    // they still can see distinct aggregations.
    val expr = AggregateExpression(func, Partial, isDistinct = true)
    // Use original AggregationFunction to lookup attributes, which is used to build
    // aggregateFunctionToAttribute
    val attr = functionsWithDistinct(i).resultAttribute
    (expr, attr)
  }.unzip
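
To see what this rewrite produces, here is a small standalone sketch (my own demo, not code from Spark; it just drives the same AggregateExpression constructor on a hand-built attribute):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Partial}
import org.apache.spark.sql.types.IntegerType

object RewriteDemo {
  def main(args: Array[String]): Unit = {
    val b = AttributeReference("b", IntegerType)()
    // The rewritten function is a plain Count over the already de-duplicated
    // column; isDistinct = true is kept only so the plan still prints "distinct".
    val expr = AggregateExpression(Count(Seq(b)), Partial, isDistinct = true)
    println(expr) // prints something like: partial_count(distinct b#0)
  }
}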

Debug result: distinctAggregateExpressions ends up holding the rewritten partial_count(distinct b#4) expression, and distinctAggregateAttributes the result attribute of the original count(distinct b#4), i.e. the count(DISTINCT b) attribute seen in the plans above.

6. Summary

How do we optimize count(distinct) in Hive? Group by first, then count.

For the with-one-count-distinct case, Spark SQL already does this optimization for us, unlike Hive:

select a,count(distinct b)  from testdata2 group by a 
is equivalent to
select a,count(b) from (
select a,b  from testdata2 group by a,b
) tmp  group by a

HashAggregate(keys=[a#3], functions=[count(distinct b#4)], output=[a#3, count(DISTINCT b)#11L])
 - HashAggregate(keys=[a#3], functions=[partial_count(distinct b#4)], output=[a#3, count#16L])
    - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
       - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
          - SerializeFromObject 
             - Scan[obj#2]  
----------------------------------------------------------------------------
HashAggregate(keys=[a#3], functions=[count(1)], output=[a#3, count(b)#11L])
 - HashAggregate(keys=[a#3], functions=[partial_count(1)], output=[a#3, count#15L])
    - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3])   
       - HashAggregate(keys=[a#3, b#4], functions=[], output=[a#3, b#4])
          - SerializeFromObject 
             - Scan[obj#2]
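
Under the same hypothetical setup as the sketch in section 3, you can verify the equivalence yourself; the rewritten query should give a plan like the second one above (Spark can turn count(b) into count(1) here because b is not nullable):

spark.sql(
  """select a, count(b) from (
    |  select a, b from testdata2 group by a, b
    |) tmp group by a""".stripMargin).explain()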

One thing we all need to wake up to: each of us needs a way to keep improving.

That means growing our business skills, our soft skills, and our technical skills.

Reading source code closely is an effective way to build up your technical inner strength.
