2021年大数据Spark(二十九):SparkSQL案例四开窗函数

2021-10-09 16:55:54 浏览数 (1)


​​​​​​​案例四:开窗函数

概述

https://www.cnblogs.com/qiuting/p/7880500.html

介绍

开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。

开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

聚合函数和开窗函数

聚合函数是将多行变成一行,count,avg....

开窗函数是将一行变成多行;

聚合函数如果要显示其他的列必须将列加入到group by中

开窗函数可以不使用group by,直接将所有信息显示出来

开窗函数分类

1.聚合开窗函数

聚合函数(列) OVER(选项),这里的选项可以是PARTITION BY 子句,但不可以是 ORDER BY 子句。

2.排序开窗函数

排序函数(列) OVER(选项),这里的选项可以是ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以是 PARTITION BY 子句。

​​​​​​​聚合开窗函数

示例1

OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。

SQL标准允许将所有聚合函数用做聚合开窗函数。

spark.sql("select  count(name)  from scores").show

spark.sql("select name, class, score, count(name) over() name_count from scores").show

查询结果如下所示:

---- ----- ----- ----------                                                   

|name|class|score|name_count|

---- ----- ----- ----------

|  a1|    1|   80|        11|

|  a2|    1|   78|        11|

|  a3|    1|   95|        11|

|  a4|    2|   74|        11|

|  a5|    2|   92|        11|

|  a6|    3|   99|        11|

|  a7|    3|   99|        11|

|  a8|    3|   45|        11|

|  a9|    3|   55|        11|

| a10|    3|   78|        11|

| a11|    3|  100|        11|

---- ----- ----- ----------

 示例2

OVER 关键字后的括号中还可以添加选项用以改变进行聚合运算的窗口范围。

如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。

开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。

与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响。

下面的 SQL 语句用于显示按照班级分组后每组的人数:

OVER(PARTITION BY class)表示对结果集按照 class 进行分区,并且计算当前行所属的组的聚合计算结果。

spark.sql("select name, class, score, count(name) over(partition by class) name_count from scores").show

查询结果如下所示:

---- ----- ----- ----------                                                   

|name|class|score|name_count|

---- ----- ----- ----------

|  a1|    1|   80|         3|

|  a2|    1|   78|         3|

|  a3|    1|   95|         3|

|  a6|    3|   99|         6|

|  a7|    3|   99|         6|

|  a8|    3|   45|         6|

|  a9|    3|   55|         6|

| a10|    3|   78|         6|

| a11|    3|  100|         6|

|  a4|    2|   74|         2|

|  a5|    2|   92|         2|

---- ----- ----- ----------

排序开窗函数

ROW_NUMBER顺序排序

row_number() over(order by score) as rownum 表示按score 升序的方式来排序,并得出排序结果的序号

注意:

在排序开窗函数中使用 PARTITION  BY 子句需要放置在ORDER  BY 子句之前。

 ●示例1

spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()

---- ----- ----- ----

|name|class|score|rank|

---- ----- ----- ----

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

|  a2|    1|   78|   4|

| a10|    3|   78|   5|

|  a1|    1|   80|   6|

|  a5|    2|   92|   7|

|  a3|    1|   95|   8|

|  a6|    3|   99|   9|

|  a7|    3|   99|  10|

| a11|    3|  100|  11|

---- ----- ----- ----

spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()

---- ----- ----- ----                                                         

|name|class|score|rank|

---- ----- ----- ----

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

---- ----- ----- ----

​​​​​​​RANK跳跃排序

rank() over(order by score) as rank表示按 score升序的方式来排序,并得出排序结果的排名号。

这个函数求出来的排名结果可以并列,并列排名之后的排名将是并列的排名加上并列数

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名

●示例2

spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()                                                     

---- ----- ----- ----

|name|class|score|rank|

---- ----- ----- ----

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

| a10|    3|   78|   4|

|  a2|    1|   78|   4|

|  a1|    1|   80|   6|

|  a5|    2|   92|   7|

|  a3|    1|   95|   8|

|  a6|    3|   99|   9|

|  a7|    3|   99|   9|

| a11|    3|  100|  11|

---- ----- ----- ----

spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()

---- ----- ----- ----                                                         

|name|class|score|rank|

---- ----- ----- ----

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   4|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

---- ----- ----- ----

​​​​​​​ DENSE_RANK连续排序

dense_rank() over(order by  score) as  dense_rank 表示按score 升序的方式来排序,并得出排序结果的排名号。

这个函数并列排名之后的排名只是并列排名加1

简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名

●示例3

spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()

---- ----- ----- ----

|name|class|score|rank|

---- ----- ----- ----

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

|  a4|    2|   74|   3|

|  a2|    1|   78|   4|

| a10|    3|   78|   4|

|  a1|    1|   80|   5|

|  a5|    2|   92|   6|

|  a3|    1|   95|   7|

|  a6|    3|   99|   8|

|  a7|    3|   99|   8|

| a11|    3|  100|   9|

---- ----- ----- ----

spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()

---- ----- ----- ----                                                         

|name|class|score|rank|

---- ----- ----- ----

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   4|

| a11|    3|  100|   5|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

---- ----- ----- ----

​​​​​​​NTILE分组排名[了解]

