第三天:SparkSQL

2020-11-05 12:33:30 浏览数 (1)

第1章 Spark SQL概述

什么是Spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrameDataSet,并且作为分布式SQL查询引擎的作用。 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快

传统的数据分析中一般无非就是SQL,跟MapReduce。但是MapReduce的八股文书写方式太烦人了,所以引入了依靠MapReduce引擎建设出来的Hive,Spark为了融合Hive也推出了Shark。但是Spark模仿Hive的框架形成了SparkSQL。开发敏捷性,执行速度。

Spark SQL的特点

  1. 易整合
  1. 统一的数据访问方式
  1. 兼容Hive
  1. 标准的数据连接

什么是DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame也是懒执行的,但性能上比RDD要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。比如下面一个例子:

SQL解析成RDD编程,系统执行一般比人写的更好些。

代码语言:javascript复制
val rdd1 = sc.makeRDD(List((1,"a"),(2,"b"),(3,"c")))
val rdd2 = sc.makeRDD(List((1,"1"),(2,"2"),(3,"3")))

自己写的话 笛卡尔乘积先出来然后过滤
rdd1.join(rdd2).filter{
 case (key,(v1,v2)=>{
  key == 1
 })
}
sparksql
select *  from t_table1 a join t_table2 b on a.x = b.x where a.id = 1
底层是 先过滤再笛卡尔乘积,若干底层优化。
rdd1.filter(xxx) ==> 1
join
rdd2.filter(xxx) ==> 1 

什么是DataSet

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。类似与ORM,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。

  1. 是DataFrame API的一个扩展,是SparkSQL最新的数据抽象;
  2. 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
  3. 用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
  4. DataSet是类型的。比如可以有DataSet[Car],DataSet[Person]。

三者区别: 单纯的RDD只有KV这样的数据没有结构,给RDD的数据增加若干结构形成了DataFrame,而为了访问方便不再像SQL那样获取第几个数据,而是像读取对象那种形成了DataSet。

第二章 SparkSQL编程

1. SparkSession新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的

2. DataFrame

创建在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

从Spark数据源进行创建
  1. 查看Spark数据源进行创建的文件格式
代码语言:javascript复制
scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile
  1. 读取json文件创建DataFrame
代码语言:javascript复制
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 展示结果
代码语言:javascript复制
scala> df.show
 ---- ------- 
| age|   name|
 ---- ------- 
|null|Michael|
|  30|   Andy|
|  19| Justin|
 ---- ------- 
SQL风格语法(主要)
  1. 创建一个DataFrame
代码语言:javascript复制
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 对DataFrame创建一个临时表,View是只读的,Table有改的意思哦。
代码语言:javascript复制
scala> df.createOrReplaceTempView("people")
  1. 通过SQL语句实现查询全表
代码语言:javascript复制
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
---
scala> val del = spark.sql("drop table if exists stu")
del: org.apache.spark.sql.DataFrame = []
  1. 结果展示
代码语言:javascript复制
scala> sqlDF.show
 ---- ------- 
| age|   name|
 ---- ------- 
|null|Michael|
|  30|   Andy|
|  19| Justin|
 ---- ------- 

注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people5. 对于DataFrame创建一个全局表

代码语言:javascript复制
scala> df.createGlobalTempView("people")
  1. 通过SQL语句实现查询全表
代码语言:javascript复制
scala> spark.sql("SELECT * FROM global_temp.people").show()
 ---- ------- 
| age|   name|
 ---- ------- 
|null|Michael|
|  30|   Andy|
|  19| Justin|

创建新session
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
 ---- ------- 
| age|   name|
 ---- ------- 
|null|Michael|
|  30|   Andy|
|  19| Justin|
 ---- ------- 
DSL风格语法(次要)
  1. 创建一个DataFrame
代码语言:javascript复制
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 查看DataFrame的Schema信息
代码语言:javascript复制
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
  1. 只查看”name”列数据
代码语言:javascript复制
scala> df.select("name").show()
 ------- 
|   name|
 ------- 
|Michael|
|   Andy|
| Justin|
 ------- 
  1. 查看”name”列数据以及”age 1”数据
代码语言:javascript复制
scala> df.select($"name", $"age"   1).show()
 ------- --------- 
