第1章 Spark SQL概述
什么是Spark SQL
Spark SQL是Spark用来处理结构化数据
的一个模块,它提供了2个编程抽象:DataFrame
和DataSet
,并且作为分布式SQL查询
引擎的作用。 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD
,然后提交到集群执行,执行效率非常快
!
传统的数据分析中一般无非就是SQL,跟MapReduce。但是MapReduce的八股文书写方式太烦人了,所以引入了依靠MapReduce引擎建设出来的Hive,Spark为了融合Hive也推出了Shark。但是Spark模仿Hive的框架形成了SparkSQL。开发敏捷性,执行速度。
Spark SQL的特点
- 易整合
- 统一的数据访问方式
- 兼容Hive
- 标准的数据连接
什么是DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集
,类似于传统数据库中的二维表格
。DataFrame与RDD的主要区别在于,前者带有schema元信息
,即DataFrame所表示的二维表数据集的每一列都带有名称
和类型
。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
DataFrame也是懒执行
的,但性能上比RDD要高
,主要原因:优化
的执行计划,即查询计划通过Spark catalyst optimiser
进行优化。比如下面一个例子:
SQL解析成RDD编程,系统执行一般比人写的更好些。
代码语言:javascript复制val rdd1 = sc.makeRDD(List((1,"a"),(2,"b"),(3,"c")))
val rdd2 = sc.makeRDD(List((1,"1"),(2,"2"),(3,"3")))
自己写的话 笛卡尔乘积先出来然后过滤
rdd1.join(rdd2).filter{
case (key,(v1,v2)=>{
key == 1
})
}
sparksql
select * from t_table1 a join t_table2 b on a.x = b.x where a.id = 1
底层是 先过滤再笛卡尔乘积,若干底层优化。
rdd1.filter(xxx) ==> 1
join
rdd2.filter(xxx) ==> 1
什么是DataSet
DataSet是分布式数据集合
。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展
。类似与ORM,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。
- 是DataFrame API的一个
扩展
,是SparkSQL最新的数据抽象; - 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;
- 用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;
- DataSet是
强
类型的。比如可以有DataSet[Car],DataSet[Person]。
三者区别: 单纯的RDD只有KV这样的数据没有结构,给RDD的数据增加若干结构
形成了DataFrame,而为了访问方便不再像SQL那样获取第几个数据,而是像读取对象
那种形成了DataSet。
第二章 SparkSQL编程
1. SparkSession新的起始点
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext
,用于Spark自己提供的SQL查询;一个叫HiveContext
,用于连接Hive的查询。SparkSession
是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合
,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext
完成的
2. DataFrame
创建在Spark SQL中SparkSession
是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建
;从一个存在的RDD进行转换
;还可以从Hive Table进行查询
返回。
从Spark数据源进行创建
- 查看Spark数据源进行创建的文件格式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
- 读取json文件创建DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 展示结果
scala> df.show
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
SQL风格语法(主要)
- 创建一个DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 对DataFrame创建一个临时表,View是只读的,Table有改的意思哦。
scala> df.createOrReplaceTempView("people")
- 通过SQL语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
---
scala> val del = spark.sql("drop table if exists stu")
del: org.apache.spark.sql.DataFrame = []
- 结果展示
scala> sqlDF.show
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
注意
:普通临时表是Session
范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问
,如:global_temp.people
5. 对于DataFrame创建一个全局表
scala> df.createGlobalTempView("people")
- 通过SQL语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.people").show()
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
创建新session
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
DSL风格语法(次要)
- 创建一个DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 查看DataFrame的Schema信息
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
- 只查看”name”列数据
scala> df.select("name").show()
-------
| name|
-------
|Michael|
| Andy|
| Justin|
-------
- 查看”name”列数据以及”age 1”数据
scala> df.select($"name", $"age" 1).show()
------- ---------
| name|(age 1)|
------- ---------
|Michael| null|
| Andy| 31|
| Justin| 20|
------- ---------
- 查看”age”大于”21”的数据
scala> df.filter($"age" > 21).show()
--- ----
|age|name|
--- ----
| 30|Andy|
--- ----
- 按照”age”分组,查看数据条数
scala> df.groupBy("age").count().show()
---- -----
| age|count|
---- -----
| 19| 1|
|null| 1|
| 30| 1|
---- -----
RDD转换为DataFrame
注意
:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._ (spark不是包名,而是sparkSession对象的名称) 前置条件:导入隐式转换并创建一个RDD
1. 手动转换
代码语言:javascript复制scala>
import spark.implicits._
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
- 通过手动确定转换
scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
在这里插入图片描述
2. 通过反射确定(需要用到样例类)
- 创建一个样例类
scala> case class People(name:String, age:Int)
- 根据样例类将RDD转换为DataFrame
scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
peopleRDD.map(x=>{People(x._1,x._2)}).toDF
3. 通过编程的方式(了解)
- 导入所需的类型
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
- 创建Schema
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
- 导入所需的类型
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
- 根据给定的类型创建二元组RDD
scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
- 根据数据及给定的schema创建DataFrame
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
DataFrame转换为RDD
直接调用rdd
即可
- 创建一个DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 将DataFrame转换为RDD
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
DataFrame 关心的是行,所以转换的时候是按照行来转换的
- 打印RDD
scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
3. DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
创建
- 创建一个样例类
scala> case class Person(name: String, age: Long)
defined class Person
- 创建DataSet
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。
- 创建一个RDD
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
- 创建一个样例类
scala> case class Person(name: String, age: Long)
defined class Person
- 将RDD转化为DataSet
scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
DataSet转换为RDD
调用rdd方法即可。
- 创建一个DataSet
scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
- 将DataSet转换为RDD
scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28
4. DataFrame与DataSet的互操作
DataFrame转DataSet
- 创建一个DateFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 创建一个样例类
scala> case class Person(name: String, age: Long)
defined class Person
- 将DataFrame转化为DataSet,添加类型
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
Dataset转DataFrame
- 创建一个样例类
scala> case class Person(name: String, age: Long)
defined class Person
- 创建DataSet
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
- 将DataSet转化为DataFrame
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
- 展示
scala> df.show
---- ---
|name|age|
---- ---
|Andy| 32|
---- ---
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上import spark.implicits._
不然toDF
、toDS
无法使用。
RDD、DataFrame、DataSet
在SparkSQL中Spark为我们提供了两个新的抽象,DataFrame跟DataSet,他们跟RDD的区别首先从版本上来看
代码语言:javascript复制RDD(Spark1.0) ----> DataFrame(Spark1.3)---->DataSet(Spark1.6)
如果同样的数据都给到了这三个数据结构,他们分别计算后会得到相同的结果,不同的是他们的执行效率跟执行方式,在后期的Spark版本中DataSet会逐步取代另外两者称为唯一接口。
所以在做一个整体的项目时候,一般还是以Java为主,只有在涉及到迭代式计算采用到Scala这样到函数式编程。
相同点
- RDD、DataFrame、DataSet全部都是平台下到
分布式弹性数据集
,为处理超大型数据提供了便利 - 三者都有
惰性机制
,在创建,转换,如map方法时候不会立即执行,只有遇到了Action算子比如foreach,三者才会开始遍历数据 - 三者都会根据spark的内存进行
自动缓存运算
,当数据量超大时候会自动写到磁盘,不用担心内存溢出。 - 三者都有partition的概念。
- 三者都有许多共同函数,如filter,排序等。
- 在对DataFrame跟DataSet进行许多操作都要
import spark.implicits._
- DataFrame跟DataSet均可使用模式匹配获取各个字段的值跟类型。 DataFrame:
testDF.map{
case Row(col1:String,col2:int)=>{
println(col1)
println(col2)
col1
}
case _ => ""
}
DataSet:
代码语言:javascript复制case class Coltest(col1:String,col2:Int) extends Serializable
//定义各个类型
testDS.map{
case Coltest(col1:String,col2:Int) =>{
println(col1)
println(col2)
col1
}
case _ => " "
}
不同点
- RDD:
- RDD 一般跟sparkMlib 同时使用
- RDD 不支持sparkSQL操作
- DataFrame
- 跟RDD和DataSet不同,DataFrame 每一行类型都固定为Row,每一列值无法直接访问,只有通过解析才可以获得各个字段。
testDf.foreach{
line=>
val col1 = line.getAs[String]("col1")
val col2 = line.getAs[String]("col2")
}
- DataFrame跟DataSet一般不跟sparkMlib共同使用。
- DataFrame跟DataSet均支持sparkSQL,比如select,groupby,临时注册视图,执行SQL语句。
dataDF.createOrReplaceTempView("tmp")
spark.sql("select Row,DATE from tmp where DATE is not null order by DATE").show(100,false)
- DataFrame 跟DataSet支持一些特别方便的保存方式,比如csv,可以带表头,每一列字段一目了然。这样的保存方式可以方便的获得字段名跟列的对应,而且分隔符(delimiter)可自定义
val saveoptions = Map("header"->"true","delimiter"->"t","path"->"hdfs://hadoop102:9000/test")
val dataDF = spark.read.options(options).format("com.sowhat.spark.csv").load()
- DataSet
- DataSet 跟DataFrame拥有完全一样的成员函数,唯一区别就是每一行数据类型不同。
- DataFrame也可以叫DataSet[Row],每一行类型都是Row,不解析每一行究竟有那些字段,每个字段又是什么类型无从得知,只能通上面提到的
getAs
方法或者共性的第七条的模式匹配来拿出特定的字段,而DataSet中每一行是什么类型是不一定的,在自定义了case class 之后可以自由获得每一行信息。
case class Coltest(col1:String,col2:Int) extends Serializable
//定义字段名跟类型
val test:DataSet[Coltest] = rdd.map{
Coltest(line._1,line_2)
}.toDS
test.map{
line=>
println(line.col1)
println(line.col2)
}
可以看出,DataSet在需要访问列中的某个字段时候非常方便,然而如果要写一些是适配性极强的函数时候,如果使用DataSet,行的类型又不确定,可能是各自case class,无法实现适配,这时候可以用DataFrame 既DataSet[Row]很好的解决问题。
IDEA 创建SparkSQL
引入依赖
代码语言:javascript复制 <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
入门demo
代码语言:javascript复制package com.sowhat.udaf
import org.apache.spark.sql.{DataFrame, SparkSession}
object TestCustomerAvg {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("WordCount")
.getOrCreate()
//2.导入隐式转换
import spark.implicits._
//3.读取文件创建DF
val df: DataFrame = spark.read.json("/Users/liujinjie/Downloads/Spark1015/SparkSQL/src/data/people.json")
//4.创建一张临时表
df.createTempView("people")
spark.sql("select * from people").show
//7.关闭连接
spark.stop()
}
}
RDDDFDS 转换
代码语言:javascript复制package com.sowhat.test
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object RDDToDF {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("RDDToDF")
.getOrCreate()
//2.导入隐式转换
import spark.implicits._
//3.创建RDD
val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhang", 20), (2, "san", 30), (3, "si", 40)))
val value: RDD[User] = rdd.map {
case (id, name, age) => User(id, name, age)
}
val userDS: Dataset[User] = value.toDS()
val rdd2: RDD[User] = userDS.rdd
rdd2.foreach(println)
// 转换为DF
val df: DataFrame = rdd.toDF("id", "name", "age")
// 转换为DS
val ds: Dataset[User] = df.as[User]
// 转换为DF
val df1: DataFrame = ds.toDF()
// 转换为RDD
val rdd1: RDD[Row] = df1.rdd
rdd1.foreach(row => {
// 这个是数据的索引
println(row.getString(1))
})
//8.关闭连接
spark.stop()
}
}
case class User(id: Int, name: String, age: Int)
建议SparkSQL开发尽量下面三行直接写好
代码语言:javascript复制 val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("demo")
val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()
// 进行转换前 需要引入隐式转换规则,这里引入的是SparkSession 对象名字
import spark.implicits._
用户自定义函数
在Shell窗口中可以通过spark.udf功能用户可以自定义函数。
UDF
- 创建DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
- 打印数据
scala> df.show()
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
- 注册UDF,功能为在数据前添加字符串
scala> spark.udf.register("addName", (x:String)=> "Name:" x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
- 创建临时表
scala> df.createOrReplaceTempView("people")
- 应用UDF
scala> spark.sql("Select addName(name), age from people").show()
----------------- ----
|UDF:addName(name)| age|
----------------- ----
| Name:Michael|null|
| Name:Andy| 30|
| Name:Justin| 19|
----------------- ----
UDAF
强
类型的Dataset和弱
类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。需求
:实现求平均工资的自定义聚合函数。
people.json
{"name":"Michael","age": 21}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
弱类型实现
依据DataFrame
类型的查询数据,只能通过索引形式找到数据,必须记住自己的数据对应的索引位置。注意导入正确的package ! 自定义若类型函数
package com.atguigu.udaf
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
// 数据类型均跟SparkSQL相关的类型
object CustomerAvg extends UserDefinedAggregateFunction {
//输入数据类型
override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)
//缓存数据的类型
override def bufferSchema: StructType = StructType(StructField("sum", LongType ) :: StructField("count", LongType) :: Nil)
//输出数据类型
override def dataType: DataType = DoubleType
//函数确定性
override def deterministic: Boolean = true
// 计算前 缓冲区 初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//分区内 数据 更新
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) input.getLong(0)
buffer(1) = buffer.getLong(1) 1L
}
}
//多个节点多缓冲区 合并值
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) buffer2.getLong(1)
}
//计算最终结果
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
调用
代码语言:javascript复制package com.atguigu.udaf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, TypedColumn}
object TestCustomerAvg {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("WordCount")
.getOrCreate()
//2.导入隐式转换
import spark.implicits._
//3.读取文件创建DF
val df: DataFrame = spark.read.json("/Users/liujinjie/Downloads/Spark1015/SparkSQL/src/data/people.json")
//4.创建一张临时表
df.createTempView("people")
//5.注册函数
spark.udf.register("MyAvg", CustomerAvg)
//6.使用UDAF
spark.sql("select MyAvg(age) as sqlAge from people").show
//创建聚合对象
val udaf = new MyAgeAvgClassFunction
// 将聚合函数查询转换为查询列
val avgCol: TypedColumn[UserBean, Double] = udaf.toColumn.name("avgAge")
val userDS: Dataset[UserBean] = df.as[UserBean]
userDS.select(avgCol).show()
//7.关闭连接
spark.stop()
}
}
在这里插入图片描述
强类型实现
强类型无法使用SQL形式查询调用函数,只能用DSL风格。 自定义函数
代码语言:javascript复制package com.atguigu.spark
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
// 既然是强类型,可能有case类
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
class MyAverage extends Aggregator[Employee, Average, Double] {
// 定义一个数据结构,保存工资总数和工资总个数,初始都为0
def zero: Average = Average(0L, 0L)
// 聚合相同executor分片中的结果
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum = employee.salary
buffer.count = 1
buffer
}
// 聚合不同execute的结果
def merge(b1: Average, b2: Average): Average = {
b1.sum = b2.sum
b1.count = b2.count
b1
}
// 计算输出
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// 设定中间值类型的编码器,要转换成case类
// Encoders.product是进行scala元组和case类转换的编码器
def bufferEncoder: Encoder[Average] = Encoders.product
// 设定最终输出值的编码器
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
object MyAverage{
def main(args: Array[String]) {
//创建SparkConf()并设置App名称
val spark = SparkSession
.builder()
.appName("sowhat")
.master("local[4]")
.config("spark.testing.memory", "471859200")
.getOrCreate()
import spark.implicits._
// For implicit conversions like converting RDDs to DataFrames
val ds = spark.read.json("employees.json").as[Employee]
ds.show()
val averageSalary = new MyAverage().toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
spark.stop()
}
}
/**
* {"name":"Michael", "salary":3000}
* {"name":"Andy", "salary":4500}
* {"name":"Justin", "salary":3500}
* {"name":"Berta", "salary":4000}
*
* */
在这里插入图片描述
第三章 Spark SQL数据的加载与保存
通用加载/保存方法
1. 加载数据
- read直接加载数据
scala> spark.read.
csv jdbc json orc parquet textFile… …
注意
:加载数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。 2. format指定加载数据类型
scala> spark.read.format("…")[.option("…")].load("…")
用法详解
: 3. format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。 4. load("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。 5. option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
2. 保存数据
- write直接保存数据
scala> df.write.
csv jdbc json orc parquet textFile… …
注意
:保存数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。 2. format指定保存数据类型
scala> df.write.format("…")[.option("…")].save("…")
用法详解
:
- format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
- save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
- option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
- 文件保存选项 可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。SaveMode是一个枚举类,其中的常量包括:
- Append:当保存路径或者表已存在时,追加内容;
- Overwrite: 当保存路径或者表已存在时,覆写内容;
- ErrorIfExists:当保存路径或者表已存在时,报错;
- Ignore:当保存路径或者表已存在时,忽略当前的保存操作。
使用详解:
代码语言:javascript复制df.write.mode(SaveMode.Append).save("… …")
df.write.mode("append").save("… …")
3. 默认数据源Parquet
Parquet是一种流行的列式存储格式,可以高效的存储具有嵌套字段的记录,Parquet格式经常在Hadoop生态圈使用,它也支持SparkSQL的全部数据类型,SparkSQL提供了直接读取跟存储Parquet格式文件的方法。并且可以通过format()来指定输入输出文件格式。
代码语言:javascript复制spark.read.format("csv").load("pwd")
- 加载数据
val df = spark.read.load("examples/src/main/resources/users.parquet")
- 保存数据
df.select("name", " color").write.save("user.parquet")
JSON文件
Spark SQL 能够自动推测 JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载一个一个JSON 文件。目的
:Spark读写Json数据,其中数据源可以在本地也可以在HDFS文件系统注意
:这个JSON文件不是一个传统的JSON文件,每一行
都得是一个JSON串。格式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
- 导入隐式转换
import spark.implicits._
- 加载JSON文件
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
- 创建临时表
peopleDF.createOrReplaceTempView("people")
- 数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
------
| name|
------
|Justin|
------
MySQL文件
Spark SQL可以通过JDBC从关系型数据库中读
取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写
回关系型数据库中。目的
:spark读写MySQL数据
可在启动shell时指定相关的数据库驱动路径,或者将相关的数据库驱动放到spark的类路径下。 cp /opt/mysql-libs/mysql-connector-java-5.1.27-bin.jar /opt/spark/jars
- 启动spark-shell
bin/spark-shell --master spark://hadoop102:7077 [--jars mysql-connector-java-5.1.27-bin.jar]
- 定义JDBC相关参数配置信息
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
- 使用read.jdbc加载数据
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd", "tableName", connectionProperties)
- 使用format形式加载数据
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/rdd").option("dbtable", " rddtable").option("user", "root").option("password", "000000").load()
- 使用write.jdbc保存数据
jdbcDF2.write.module(“append”).jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)
- 使用format形式保存数据
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "rddtable3")
.option("user", "root")
.option("password", "000000")
.save()
其中保存的时候确保主键等信息 ,也也可以选择往mysql中添加数据的module。
Hive
Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及Hive查询语言(HQL)等。spark-shell 默认是Hive支持的;代码中是默认不支持的,需要手动指定 enableHiveSupport()
。
SparkSQL中的SparkSession 就包含来自Hive跟SparkSQL的数据,这里的Hive是内置的Hive,跟HBase 里的内部独立ZooKeeper类似。工作中要跟外部Hive关联的。 内部Hive存储元数据路径:
代码语言:javascript复制/opt/module/spark/metastore_db 来存储元数据
内嵌Hive 应用
如果要使用内嵌
的Hive,什么都不用做,直接用就可以了。 前面的 RDD、DF、DS切换的时候数据都是创建的view。isTemporary = true
,但是也可以用内置的Hive来创建table哦! 可以修改其数据仓库地址,参数为:--conf spark.sql.warehouse.dir=./wear
在这里插入图片描述
注意
:如果你使用的是内部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库的地址,如果你需要是用HDFS作为路径,那么需要将core-site.xml和hdfs-site.xml 加入到Spark conf目录,否则只会创建master节点上的warehouse目录,查询时会出现文件找不到的问题,这是需要使用HDFS,则需要将metastore删除,重启集群。
外部Hive应用
如果想连接外
部已经部署好的Hive,需要通过以下几个步骤。
将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下的conf目录下
。
- 打开
spark shell
,注意带上访问Hive元数据库的JDBC客户端 bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar注意
:每次启动时指定JDBC jar包路径很麻烦,我们可以选择将JDBC的驱动包放置在spark的lib目录下,一劳永逸。
运行Spark SQL CLI
Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口。
代码语言:javascript复制/bin/spark-sql
然后就可以跟在hive的终端一样进行CRUD即可了,可能会出现 若干bug
代码中操作Hive
添加依赖
代码语言:javascript复制 <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
源数据:
代码语言:javascript复制1,sowhat
2,zhang
3,li
整体思路就是,创建就是跟Hive一样指定spark hive数据存放spark.sql.warehouse.dir
路径,指定spark hive的元数据存储信息metastore_db
。
package com.atguigu.hive
/**
* Created by wuyufei on 05/09/2017.
*/
import java.io.File
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
object HiveOperation {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
val warehouseLocation = new File("spark-warehouse").getAbsolutePath // 设定数据路径
println(warehouseLocation)
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport() // 注意添加
.master("local[*]")
.config("spark.testing.memory", "471859200")
.getOrCreate()
//import spark.implicits._
spark.sql("CREATE TABLE IF NOT EXISTS user (key INT, value STRING) row format delimited fields terminated by ',' ")
spark.sql("LOAD DATA LOCAL INPATH 'D:/json/kg.txt' INTO TABLE user")
// Queries are expressed in HiveQL
val df = spark.sql("SELECT * FROM user")
df.show()
df.write.format("json").save("D:/json/ssss.json")
spark.stop()
}
}
输出数据格式:
SparkSQL跟Hive实战
各种依赖:
代码语言:javascript复制<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
</dependencies>
实例代码
代码语言:javascript复制val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() val
sc
: SparkContext = spark.sparkContext
package com.atguigu.spark
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
// 订单号,交易位置,交易日期
case class tbStock(ordernumber: String, locationid: String, dateid: String) extends Serializable
// 订单号,行号,货品,数量,单价,销售额
case class tbStockDetail(ordernumber: String, rownum: Int, itemid: String, number: Int, price: Double, amount: Double) extends Serializable
// 日期,年月,年,月,日,周几,第几周,季度,旬,半月
case class tbDate(dateid: String, years: Int, theyear: Int, month: Int, day: Int, weekday: Int, week: Int, quarter: Int, period: Int, halfmonth: Int) extends Serializable
object Practice {
// 将DataFrame插入到Hive表
private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
spark.sql("DROP TABLE IF EXISTS " tableName)
dataDF.write.saveAsTable(tableName)
}
// 结果写入到MySQL
private def insertMySQL(tableName: String, dataDF: DataFrame): Unit = {
dataDF.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/sparksql")
.option("dbtable", tableName)
.option("user", "root")
.option("password", "root")
.mode(SaveMode.Overwrite)
.save()
}
def main(args: Array[String]): Unit = {
// 创建Spark配置
val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]")
// 创建Spark SQL 客户端
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
import spark.implicits._
// 加载数据到Hive,读取本地数据 直接 根据结构跟对象 生成DS
val tbStockRdd: RDD[String] = spark.sparkContext.textFile("D:/json/tbStock.txt")
val tbStockDS: Dataset[tbStock] = tbStockRdd.map(_.split(",")).map(attr => tbStock(attr(0), attr(1), attr(2))).toDS
insertHive(spark, "tbStock", tbStockDS.toDF)
val tbStockDetailRdd: RDD[String] = spark.sparkContext.textFile("D:/json/tbStockDetail.txt")
val tbStockDetailDS: Dataset[tbStockDetail] = tbStockDetailRdd.map(_.split(",")).map(attr => tbStockDetail(attr(0), attr(1).trim().toInt, attr(2), attr(3).trim().toInt, attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
insertHive(spark, "tbStockDetail", tbStockDetailDS.toDF)
val tbDateRdd: RDD[String] = spark.sparkContext.textFile("D:/json/tbDate.txt")
val tbDateDS: Dataset[tbDate] = tbDateRdd.map(_.split(",")).map(attr => tbDate(attr(0), attr(1).trim().toInt, attr(2).trim().toInt, attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
insertHive(spark, "tbDate", tbDateDS.toDF)
//需求一: 统计所有订单中每年的销售单数、销售总额
val result1: DataFrame = spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear")
insertMySQL("xq1", result1)
//需求二: 统计每年最大金额订单的销售额
val result2: DataFrame = spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC")
insertMySQL("xq2", result2)
//需求三: 统计每年最畅销货品
val result3: DataFrame = spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear")
insertMySQL("xq3", result3)
spark.stop()
}
}
总结
- 学习跟理解RDD、DataFrame、DataSet三者之间的关系,跟如何相互转换。
- SparkSession操作Json、MySQL、Hive。主要是环境的搭建跟table的操作各种。
参考
Spark全套资料