文章大纲
在《20张图详解 Spark SQL 运行原理及数据抽象》的第 5 节“SparkSession”中,我们知道了 Spark SQL 就是基于 SparkSession 作为入口实现的。
在 Spark 2.0 版本之后,SparkSession 封装了 SQLContext 及 HiveContext,实现了后两者的所有功能,并可以获取到 SparkConetxt。
那 Spark SQL 具体的实现方式是怎样的?如何进行使用呢?
下面就带大家一起来认识 Spark SQL 的使用方式,并通过十步操作实战,轻松拿下 Spark SQL 的使用。
1
DataSet 及 DataFrame 的创建
在《20张图详解 Spark SQL 运行原理及数据抽象》的第 4 节“Spark SQL 数据抽象”中,我们认识了 Spark SQL 中的两种数据抽象:DataSet 及 DataFrame。
而在《带你理解 Spark 中的核心抽象概念:RDD》的 2.1 节中,我们认识了如何在 Spark 中创建 RDD,那 DataSet 及 DataFrame 在 Spark SQL 中又是如何进行创建的呢?
DataSet 及 DataFrame 的创建方式有两种:
1.1
使用 Spark 创建函数进行创建
手动定义数据集合,然后通过 Spark 的创建操作函数 createDataset()
、createDataFrame()
, 创建 DataSet、DataFrame:
DataSet:
代码语言:javascript复制//DataSet
case class Person(name:String, age:Int, height:Int)
val seq1 = Seq(Person("Michael", 25, 176), Person("Jack", 15, 165))
val ds1 = spark.createDataset(seq1)
ds1.show
使用 Spark 创建操作函数创建 DataSet
DataFrame:
代码语言:javascript复制//DataFrame
val seq2 = Seq(("Michael", 25, 176), ("Jack", 15, 165))
val df1 = spark.createDataFrame(seq2).toDF("name", "age", "height")
df1.show
使用 Spark 创建操作函数创建 DataFrame
由于这种方式需要手动定义数据,实际操作中并不常用。
1.2
读取数据源进行创建
Spark SQL 支持的数据源包括:文件、数据库、Hive 等。
1.2.1. 读取文件数据源
Spark SQL 支持的文件类型包括:parquet、text、csv、json、orc 等。
例如读取 Spark 自带的 text 文件:
代码语言:javascript复制val sc = spark.sparkContext
val textRDD1 = sc.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt")
textRDD1.take(5)
Spark SQL 读取文件数据源方式一
或:
代码语言:javascript复制val textRDD2 = spark.read.text("file:///opt/modules/spark/examples/src/main/resources/people.txt").rdd
textRDD2.take(5)
Spark SQL 读取文件数据源方式二
两种用法的区别在于返回的数据集类型不一样
sc.textFile(path:String)
返回的数据集类型是:RDD[String]spark.read.text(path:String)
返回的数据集类型是:DataFrame(DataSet[Row])
1.2.2. 读取数据库数据源
Spark SQL 支持通过 JDBC 读取外部数据库的数据作为数据源。
以读取 Oracle 数据库为例:
启动 Spark Shell 时,指定 Oracle 数据库的驱动:
代码语言:javascript复制spark-shell --master spark://hadoop101:7077
--jars /root/temp/ojdbc6.jar
--driver-class-path /root/temp/ojdbc6.jar
连接数据库,以读取数据库中的数据:
代码语言:javascript复制val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.100.1:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","test").
load
1.2.3. 使用 Hive 中的数据
Spark SQL 是由 Shark 发展而来的,Shark 其实就是 Hive on Spark。Spark 1.0 版本发布后,才引入了 Spark SQL。
2014 年 7 月 1 日之后,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上。
Spark SQL 的具体发展史详见下图:
Spark SQL 发展历史
可见,Spark 原生就对 Hive 的兼容十分友好,且其还内置了 Hive 组件,Spark SQL 可以通过内置 Hive 或者外部 Hive 两种方式读取 Hive 库中的数据。
Spark SQL 具体使用和操作 Hive 数据源的方法将在后续的 Hive 专栏中进行介绍。
2
RDD、DataFrame、DataSet 的共性与转换
在 Spark 中,RDD、DataFrame、DataSet 三种类型的数据集是有一定的共同特性的,因此它们三者之间可以相互进行转换,而且需要转换的场景也较为常见。
2.1
RDD、DataFrame、DataSet 的共性
- RDD、DataFrame、DataSet 都是 Spark 平台下的分布式弹性数据集,为处理超大型数据提供了便利;
- 三者都有惰性计算机制,在进行创建、Transformation 操作时,不会立即执行,只有在遇到 Action 操作时,才会开始遍历运算(详细介绍请参见《带你理解 Spark 中的核心抽象概念:RDD》中的第 2 节“RDD 的操作”);
- 三者都有 Partition 的概念,可以进行 Cache(缓存)操作,也可以进行 CheckPoint(检查点)操作(详细介绍请参见《7000字 15张图解,学习 Spark 入门基础知识》中的 4.3.4 节及 2.3 节);
- 三者都有许多相似的操作算子,如 map、filter、groupByKey 等(详细介绍请参见《带你理解 Spark 中的核心抽象概念:RDD》中的 2.3 节“RDD API 算子”);
- 在对 DataFrame 和 Dataset 进行操作时,很多情况下需要
spark.implicits._
进行支持。
2.2
RDD、DataFrame、DataSet 的转换
RDD、DataFrame、DataSet 之间的转换
2.2.1. DataFrame/DataSet 转 RDD
这个转换比较简单,直接调用 rdd 即可将 DataFrame/DataSet 转换为 RDD:
代码语言:javascript复制val rdd1 = testDF.rdd
val rdd2 = testDS.rdd
2.2.2. RDD 转 DataFrame
a. 通过编程的方式来设置 Schema,适用于编译器不能确定列的情况:
代码语言:javascript复制val peopleRDD = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt")
val schemaString = "name age"
val filed = schemaString.split(" ").map(filename => org.apache.spark.sql.types.StructField(filename, org.apache.spark.sql.types.StringType, nullable = true))
val schema = org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(",")).map(para => org.apache.spark.sql.Row(para(0).trim, para(1).trim))
val peopleDF = spark.createDataFrame(res6, schema)
peopleDF.show
b. 用元组把一行的数据写在一起,然后在 toDF()
中指定字段名:
val peopleDF2 = rdd.map(para(para(0).trim(), para(1).trim().toInt)).toDF("name", "age")
peopleDF2.show
c. 定义 case class,通过反射来设置 Schema,使用 toDF
进行转换:
case class Person(name:String, age:Int)
val peopleDF3 = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(para => Person(para(0).trim, para(1).trim.toInt)).toDF
peopleDF3.show
RDD 转 DataFrame(case class 方式)
2.2.3. RDD 转 DataSet
定义 case class,通过反射来设置 Schema,使用 toDS
进行转换:
case class Person(name:String, age:Int)
val peopleDS = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(para => Person(para(0).trim, para(1).trim.toInt)).toDS
peopleDS.show
RDD 转 DataSet
2.2.4. DataSet 转 DataFrame
直接调用 toDF
,即可将 DataSet 转换为 DataFrame:
val peopleDF4 = peopleDS.toDF
peopleDF4.show
DataSet 转 DataFrame
2.2.5. DataFrame 转 DataSet
使用 as
方法,as
方法后面跟的是 case class:
val peopleDS2 = peopleDF3.as[Person]
peopleDS2.show
DataFrame 转 DataSet
DataFrame 与 DataSet 均支持 Spark SQL 的算子操作,同时也能进行 SQL 语句操作,下面的实战中会进行演示。
3
Spark SQL 查询方式
Spark SQL 支持两种查询方式:一种是DSL 风格,另外一种是SQL 风格。
3.1
DSL 风格
Spark SQL 提供了一种 DSL(Domain Specified Language,领域专用语言,在语义上与 SQL 关系查询非常相近),以方便操作结构化数据。
使用前需要引入 spark.implicits._
这个隐式转换,以将 DataFrame 隐式转换成 RDD。
3.2
SQL 风格
Spark SQL 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用 spark.sql()
来执行 SQL 查询,并返回结果数据集。
使用前需要将 DataFrame/DataSet 注册成一张表,注册方式分两种:
1. Local Temporary View
使用 createOrReplaceTempView()
或 createTempView()
方法可以将表注册成 Local Temporary View(局部临时视图),这种方式注册的表只对当前生命周期中的 Session 有效,不能与其它 Session 共享。
2. Global Temporary View
使用 createGlobalTempView()
方法可以将表注册成 Global Temporary View(全局临时视图),这种方式注册的表可以在不同的 Session 中共享,即跨 Session 有效,而且在 Application 的运行周期内可用。
需要注意的是,使用 SQL 语句访问该表时,要加上 global_temp
作为前缀来引用,因为全局临时视图是绑定到系统保留的数据库 global_temp 上的。
下面的实战中会有注册不同类型表区别的实例操作演示。
3.3
Spark SQL 算子
DSL 支持 Spark SQL 算子,且算子十分丰富,下面列举部分算子:
3.3.1. select 相关
a. 列的数据展示有多种表示方法:""
、$""
、'
、col()
、df("")
,注意不要混合使用:
// select
df1.select($"ename", $"age", $"sal").show
df1.select("ename", "age", "sal").show
df1.select('ename, 'age, 'sal).show
df1.select(col("ename"), col("age"), col("sal")).show
df1.select(df1("ename"), df1("age"), df1("sal")).show
b. expr
表达式可以对列进行操作,注意 expr
里面只能使用引号:
// expr表达式
df1.select(expr("age 1"), expr("sal 100"), expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show
3.3.2. 更改相关
a. drop
可删除一个或多个列,得到新的 DataFrame:
// drop
df1.drop("age").show
df1.drop("age", "sal").show
b. withColumn
可对列值进行更改:
// withColumn
df1.withColumn("sal", $"sal" 100).show
c. withColumnRenamed
可对列名进行更改:
// withColumnRenamed
df1.withColumnRenamed("sal", "newsal").show
注意:以上操作后,返回的数据集的类型是 DataFrame。
3.3.3. 筛选过滤相关
筛选、过滤的操作可以使用 filter
或 where
算子:
// filter
df1.filter("sal > 10000").show
df1.filter("sal > 10000 and job == 'MANAGER'").show
// where
df1.where("sal > 10000").show
df1.where("sal > 10000 and job == 'MANAGER'").show
3.3.4. 聚集统计相关
使用 groupBy
算子搭配统计方式或 agg
可进行数据统计操作:
// groupBy with sum, min, max, avg, count
df1.groupBy("age").sum("sal").show
df1.groupBy("age").min("sal").show
df1.groupBy("age").max("sal").show
df1.groupBy("age").avg("sal").show
df1.groupBy("age").count.show
// agg
df1.groupBy("age").agg("sal" -> "sum", "sal" -> "min", "sal" -> "max", "sal" -> "avg", "sal" -> "count").show
df1.groupBy("age").agg(sum("sal"), min("sal"), max("sal"), avg("sal"), count("sal")).show
df1.groupBy("age").agg(sum("sal").as("sum1"), min("sal").as("min2"), max("sal").as("max3"), avg("sal").as("avg4"), count("sal").as("count5")).show
3.3.5. 排序相关
使用 orderBy
或 sort
算子可进行排序操作:
// orderBy
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show
// 降序
df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'age, -'sal).show
// sort
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'age, -'sal).show
3.3.6. 集合(并、交、差)相关
使用 union
(unionAll
)、intersect
、except
算子可对数据进行并集、交集、差集操作:
// union, unionAll, intersect, except
val ds3 = ds1.select("ename")
val ds4 = ds2.select("ename")
// union(求并集,不去重)
ds3.union(ds4).show
// unionAll(求并集,去重,过期方法)
ds3.unionAll(ds4).show
// intersect(求交集)
ds3.intersect(ds4).show
// except(求差集)
ds3.except(ds4).show
3.3.7. 连接相关
与 SQL 类似,连接类型有:内连接、左(外)连接、右(外)连接、全(外)连接、半连接、反连接、笛卡尔积等:
代码语言:javascript复制// join
// inner join(内连接)
ds1.join(ds2, "empno").show
ds1.join(ds2, Seq("empno"), "inner").show
// left join(左连接), left outer join(左外连接)
ds1.join(ds2, Seq("empno"), "left").show
ds1.join(ds2, Seq("empno"), "left_outer").show
// right join(右连接), right outer join(右外连接)
ds1.join(ds2, Seq("empno"), "right").show
ds1.join(ds2, Seq("empno"), "right_outer").show
// outer join(外连接), full join(全连接), full outer join(全外连接)
ds1.join(ds2, Seq("empno"), "outer").show
ds1.join(ds2, Seq("empno"), "full").show
ds1.join(ds2, Seq("empno"), "full_outer").show
//semi join(半连接), anti join(反连接)
ds1.join(ds2, Seq("empno"), "left_semi").show
ds1.join(ds2, Seq("empno"), "left_anti").show
注意:跟更改相关的算子一样,连接操作后,返回的数据集的类型是 DataFrame。
4
Spark SQL 使用实战
有了上面及之前介绍的理论知识为基础,下面手把手带大家十步轻松拿下 Spark SQL 使用操作,用实战的形式实践学习到的理论知识,以加深对 Spark SQL 的印象与理解。
4.1
创建数据源文件
这里使用《如何快速获取并分析自己所在城市的房价行情?》中获取到的广州二手房 csv 格式的数据作为数据源文件。
数据源文件(广州二手房信息)
另外再创建一个户型信息相关的数据源文件,以进行连接操作使用。
数据源文件(户型信息)
注意数据文件的编码格式要采用中文编码,否则中文会显示乱码。
4.2
上传数据源文件至 HDFS
这里使用《万字 50图,详解 Hadoop HA 完全分布式部署配置及运行调试》中搭建的 Hadoop 中的 HDFS 作为数据文件的存储系统,因此需要将创建的数据源文件上传至 HDFS 中,供 Spark SQL 进行读取。
上传数据源文件至 HDFS:
代码语言:javascript复制hdfs dfs -put /opt/data/ershouHousePrice_lianjia_gz_hdfs.csv /input
hdfs dfs -put /opt/data/huxing_lianjia_gz_hdfs.csv /input
打开 HDFS 的 Web 页面查看:
通过 HDFS Web 页面查看上传数据文件是否成功
可以看到,两个数据源文件已经成功上传至 HDFS 中。
4.3
定义 case class(表的 schema)
打开 SparkSession,定义 case class,即表的 Schema 信息:
代码语言:javascript复制case class House(totalprice:Float, positioninfo:String, huxing:String, chaoxiang:String, zhuangxiu:String, louceng:String, louling:String, louxing:String, danjia:Int, mianji:Float, guanzhu:Int)
定义 case class(House)
这里按照数据文件中的字段名称及对应的数据类型,对 Schema 进行定义。
4.4
读取数据源,加载数据(RDD 转 DataFrame)
读取上传到 HDFS 中的广州二手房信息数据文件,分隔符为逗号,将数据加载到上面定义的 Schema 中,并转换为 DataFrame 数据集:
代码语言:javascript复制val houseDF = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/ershouHousePrice_lianjia_gz_hdfs.csv").map(_.split(",")).map(para => House(para(0).trim.toFloat, para(1).trim, para(2).trim, para(3).trim, para(4).trim, para(5).trim, para(6).trim, para(7).trim, para(8).trim.toInt, para(9).trim.toFloat, para(10).trim.toInt)).toDF
houseDF.show
读取并加载数据源文件
展示加载的数据集结果
由于数据加载到 Schema 中为 RDD 数据集,需要用 toDF
转换为 DataFrame 数据集,以使用 Spark SQL 进行查询。
4.5
使用 DSL 风格查询数据
使用 Spark SQL 的 DSL 风格查询方式,对 houseDF
数据集进行查询,包括 select、筛选过滤、聚集统计:
houseDF.select("positioninfo").show
houseDF.filter($"totalprice" > 1000).show
houseDF.groupBy($"huxing").count.show
DSL 风格 - 使用 select 算子
DSL 风格 - 使用筛选过滤算子
DSL 风格 - 使用聚集统计算子
大家还可以尝试使用上面介绍的其它 Spark SQL 算子进行查询。
4.6
注册表
为 houseDF
数据集注册两种不同类型的表:Local Temporary View、Global Temporary View:
houseDF.createOrReplaceTempView("houseDF")
houseDF.createGlobalTempView("houseDF_gl")
下面对这两种类型的表进行查询,观察两者之间的区别。
4.7
使用 SQL 风格查询数据
使用 Spark SQL 的 SQL 风格查询方式,对上面注册的两种不同类型表进行查询:
代码语言:javascript复制spark.sql("select * from houseDF").show
SQL 风格 - 查询 Local Temporary View
代码语言:javascript复制spark.sql("select * from global_temp.houseDF_gl").show
SQL 风格 - 查询 Global Temporary View
注意查询 Global Temporary View 类型表时,需要加上 global_temp
前缀。
在不同的 Session 中,对上面注册的两种表进行查询:
代码语言:javascript复制spark.newSession.sql("select * from houseDF").show
在新的 Session 中查询 Local Temporary View
代码语言:javascript复制spark.newSession.sql("select * from global_temp.houseDF_gl").show
在新的 Session 中查询 Global Temporary View
通过操作实践,可以看到:
Local Temporary View 只对当前 Session 有效;而 Global Temporary View 可以在不同 Session 间共享,支持跨 Session 查询。
4.8
DataFrame 转 DataSet
将 DataFrame 数据集 houseDF
转换成 DataSet 数据集 houseDS
:
val houseDS = houseDF.as[House]
houseDS.show
DataFrame 转 DataSet 实战
使用 DSL 风格查询方式,对 houseDS
数据集进行查询操作:
houseDS.filter(_.totalprice > 1000).show
houseDS.filter(_.huxing == "3室2厅").show
houseDS.groupBy($"huxing").count.show
对 DataSet 进行 DSL 风格查询
将 houseDS
数据集转换成 Array 类型结构数据:
houseDS.collect
对 DataSet 转换为 Array 类型结构数据
可见,DataFrame 转换为 DataSet 后,同样支持 Spark SQL 的算子操作。
4.
RDD 转 DataSet
重新读取并加载广州二手房信息数据源文件,将其转换为 DataSet 数据集:
代码语言:javascript复制val houseRdd = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/ershouHousePrice_lianjia_gz_hdfs.csv").map(_.split(","))
val houseDS2 = houseRdd.map(para => House(para(0).trim.toFloat,para(1).trim,para(2).trim,para(3).trim,para(4).trim,para(5).trim,para(6).trim,para(7).trim,para(8).trim.toInt,para(9).trim.toFloat,para(10).trim.toInt)).toDS
houseDS2.show
RDD 转 DataSet 实战
将 houseDS2
数据集注册成表,并使用 SQL 风格查询方式进行查询:
houseDS2.createOrReplaceTempView("houseDS2")
spark.sql("select * from houseDS2").show
注册表并进行 SQL 风格查询
代码语言:javascript复制spark.sql("select totalprice, positioninfo, huxing, danjia, mianji from houseDS2 where totalprice > 1000 and mianji < 150 order by mianji").show
对 DataSet 进行 SQL 风格查询
SQL 风格查询方式更适合进行复杂的数据查询。
4.10
使用 SQL 风格进行连接查询
读取上传到 HDFS 中的户型信息数据文件,分隔符为逗号,将数据加载到定义的 Schema 中,并转换为 DataSet 数据集:
代码语言:javascript复制case class Huxing(huxing:String, rooms:String)
val huxingRdd = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/huxing_lianjia_gz_hdfs.csv").map(_.split(","))
val huxingDS = huxingRdd.map(para => Huxing(para(0).trim, para(1).trim)).toDS
huxingDS.show
加载户型信息数据源文件,并转换为 DataSet
将 huxingDS
数据集注册成表,并使用 SQL 风格查询方式进行查询:
huxingDS.createOrReplaceTempView("huxingDS")
spark.sql("select * from huxingDS").show
注册表并进行 SQL 风格查询
对 houseDS2
与 huxingDS
两个 DataSet 数据集采用 SQL 风格查询方式进行连接查询,统计所有二房和三房房子的总价格:
spark.sql("select sum(totalprice) from (select houseDS2.totalprice, huxingDS.rooms from houseDS2 join huxingDS where houseDS2.huxing = huxingDS.huxing and huxingDS.rooms in ('二房','三房')) t").show
使用 SQL 风格进行连接查询
至此,Spark SQL 的使用操作实战暂告一段落,大家可以继续深入摸索研究,发掘 Spark SQL 的精髓所在!
版权信息:© Evgeny Vasenev / Aurora Photos