|   name|(age   1)|
 ------- --------- 
|Michael|     null|
|   Andy|       31|
| Justin|       20|
 ------- --------- 
  1. 查看”age”大于”21”的数据
代码语言:javascript复制
scala> df.filter($"age" > 21).show()
 --- ---- 
|age|name|
 --- ---- 
| 30|Andy|
 --- ---- 
  1. 按照”age”分组,查看数据条数
代码语言:javascript复制
scala> df.groupBy("age").count().show()
 ---- ----- 
| age|count|
 ---- ----- 
|  19|     1|
|null|     1|
|  30|     1|
 ---- ----- 
RDD转换为DataFrame

注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._ (spark不是包名,而是sparkSession对象的名称) 前置条件:导入隐式转换并创建一个RDD

1. 手动转换
代码语言:javascript复制
scala>  
import spark.implicits._
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
  1. 通过手动确定转换
代码语言:javascript复制
scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]

在这里插入图片描述

2. 通过反射确定(需要用到样例类)
  1. 创建一个样例类
代码语言:javascript复制
scala> case class People(name:String, age:Int)
  1. 根据样例类将RDD转换为DataFrame
代码语言:javascript复制
scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
peopleRDD.map(x=>{People(x._1,x._2)}).toDF
3. 通过编程的方式(了解)
  1. 导入所需的类型
代码语言:javascript复制
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
  1. 创建Schema
代码语言:javascript复制
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
  1. 导入所需的类型
代码语言:javascript复制
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
  1. 根据给定的类型创建二元组RDD
代码语言:javascript复制
scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
  1. 根据数据及给定的schema创建DataFrame
代码语言:javascript复制
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
DataFrame转换为RDD

直接调用rdd即可

  1. 创建一个DataFrame
代码语言:javascript复制
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 将DataFrame转换为RDD
代码语言:javascript复制
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
DataFrame 关心的是行,所以转换的时候是按照行来转换的
  1. 打印RDD
代码语言:javascript复制
scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])

3. DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

创建
  1. 创建一个样例类
代码语言:javascript复制
scala> case class Person(name: String, age: Long)
defined class Person
  1. 创建DataSet
代码语言:javascript复制
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
RDD转换为DataSet

SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。

  1. 创建一个RDD
代码语言:javascript复制
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
  1. 创建一个样例类
代码语言:javascript复制
scala> case class Person(name: String, age: Long)
defined class Person
  1. 将RDD转化为DataSet
代码语言:javascript复制
scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
DataSet转换为RDD

调用rdd方法即可。

  1. 创建一个DataSet
代码语言:javascript复制
scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
  1. 将DataSet转换为RDD
代码语言:javascript复制
scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28

4. DataFrame与DataSet的互操作

DataFrame转DataSet
  1. 创建一个DateFrame
代码语言:javascript复制
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 创建一个样例类
代码语言:javascript复制
scala> case class Person(name: String, age: Long)
defined class Person
  1. 将DataFrame转化为DataSet,添加类型
代码语言:javascript复制
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
Dataset转DataFrame
  1. 创建一个样例类
代码语言:javascript复制
scala> case class Person(name: String, age: Long)
defined class Person
  1. 创建DataSet
代码语言:javascript复制
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
  1. 将DataSet转化为DataFrame
代码语言:javascript复制
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
  1. 展示
代码语言:javascript复制
scala> df.show
 ---- --- 
|name|age|
 ---- --- 
|Andy| 32|
 ---- --- 

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上import spark.implicits._不然toDFtoDS无法使用。

RDD、DataFrame、DataSet

在SparkSQL中Spark为我们提供了两个新的抽象,DataFrame跟DataSet,他们跟RDD的区别首先从版本上来看

代码语言:javascript复制
RDD(Spark1.0) ----> DataFrame(Spark1.3)---->DataSet(Spark1.6)

如果同样的数据都给到了这三个数据结构,他们分别计算后会得到相同的结果,不同的是他们的执行效率跟执行方式,在后期的Spark版本中DataSet会逐步取代另外两者称为唯一接口。

所以在做一个整体的项目时候,一般还是以Java为主,只有在涉及到迭代式计算采用到Scala这样到函数式编程。

