一. SparkSession
在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
从2.0开始, SparkSession是 Spark 最新的 SQL 查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的 API 在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。
当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext
二. 使用 DataFrame 进行编程
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式.
DataFrame API 既有 transformation操作也有action操作. DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API
2.1 创建 DataFrame
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
有了 SparkSession 之后, 通过 SparkSession有 3 种方式来创建DataFrame:
- 通过 Spark 的数据源创建
- 通过已知的 RDD 来创建
- 通过查询一个 Hive 表来创建.
1. 通过 Spark 数据源创建
- 1. 查看Spark数据源进行创建的文件格式
- 2. 读取json文件创建DataFrame
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
- 3. 展示结果
// 展示结果
scala> df.show
------- ------
| name|salary|
------- ------
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
------- ------
关于通过 RDD 进行转换和通过查询 Hive 表创建,博主会在后面专门探讨
2.2 DataFrame 语法风格
1. SQL 语法风格(主要)
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询.
这种风格的查询必须要有临时视图或者全局视图来辅助
- 1. 读取json文件创建DataFrame
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
- 2. 对DataFrame创建一个临时表
scala> df.createOrReplaceTempView("people")
- 3. 通过SQL语句实现查询全表
scala> spark.sql("select * from people").show
------- ------
| name|salary|
------- ------
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
------- ------
注意:
- 临时视图只能在当前 Session 有效, 在新的 Session 中无效.
- 可以创建全局视图. 访问全局视图需要全路径:如global_temp.xxx
- 4. 对于DataFrame创建一个全局表
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createGlobalTempView("people")
- 5. 通过SQL语句实现查询全表
scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res5.show
------- ------
| name|salary|
------- ------
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
------- ------
scala> spark.newSession.sql("select * from global_temp.people")
res33: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res7.show
------- ------
| name|salary|
------- ------
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
------- ------
2. DSL 语法风格(了解)
DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据. 可以在 Scala, Java, Python 和 R 中使用 DSL
使用 DSL 语法风格不必去创建临时视图了.
1. 查看 Schema 信息
代码语言:javascript复制scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
2. 使用 DSL 查询
- 1. 只查询name列数据
scala> df.select($"name").show
-------
| name|
-------
|Michael|
| Andy|
| Justin|
-------
scala> df.select("name").show
-------
| name|
-------
|Michael|
| Andy|
| Justin|
-------
- 2. 查询name和age
scala> df.select("name", "age").show
------- ----
| name| age|
------- ----
|Michael|null|
| Andy| 30|
| Justin| 19|
------- ----
- 3. 查询name和age 1
// 设计到运算的时候, 每列都必须使用$
scala> df.select($"name", $"age" 1).show
------- ---------
| name|(age 1)|
------- ---------
|Michael| null|
| Andy| 31|
| Justin| 20|
------- ---------
- 4. 查询age大于20的数据
scala> df.filter($"age" > 21).show
--- ----
|age|name|
--- ----
| 30|Andy|
--- ----
- 5. 按照age分组,查看数据条数
scala> df.groupBy("age").count.show
---- -----
| age|count|
---- -----
| 19| 1|
|null| 1|
| 30| 1|
---- -----
2.3 RDD 和 DataFrame 的交互
1. 从 RDD 到 DataFrame
涉及到RDD, DataFrame, DataSet之间的操作时, 需要导入:import spark.implicits._ 这里的spark不是包名, 而是表示SparkSession 的那个对象. 所以必须先创建SparkSession对象再导入. implicits是一个内部object
首先创建一个RDD
代码语言:javascript复制scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
- 1. 手动转换
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26
// 转换为 DataFrame 的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
------- ---
| name|age|
------- ---
|Michael| 29|
| Andy| 30|
| Justin| 19|
------- ---
- 2. 通过样例类反射转换(最常用)
// 1.创建样例类
scala> case class People(name :String, age: Int)
defined class People
// 2.使用样例把 RDD 转换成DataFrame
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28
scala> rdd2.toDF.show
------- ---
| name|age|
------- ---
|Michael| 29|
| Andy| 30|
| Justin| 19|
------- ---
- 3. 通过 API 的方式转换
package day05
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object DataFrameDemo2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName("Word Count")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
// 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
// 创建 StructType 类型
val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
val df: DataFrame = spark.createDataFrame(rowRdd, types)
df.show
}
}
2. 从 DataFrame到RDD
直接调用DataFrame的rdd方法就完成了从转换.
代码语言:javascript复制scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25
scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
说明:
得到的RDD中存储的数据类型是:Row.
本次的分享就到这里了