Spark SQL 快速入门系列(2) | SparkSession与DataFrame的简单介绍

2020-10-28 17:40:55 浏览数 (1)

一. SparkSession

  在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫SQLContext,用于Spark 自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。

  从2.0开始, SparkSession是 Spark 最新的 SQL 查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的 API 在SparkSession上同样是可以使用的。

  SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

  当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext

二. 使用 DataFrame 进行编程

  Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式.

  DataFrame API 既有 transformation操作也有action操作. DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API

2.1 创建 DataFrame

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

有了 SparkSession 之后, 通过 SparkSession有 3 种方式来创建DataFrame:

  1. 通过 Spark 的数据源创建
  2. 通过已知的 RDD 来创建
  3. 通过查询一个 Hive 表来创建.

1. 通过 Spark 数据源创建

  • 1. 查看Spark数据源进行创建的文件格式
  • 2. 读取json文件创建DataFrame
代码语言:javascript复制
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
  • 3. 展示结果
代码语言:javascript复制
// 展示结果
scala> df.show
 ------- ------ 
|   name|salary|
 ------- ------ 
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
 ------- ------ 

关于通过 RDD 进行转换和通过查询 Hive 表创建,博主会在后面专门探讨

2.2 DataFrame 语法风格

1. SQL 语法风格(主要)

  SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询.

  这种风格的查询必须要有临时视图或者全局视图来辅助

  • 1. 读取json文件创建DataFrame
代码语言:javascript复制
// 读取 json 文件
scala> val df = spark.read.json("file:///opt/module/spark/examples/src/main/resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]
  • 2. 对DataFrame创建一个临时表
代码语言:javascript复制
scala> df.createOrReplaceTempView("people")
  • 3. 通过SQL语句实现查询全表
代码语言:javascript复制
scala> spark.sql("select * from people").show
 ------- ------ 
|   name|salary|
 ------- ------ 
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
 ------- ------ 

注意

  1. 临时视图只能在当前 Session 有效, 在新的 Session 中无效.
  2. 可以创建全局视图. 访问全局视图需要全路径:如global_temp.xxx
  • 4. 对于DataFrame创建一个全局表
代码语言:javascript复制
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.createGlobalTempView("people")
  • 5. 通过SQL语句实现查询全表
代码语言:javascript复制
scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res5.show
 ------- ------ 
|   name|salary|
 ------- ------ 
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
 ------- ------ 



scala> spark.newSession.sql("select * from global_temp.people")
res33: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> res7.show
 ------- ------ 
|   name|salary|
 ------- ------ 
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
 ------- ------ 

2. DSL 语法风格(了解)

  DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据. 可以在 Scala, Java, Python 和 R 中使用 DSL

  使用 DSL 语法风格不必去创建临时视图了.

1. 查看 Schema 信息
代码语言:javascript复制
scala> val df = spark.read.json("file:///opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
2. 使用 DSL 查询
  • 1. 只查询name列数据
代码语言:javascript复制
scala> df.select($"name").show
 ------- 
|   name|
 ------- 
|Michael|
|   Andy|
| Justin|
 ------- 


scala> df.select("name").show
 ------- 
|   name|
 ------- 
|Michael|
|   Andy|
| Justin|
 ------- 
  • 2. 查询name和age
代码语言:javascript复制
scala> df.select("name", "age").show
 ------- ---- 
|   name| age|
 ------- ---- 
|Michael|null|
|   Andy|  30|
| Justin|  19|
 ------- ---- 
  • 3. 查询name和age 1
代码语言:javascript复制
// 设计到运算的时候, 每列都必须使用$
scala> df.select($"name", $"age"   1).show
 ------- --------- 
|   name|(age   1)|
 ------- --------- 
|Michael|     null|
|   Andy|       31|
| Justin|       20|
 ------- --------- 
  • 4. 查询age大于20的数据
代码语言:javascript复制
scala> df.filter($"age" > 21).show
 --- ---- 
|age|name|
 --- ---- 
| 30|Andy|
 --- ---- 
  • 5. 按照age分组,查看数据条数
代码语言:javascript复制
scala> df.groupBy("age").count.show
 ---- ----- 
| age|count|
 ---- ----- 
|  19|    1|
|null|    1|
|  30|    1|
 ---- ----- 

2.3 RDD 和 DataFrame 的交互

1. 从 RDD 到 DataFrame

  涉及到RDD, DataFrame, DataSet之间的操作时, 需要导入:import spark.implicits._ 这里的spark不是包名, 而是表示SparkSession 的那个对象. 所以必须先创建SparkSession对象再导入. implicits是一个内部object

  首先创建一个RDD

代码语言:javascript复制
scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24
  • 1. 手动转换
代码语言:javascript复制
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26

// 转换为 DataFrame 的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
 ------- --- 
|   name|age|
 ------- --- 
|Michael| 29|
|   Andy| 30|
| Justin| 19|
 ------- --- 
  • 2. 通过样例类反射转换(最常用)
代码语言:javascript复制
// 1.创建样例类
scala> case class People(name :String, age: Int)
defined class People

// 2.使用样例把 RDD 转换成DataFrame
scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28

scala> rdd2.toDF.show
 ------- --- 
|   name|age|
 ------- --- 
|Michael| 29|
|   Andy| 30|
| Justin| 19|
 ------- --- 
  • 3. 通过 API 的方式转换
代码语言:javascript复制
package day05

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameDemo2 {
    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("Word Count")
            .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
        // 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]
        val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
        // 创建 StructType 类型
        val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
        val df: DataFrame = spark.createDataFrame(rowRdd, types)
        df.show

    }
}

2. 从 DataFrame到RDD

直接调用DataFrame的rdd方法就完成了从转换.

代码语言:javascript复制
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25

scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

说明

得到的RDD中存储的数据类型是:Row.

  本次的分享就到这里了

0 人点赞