相同点
  1. RDD、DataFrame、DataSet全部都是平台下到分布式弹性数据集,为处理超大型数据提供了便利
  2. 三者都有惰性机制,在创建,转换,如map方法时候不会立即执行,只有遇到了Action算子比如foreach,三者才会开始遍历数据
  3. 三者都会根据spark的内存进行自动缓存运算,当数据量超大时候会自动写到磁盘,不用担心内存溢出。
  4. 三者都有partition的概念。
  5. 三者都有许多共同函数,如filter,排序等。
  6. 在对DataFrame跟DataSet进行许多操作都要import spark.implicits._
  7. DataFrame跟DataSet均可使用模式匹配获取各个字段的值跟类型。 DataFrame:
代码语言:javascript复制
testDF.map{
 case Row(col1:String,col2:int)=>{
  println(col1)
  println(col2)
  col1
 }
 case _ => ""
}

DataSet:

代码语言:javascript复制
case class Coltest(col1:String,col2:Int) extends Serializable
//定义各个类型
testDS.map{
 case Coltest(col1:String,col2:Int) =>{
  println(col1)
  println(col2)
  col1
 }
 case _ => " "
}
不同点
  1. RDD:

  1. RDD 一般跟sparkMlib 同时使用
  2. RDD 不支持sparkSQL操作
  3. DataFrame

  1. 跟RDD和DataSet不同,DataFrame 每一行类型都固定为Row,每一列值无法直接访问,只有通过解析才可以获得各个字段。
代码语言:javascript复制
testDf.foreach{
 line=>
 val col1 = line.getAs[String]("col1")
 val col2 = line.getAs[String]("col2")
}

  1. DataFrame跟DataSet一般不跟sparkMlib共同使用。
  2. DataFrame跟DataSet均支持sparkSQL,比如select,groupby,临时注册视图,执行SQL语句。
代码语言:javascript复制
dataDF.createOrReplaceTempView("tmp")
spark.sql("select Row,DATE from tmp where DATE is not null order by DATE").show(100,false)

  1. DataFrame 跟DataSet支持一些特别方便的保存方式,比如csv,可以带表头,每一列字段一目了然。这样的保存方式可以方便的获得字段名跟列的对应,而且分隔符(delimiter)可自定义
代码语言:javascript复制
val saveoptions = Map("header"->"true","delimiter"->"t","path"->"hdfs://hadoop102:9000/test")
val dataDF = spark.read.options(options).format("com.sowhat.spark.csv").load()
  1. DataSet

  1. DataSet 跟DataFrame拥有完全一样的成员函数,唯一区别就是每一行数据类型不同。
  2. DataFrame也可以叫DataSet[Row],每一行类型都是Row,不解析每一行究竟有那些字段,每个字段又是什么类型无从得知,只能通上面提到的getAs方法或者共性的第七条的模式匹配来拿出特定的字段,而DataSet中每一行是什么类型是不一定的,在自定义了case class 之后可以自由获得每一行信息。
代码语言:javascript复制
case class Coltest(col1:String,col2:Int) extends Serializable
//定义字段名跟类型
val test:DataSet[Coltest] = rdd.map{
    Coltest(line._1,line_2)
}.toDS
test.map{
 line=> 
 println(line.col1)
 println(line.col2)
}

可以看出,DataSet在需要访问列中的某个字段时候非常方便,然而如果要写一些是适配性极强的函数时候,如果使用DataSet,行的类型又不确定,可能是各自case class,无法实现适配,这时候可以用DataFrame 既DataSet[Row]很好的解决问题。

IDEA 创建SparkSQL

引入依赖

代码语言:javascript复制
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>

    </dependencies>

入门demo

代码语言:javascript复制
package com.sowhat.udaf
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestCustomerAvg {

  def main(args: Array[String]): Unit = {

    //1.创建SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("WordCount")
      .getOrCreate()

    //2.导入隐式转换
    import spark.implicits._

    //3.读取文件创建DF
    val df: DataFrame = spark.read.json("/Users/liujinjie/Downloads/Spark1015/SparkSQL/src/data/people.json")

    //4.创建一张临时表
    df.createTempView("people")
    spark.sql("select * from people").show
    //7.关闭连接
    spark.stop()
  }
}

RDDDFDS 转换

