Spark SparkSession:一个新的入口

2019-08-07 14:13:48 浏览数 (1)

在 Spark 1.x 中,使用 HiveContext 作为 DataFrame API 的入口显得并不直观。在 Spark 2.0 引入 SparkSession 作为一个新的入口,并且包含 SQLContext 和 HiveContext 的特性,同时为了向后兼容,两者都保留下来。SparkSession 有很多特性,在这里我们展示一些更重要的特性。

1. 创建SparkSession

SparkSession 可以使用建造者模式创建。如果 SparkContext 存在,那么 SparkSession 将会重用它,但是如果不存在就会创建一个 SparkContext。在I/O期间,在 builder 中设置的配置选项会自动传递给 Spark 和 Hadoop。

Java版本:

代码语言:javascript复制
SparkSession sparkSession = SparkSession
  .builder()
  .master("local[2]")
  .appName("SparkSession Example")
  .config("spark.some.config.option", "config-value")
  .getOrCreate();

Scala版本:

代码语言:javascript复制
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
  .master("local[2]")
  .appName("SparkSession Example")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

import org.apache.spark.sql.SparkSession
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@46d6b87c

2. 统一读取数据的入口

SparkSession 是读取数据的入口,类似于旧的 SQLContext.read。

Java版本:

代码语言:javascript复制
Dataset<Row> dataFrame = sparkSession.read().json("src/main/resources/person.json");

Scala版本:

代码语言:javascript复制
val jsonData = sparkSession.read.json("src/main/resources/person.json")
jsonData: org.apache.spark.sql.DataFrame = [email: string, iq: bigint ... 1 more field]

输出:

代码语言:javascript复制
display(jsonData)

email

iq

name

matei@databricks.com

180

Matei Zaharia

rxin@databricks.com

80

Reynold Xin

3. 运行SQL查询

SparkSession 可以在数据上执行SQL查询,结果以 DataFrame 形式返回(即DataSet[Row])。

代码语言:javascript复制
display(spark.sql("select * from person"))

email

iq

name

matei@databricks.com

180

Matei Zaharia

rxin@databricks.com

80

Reynold Xin

4. 使用配置选项

SparkSession 还可以用来设置运行时配置选项,这些选项可以触发性能优化或I/O(即Hadoop)行为。

代码语言:javascript复制
spark.conf.set("spark.some.config", "abcd")
res12: org.apache.spark.sql.RuntimeConfig = org.apache.spark.sql.RuntimeConfig@55d93752

spark.conf.get("spark.some.config")
res13: String = abcd

配置选项也可以在 SQL 中使用变量替换:

代码语言:javascript复制
%sql select "${spark.some.config}"
abcd

5. 直接使用元数据

SparkSession还包含一个 catalog 方法,该方法包含操作 Metastore(即数据目录)的方法。这些方法以 Datasets 形式返回结果,所以你可以在它们上面使用相同的 Datasets API。

代码语言:javascript复制
// To get a list of tables in the current database
val tables = spark.catalog.listTables()
tables: org.apache.spark.sql.Dataset[org.apache.spark.sql.catalog.Table] = [name: string, database: string ... 3 more fields]

输出:

代码语言:javascript复制
display(tables)

name

database

description

tableType

isTemporary

person

default

null

MANAGED

false

smart

default

null

MANAGED

false

代码语言:javascript复制
// Use the Dataset API to filter on names
display(tables.filter(_.name contains "son"))

name

database

description

tableType

isTemporary

person

default

null

MANAGED

false

代码语言:javascript复制
// Get the list of columns for a table
display(spark.catalog.listColumns("smart"))

name

description

dataType

nullable

isPartition

isBucket

email

null

string

true

false

false

iq

null

bigint

true

false

false

name

null

string

true

false

false

6. 访问底层的SparkContext

SparkSession.sparkContext 返回底层的 SparkContext,用于创建 RDD 以及管理集群资源。

代码语言:javascript复制
spark.sparkContext
res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac

0 人点赞