ntile(6) over(order by score)as ntile表示按 score 升序的方式来排序,然后 6 等分成 6 个组,并显示所在组的序号。

 示例4

spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()

---- ----- ----- ----

|name|class|score|rank|

---- ----- ----- ----

|  a8|    3|   45|   1|

|  a9|    3|   55|   1|

|  a4|    2|   74|   2|

|  a2|    1|   78|   2|

| a10|    3|   78|   3|

|  a1|    1|   80|   3|

|  a5|    2|   92|   4|

|  a3|    1|   95|   4|

|  a6|    3|   99|   5|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

---- ----- ----- ----

spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()

---- ----- ----- ----                                                         

|name|class|score|rank|

---- ----- ----- ----

|  a2|    1|   78|   1|

|  a1|    1|   80|   2|

|  a3|    1|   95|   3|

|  a8|    3|   45|   1|

|  a9|    3|   55|   2|

| a10|    3|   78|   3|

|  a6|    3|   99|   4|

|  a7|    3|   99|   5|

| a11|    3|  100|   6|

|  a4|    2|   74|   1|

|  a5|    2|   92|   2|

---- ----- ----- ----

​​​​​​​代码演示

代码语言:javascript复制
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * Author itcast
 * Date 2020/9/21 9:33
 * Desc 使用SparkSQL支持的开窗函数/窗口函数完成对各个班级的学生成绩的排名
 */
object RowNumberDemo {
  case class Score(name: String, clazz: Int, score: Int)
  def main(args: Array[String]): Unit = {
    //1.准备环境
    val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //2.加载数据
    val scoreDF: DataFrame = sc.makeRDD(Array(
      Score("a1", 1, 80),
      Score("a2", 1, 78),
      Score("a3", 1, 95),
      Score("a4", 2, 74),
      Score("a5", 2, 92),
      Score("a6", 3, 99),
      Score("a7", 3, 99),
      Score("a8", 3, 45),
      Score("a9", 3, 55),
      Score("a10", 3, 78),
      Score("a11", 3, 100))
    ).toDF("name", "class", "score")
    scoreDF.createOrReplaceTempView("t_scores")
    scoreDF.show()
    /*
     ---- ----- ----- 
    |name|class|score|num
     ---- ----- ----- 
    |  a1|    1|   80|
    |  a2|    1|   78|
    |  a3|    1|   95|
    |  a4|    2|   74|
    |  a5|    2|   92|
    |  a6|    3|   99|
    |  a7|    3|   99|
    |  a8|    3|   45|
    |  a9|    3|   55|
    | a10|    3|   78|
    | a11|    3|  100|
     ---- ----- ----- 
     */


    //使用ROW_NUMBER顺序排序
    spark.sql("select name, class, score, row_number() over(partition by class order by score) num from t_scores").show()
    //使用RANK跳跃排序
    spark.sql("select name, class, score, rank() over(partition by class order by score) num from t_scores").show()
    //使用DENSE_RANK连续排序
    spark.sql("select name, class, score, dense_rank() over(partition by class order by score) num from t_scores").show()

    /*
ROW_NUMBER顺序排序--1234
 ---- ----- ----- --- 
|name|class|score|num|
 ---- ----- ----- --- 
|  a2|    1|   78|  1|
|  a1|    1|   80|  2|
|  a3|    1|   95|  3|
|  a8|    3|   45|  1|
|  a9|    3|   55|  2|

| a10|    3|   78|  3|
|  a6|    3|   99|  4|
|  a7|    3|   99|  5|
| a11|    3|  100|  6|

|  a4|    2|   74|  1|
|  a5|    2|   92|  2|
 ---- ----- ----- --- 

使用RANK跳跃排序--1224
 ---- ----- ----- --- 
|name|class|score|num|
 ---- ----- ----- --- 
|  a2|    1|   78|  1|
|  a1|    1|   80|  2|
|  a3|    1|   95|  3|
|  a8|    3|   45|  1|
|  a9|    3|   55|  2|

| a10|    3|   78|  3|
|  a6|    3|   99|  4|
|  a7|    3|   99|  4|
| a11|    3|  100|  6|

|  a4|    2|   74|  1|
|  a5|    2|   92|  2|
 ---- ----- ----- --- 

DENSE_RANK连续排序--1223
 ---- ----- ----- --- 
|name|class|score|num|
 ---- ----- ----- --- 
|  a2|    1|   78|  1|
|  a1|    1|   80|  2|
|  a3|    1|   95|  3|
|  a8|    3|   45|  1|
|  a9|    3|   55|  2|

| a10|    3|   78|  3|
|  a6|    3|   99|  4|
|  a7|    3|   99|  4|
| a11|    3|  100|  5|

|  a4|    2|   74|  1|
|  a5|    2|   92|  2|
 ---- ----- ----- --- 
     */

    /*
    
    val sql =
      """
        |select 字段1,字段2,字段n,
        |row_number() over(partition by 字段1 order by 字段2 desc) num
        |from 表名
        |having num <= 3
        |""".stripMargin

    import org.apache.spark.sql.functions._
    df.withColumn(
      "num",
      row_number().over(Window.partitionBy('字段1).orderBy('字段2.desc))
    ).filter('num <= 3).show(false)
    
     */
  }
}

0 人点赞