代码语言:javascript复制
package com.sowhat.test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object RDDToDF {

  def main(args: Array[String]): Unit = {

    //1.创建SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("RDDToDF")
      .getOrCreate()

    //2.导入隐式转换
    import spark.implicits._

    //3.创建RDD
    val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhang", 20), (2, "san", 30), (3, "si", 40)))

    val value: RDD[User] = rdd.map {
      case (id, name, age) => User(id, name, age)
    }
    val userDS: Dataset[User] = value.toDS()
    val rdd2: RDD[User] = userDS.rdd
    rdd2.foreach(println)
    
    // 转换为DF
    val df: DataFrame = rdd.toDF("id", "name", "age")
    // 转换为DS
    val ds: Dataset[User] = df.as[User]
    // 转换为DF
    val df1: DataFrame = ds.toDF()
    // 转换为RDD
    val rdd1: RDD[Row] = df1.rdd
    rdd1.foreach(row => {
      // 这个是数据的索引
      println(row.getString(1))
    })


    //8.关闭连接
    spark.stop()

  }
}

case class User(id: Int, name: String, age: Int)

建议SparkSQL开发尽量下面三行直接写好

代码语言:javascript复制
    val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("demo")
    val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()
    // 进行转换前 需要引入隐式转换规则,这里引入的是SparkSession 对象名字
    import spark.implicits._

用户自定义函数

在Shell窗口中可以通过spark.udf功能用户可以自定义函数。

UDF
  1. 创建DataFrame
代码语言:javascript复制
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 打印数据
代码语言:javascript复制
scala> df.show()
 ---- ------- 
| age|   name|
 ---- ------- 
|null|Michael|
|  30|   Andy|
|  19| Justin|
 ---- ------- 
  1. 注册UDF,功能为在数据前添加字符串
代码语言:javascript复制
scala> spark.udf.register("addName", (x:String)=> "Name:" x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
  1. 创建临时表
代码语言:javascript复制
scala> df.createOrReplaceTempView("people")
  1. 应用UDF
代码语言:javascript复制
scala> spark.sql("Select addName(name), age from people").show()
 ----------------- ---- 
|UDF:addName(name)| age|
 ----------------- ---- 
|     Name:Michael|null|
|        Name:Andy|  30|
|      Name:Justin|  19|
 ----------------- ---- 
UDAF

类型的Dataset和类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。需求:实现求平均工资的自定义聚合函数。

代码语言:javascript复制
people.json
{"name":"Michael","age": 21}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
弱类型实现

依据DataFrame类型的查询数据,只能通过索引形式找到数据,必须记住自己的数据对应的索引位置注意导入正确的package ! 自定义若类型函数

代码语言:javascript复制
package com.atguigu.udaf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// 数据类型均跟SparkSQL相关的类型
object CustomerAvg extends UserDefinedAggregateFunction {

  //输入数据类型
  override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)

  //缓存数据的类型
  override def bufferSchema: StructType = StructType(StructField("sum", LongType  ) :: StructField("count", LongType) :: Nil)

  //输出数据类型
  override def dataType: DataType = DoubleType

  //函数确定性
  override def deterministic: Boolean = true

  // 计算前 缓冲区 初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  //分区内 数据 更新
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0)   input.getLong(0)
      buffer(1) = buffer.getLong(1)   1L
    }
  }

  //多个节点多缓冲区 合并值
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0)   buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1)   buffer2.getLong(1)
  }

  //计算最终结果
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}

调用

代码语言:javascript复制
package com.atguigu.udaf

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, TypedColumn}

object TestCustomerAvg {

  def main(args: Array[String]): Unit = {

    //1.创建SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("WordCount")
      .getOrCreate()

    //2.导入隐式转换
    import spark.implicits._

    //3.读取文件创建DF
    val df: DataFrame = spark.read.json("/Users/liujinjie/Downloads/Spark1015/SparkSQL/src/data/people.json")

    //4.创建一张临时表
    df.createTempView("people")

    //5.注册函数
    spark.udf.register("MyAvg", CustomerAvg)

    //6.使用UDAF
    spark.sql("select MyAvg(age) as sqlAge from people").show

    //创建聚合对象
    val udaf = new MyAgeAvgClassFunction
    // 将聚合函数查询转换为查询列
    val avgCol: TypedColumn[UserBean, Double] = udaf.toColumn.name("avgAge")
    val userDS: Dataset[UserBean] = df.as[UserBean]
    userDS.select(avgCol).show()

    //7.关闭连接
    spark.stop()
  }
}

