上一篇博客已经为大家介绍完了SparkSQL的基本概念以及其提供的两个编程抽象:DataFrame和DataSet,本篇博客,博主要为大家介绍的是关于SparkSQL编程的内容。考虑到内容比较繁琐,故分成了一个系列博客。本篇作为该系列的第一篇博客,为大家介绍的是SparkSession与DataFrame。
码字不易,先赞后看,养成习惯!
SparkSQL编程
1. SparkSession
在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
2. DataFrame
2.1 创建
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。
在正式开始之前,我们需要准备数据源。
vim /opt/data/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
将其上传到集群上。
hadoop fs -put /opt/data/people.json /input
ok~
1) 从Spark数据源进行创建
(1) 查看Spark数据源进行创建的文件格式,
spark.read.
按tab键表示显示:
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
(2)读取json文件创建DataFrame
注意:spark.read.load
默认获取parquet
格式文件
scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(3)展示结果
代码语言:javascript复制scala> df.show
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
2)从RDD中转换
参照第2.5节的内容:DateFrame 转换为RDD
3) 从Hive Table进行查询返回
这个将在后面的博文中涉及到,这里暂且不谈。
2.2 SQL风格语法 (主要)
1)创建一个DataFrame
代码语言:javascript复制scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)对DataFrame创建一个临时表
代码语言:javascript复制scala> df.createOrReplaceTempView("people")
3)通过SQL语句实现查询全表
代码语言:javascript复制scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
4)结果展示
代码语言:javascript复制scala> sqlDF.show
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
注意:临时表是Session范围的,Session退出后,表就失效了。如果想应用范围内仍有效,可以使用全局表。注意使用全局表时需要全路径访问,如:global_temp:people。
全局的临时视图存在于系统数据库 global_temp中,我们必须加上库名去引用它
5)对于DataFrame创建一个全局表
代码语言:javascript复制scala> df.createGlobalTempView("people")
6)通过SQL语句实现查询全表
代码语言:javascript复制scala> spark.sql("select * from global_temp.people").show()
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
3. DSL 风格语法 (次要)
1)创建一个DataFrame
代码语言:javascript复制scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)查看DataFrame的Schema信息
代码语言:javascript复制scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
3)只查看"name"列数据
代码语言:javascript复制scala> df.select("name").show()
-------
| name|
-------
|Michael|
| Andy|
| Justin|
-------
4)查看"name"列数据以及"age 1"数据
代码语言:javascript复制scala> df.select($"name", $"age" 1).show()
------- ---------
| name|(age 1)|
------- ---------
|Michael| null|
| Andy| 31|
| Justin| 20|
------- ---------
5)查看"age"大于"21"的数据
代码语言:javascript复制scala> df.filter($"age" > 21).show()
--- ----
|age|name|
--- ----
| 30|Andy|
--- ----
6)按照"age"分组,查看数据条数
代码语言:javascript复制scala> df.groupBy("age").count().show()
---- -----
| age|count|
---- -----
| 19| 1|
|null| 1|
| 30| 1|
---- -----
2.4 RDD转换为DateFrame
注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._
【spark不是包名,而是sparkSession对象的名称】
准备工作: 数据文件people.txt
vim /opt/data/people.txt
zhangsan,17 lisi,20, wangwu,19 上传至hdfs集群hdfs dfs -put /opt/data/people.txt /input
前置条件: 导入隐式转换并创建一个RDD
代码语言:javascript复制scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("/input/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
1)通过手动确定转换
代码语言:javascript复制scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
2)通过反射确定(需要用到样例类)
<1>创建一个样例类
代码语言:javascript复制scala> case class People(name:String, age:Int)
<2>根据样例类将RDD转换为DataFrame
代码语言:javascript复制scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
3)通过编程的方式(了解)
<1>导入所需的类型
代码语言:javascript复制scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
<2>创建Schema
代码语言:javascript复制scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
<3>导入所需的类型
代码语言:javascript复制scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
<4>根据给定的类型创建二元组RDD
代码语言:javascript复制scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
<5>根据数据及给定的schema创建DataFrame
代码语言:javascript复制scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
2.5 DateFrame 转换为RDD
直接调用rdd即可。
1) 创建一个DataFrame
代码语言:javascript复制scala> val df = spark.read.json("/input/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)将DataFrame转换为RDD
代码语言:javascript复制scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
3)打印RDD
代码语言:javascript复制scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
好了,本次的分享就到这里。下一篇博客将为大家带来DataSet
的内容,敬请期待!!!