在这里插入图片描述

强类型实现

强类型无法使用SQL形式查询调用函数,只能用DSL风格。 自定义函数

代码语言:javascript复制
package com.atguigu.spark

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// 既然是强类型,可能有case类
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

class MyAverage extends Aggregator[Employee, Average, Double] {
  // 定义一个数据结构,保存工资总数和工资总个数,初始都为0
  def zero: Average = Average(0L, 0L)

  // 聚合相同executor分片中的结果
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum  = employee.salary
    buffer.count  = 1
    buffer
  }

  // 聚合不同execute的结果
  def merge(b1: Average, b2: Average): Average = {
    b1.sum  = b2.sum
    b1.count  = b2.count
    b1
  }

  // 计算输出
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  // 设定中间值类型的编码器,要转换成case类
  // Encoders.product是进行scala元组和case类转换的编码器
  def bufferEncoder: Encoder[Average] = Encoders.product

  // 设定最终输出值的编码器
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object MyAverage{
  def main(args: Array[String]) {
    //创建SparkConf()并设置App名称

    val spark = SparkSession
      .builder()
      .appName("sowhat")
      .master("local[4]")
      .config("spark.testing.memory", "471859200")
      .getOrCreate()

    import spark.implicits._
    // For implicit conversions like converting RDDs to DataFrames
    val ds = spark.read.json("employees.json").as[Employee]
    ds.show()

    val averageSalary = new MyAverage().toColumn.name("average_salary")

    val result = ds.select(averageSalary)
    result.show()

    spark.stop()
  }
}

/**
  * {"name":"Michael", "salary":3000}
  * {"name":"Andy", "salary":4500}
  * {"name":"Justin", "salary":3500}
  * {"name":"Berta", "salary":4000}
  *
  * */

在这里插入图片描述

第三章 Spark SQL数据的加载与保存

通用加载/保存方法

1. 加载数据
  1. read直接加载数据
代码语言:javascript复制
scala> spark.read.
csv  jdbc   json  orc   parquet textFile… …

注意:加载数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。 2. format指定加载数据类型

代码语言:javascript复制
scala> spark.read.format("…")[.option("…")].load("…")

用法详解: 3. format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。 4. load("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。 5. option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable

2. 保存数据
  1. write直接保存数据
代码语言:javascript复制
scala> df.write.
csv  jdbc   json  orc   parquet textFile… …

注意:保存数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。 2. format指定保存数据类型

代码语言:javascript复制
scala> df.write.format("…")[.option("…")].save("…")

用法详解

  1. format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
  2. save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
  3. option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
  4. 文件保存选项 可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。SaveMode是一个枚举类,其中的常量包括:

  1. Append:当保存路径或者表已存在时,追加内容;
  2. Overwrite: 当保存路径或者表已存在时,覆写内容;
  3. ErrorIfExists:当保存路径或者表已存在时,报错;
  4. Ignore:当保存路径或者表已存在时,忽略当前的保存操作。

使用详解:

代码语言:javascript复制
df.write.mode(SaveMode.Append).save("… …")
df.write.mode("append").save("… …")
3. 默认数据源Parquet

Parquet是一种流行的列式存储格式,可以高效的存储具有嵌套字段的记录,Parquet格式经常在Hadoop生态圈使用,它也支持SparkSQL的全部数据类型,SparkSQL提供了直接读取跟存储Parquet格式文件的方法。并且可以通过format()来指定输入输出文件格式。

代码语言:javascript复制
spark.read.format("csv").load("pwd")
  1. 加载数据
代码语言:javascript复制
val df = spark.read.load("examples/src/main/resources/users.parquet") 
  1. 保存数据
代码语言:javascript复制
df.select("name", " color").write.save("user.parquet")

JSON文件

Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载一个一个JSON 文件。目的:Spark读写Json数据,其中数据源可以在本地也可以在HDFS文件系统注意:这个JSON文件不是一个传统的JSON文件,每一行都得是一个JSON串。格式如下:

代码语言:javascript复制
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
  1. 导入隐式转换
代码语言:javascript复制
import spark.implicits._
  1. 加载JSON文件
代码语言:javascript复制
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
  1. 创建临时表
代码语言:javascript复制
peopleDF.createOrReplaceTempView("people")
  1. 数据查询
代码语言:javascript复制
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
 ------ 
|  name|
 ------ 
|Justin|
 ------ 

MySQL文件

Spark SQL可以通过JDBC从关系型数据库中取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再回关系型数据库中。目的:spark读写MySQL数据

可在启动shell时指定相关的数据库驱动路径,或者将相关的数据库驱动放到spark的类路径下。 cp /opt/mysql-libs/mysql-connector-java-5.1.27-bin.jar /opt/spark/jars

  1. 启动spark-shell
代码语言:javascript复制
bin/spark-shell --master spark://hadoop102:7077 [--jars mysql-connector-java-5.1.27-bin.jar]
  1. 定义JDBC相关参数配置信息
代码语言:javascript复制
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
  1. 使用read.jdbc加载数据
代码语言:javascript复制
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd", "tableName", connectionProperties)
  1. 使用format形式加载数据
代码语言:javascript复制
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/rdd").option("dbtable", " rddtable").option("user", "root").option("password", "000000").load()
  1. 使用write.jdbc保存数据
代码语言:javascript复制
jdbcDF2.write.module(“append”).jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)
  1. 使用format形式保存数据
代码语言:javascript复制
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "rddtable3")
.option("user", "root")
.option("password", "000000")
.save()

其中保存的时候确保主键等信息 ,也也可以选择往mysql中添加数据的module。

Hive

Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及Hive查询语言(HQL)等。spark-shell 默认是Hive支持的;代码中是默认不支持的,需要手动指定 enableHiveSupport()

SparkSQL中的SparkSession 就包含来自Hive跟SparkSQL的数据,这里的Hive是内置的Hive,跟HBase 里的内部独立ZooKeeper类似。工作中要跟外部Hive关联的。 内部Hive存储元数据路径:

代码语言:javascript复制
/opt/module/spark/metastore_db 来存储元数据
内嵌Hive 应用

如果要使用内嵌的Hive,什么都不用做,直接用就可以了。 前面的 RDD、DF、DS切换的时候数据都是创建的view。isTemporary = true但是也可以用内置的Hive来创建table哦! 可以修改其数据仓库地址,参数为:--conf spark.sql.warehouse.dir=./wear

在这里插入图片描述

注意:如果你使用的是内部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库的地址,如果你需要是用HDFS作为路径,那么需要将core-site.xml和hdfs-site.xml 加入到Spark conf目录,否则只会创建master节点上的warehouse目录,查询时会出现文件找不到的问题,这是需要使用HDFS,则需要将metastore删除,重启集群。

外部Hive应用

如果想连接部已经部署好的Hive,需要通过以下几个步骤。

  1. 将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下
  1. 打开spark shell,注意带上访问Hive元数据库的JDBC客户端 bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar注意:每次启动时指定JDBC jar包路径很麻烦,我们可以选择将JDBC的驱动包放置在spark的lib目录下,一劳永逸。
运行Spark SQL CLI

Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口。

代码语言:javascript复制
/bin/spark-sql

然后就可以跟在hive的终端一样进行CRUD即可了,可能会出现 若干bug

代码中操作Hive

添加依赖

代码语言:javascript复制
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.1.1</version>
  </dependency>

  <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
  </dependency>

源数据:

代码语言:javascript复制
1,sowhat
2,zhang
3,li

整体思路就是,创建就是跟Hive一样指定spark hive数据存放spark.sql.warehouse.dir路径,指定spark hive的元数据存储信息metastore_db

代码语言:javascript复制
package com.atguigu.hive

/**
  * Created by wuyufei on 05/09/2017.
  */


import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

object HiveOperation {

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val warehouseLocation = new File("spark-warehouse").getAbsolutePath // 设定数据路径
    println(warehouseLocation)

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport() // 注意添加
      .master("local[*]")
      .config("spark.testing.memory", "471859200")
      .getOrCreate()
    //import spark.implicits._

    spark.sql("CREATE TABLE IF NOT EXISTS user (key INT, value STRING) row format delimited fields terminated by ',' ")
    spark.sql("LOAD DATA LOCAL INPATH 'D:/json/kg.txt' INTO TABLE user")

    // Queries are expressed in HiveQL
    val df = spark.sql("SELECT * FROM user")
    df.show()
    df.write.format("json").save("D:/json/ssss.json")
    spark.stop()
  }
}

输出数据格式:

SparkSQL跟Hive实战

各种依赖:

代码语言:javascript复制
<dependencies>
  <dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
  <!--<scope>provided</scope>-->
 </dependency>

 <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
  <!--<scope>provided</scope>-->
 </dependency>
 
 <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
  <!--<scope>provided</scope>-->
 </dependency>
 
 <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.1.1</version>
 </dependency>
 
 <dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.44</version>
 </dependency>
 
</dependencies>

实例代码

val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() val sc: SparkContext = spark.sparkContext

代码语言:javascript复制
package com.atguigu.spark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

// 订单号,交易位置,交易日期
case class tbStock(ordernumber: String, locationid: String, dateid: String) extends Serializable

// 订单号,行号,货品,数量,单价,销售额
case class tbStockDetail(ordernumber: String, rownum: Int, itemid: String, number: Int, price: Double, amount: Double) extends Serializable

// 日期,年月,年,月,日,周几,第几周,季度,旬,半月
case class tbDate(dateid: String, years: Int, theyear: Int, month: Int, day: Int, weekday: Int, week: Int, quarter: Int, period: Int, halfmonth: Int) extends Serializable

object Practice {
  // 将DataFrame插入到Hive表
  private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
    spark.sql("DROP TABLE IF EXISTS "   tableName)
    dataDF.write.saveAsTable(tableName)
  }

  // 结果写入到MySQL
  private def insertMySQL(tableName: String, dataDF: DataFrame): Unit = {
    dataDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/sparksql")
      .option("dbtable", tableName)
      .option("user", "root")
      .option("password", "root")
      .mode(SaveMode.Overwrite)
      .save()
  }

  def main(args: Array[String]): Unit = {
    // 创建Spark配置
    val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]")

    // 创建Spark SQL 客户端
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    import spark.implicits._

    // 加载数据到Hive,读取本地数据 直接 根据结构跟对象 生成DS
    val tbStockRdd: RDD[String] = spark.sparkContext.textFile("D:/json/tbStock.txt")
    val tbStockDS: Dataset[tbStock] = tbStockRdd.map(_.split(",")).map(attr => tbStock(attr(0), attr(1), attr(2))).toDS
    insertHive(spark, "tbStock", tbStockDS.toDF)

    val tbStockDetailRdd: RDD[String] = spark.sparkContext.textFile("D:/json/tbStockDetail.txt")
    val tbStockDetailDS: Dataset[tbStockDetail] = tbStockDetailRdd.map(_.split(",")).map(attr => tbStockDetail(attr(0), attr(1).trim().toInt, attr(2), attr(3).trim().toInt, attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
    insertHive(spark, "tbStockDetail", tbStockDetailDS.toDF)

    val tbDateRdd: RDD[String] = spark.sparkContext.textFile("D:/json/tbDate.txt")
    val tbDateDS: Dataset[tbDate] = tbDateRdd.map(_.split(",")).map(attr => tbDate(attr(0), attr(1).trim().toInt, attr(2).trim().toInt, attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
    insertHive(spark, "tbDate", tbDateDS.toDF)

    //需求一: 统计所有订单中每年的销售单数、销售总额
    val result1: DataFrame = spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear")
    insertMySQL("xq1", result1)

    //需求二: 统计每年最大金额订单的销售额
    val result2: DataFrame = spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC")
    insertMySQL("xq2", result2)

    //需求三: 统计每年最畅销货品
    val result3: DataFrame = spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear")
    insertMySQL("xq3", result3)
    spark.stop()
  }
}

总结

  1. 学习跟理解RDD、DataFrame、DataSet三者之间的关系,跟如何相互转换。
  2. SparkSession操作Json、MySQL、Hive。主要是环境的搭建跟table的操作各种。

参考

Spark全套资料

0 人点赞