DataFrame来源
Spark 社区在 1.3 版本发布了 DataFrame。那么,相比 RDD,DataFrame 到底有何不同呢?
DataFrame被称为SchemaRDD。DataFrame使Spark具备了处理大规模结构化数据的能力。在Spark中,DataFrame是一种以RDD为基础的分布式数据集,因此DataFrame可以完成RDD的绝大多数功能,在开发使用时,也可以调用方法将RDD和DataFrame进行相互转换。
在开发API方面,RDD算子多采用高阶函数,高阶函数的优势在于表达能力强,它允许开发者灵活地设计并实现业务逻辑。而 DataFrame的表达能力却很弱,它定义了一套DSL算子(Domain Specific Language)。
代码语言:javascript复制注意:所谓的高阶函数指的是,指的是形参为函数的函数,或是返回类型为函数的函数。
> 例如:我们在WordCount程序中调用flatMap算子:
lineRDD.flatMap(line => line.split(" "))
flatMap的入参其实是一个函数。
这里需要大家注意:是不是DataFrame表达能力弱就意味着DataFrame比RDD弱呢?
恰恰相反,因为DataFrame的算子大多数都是计算逻辑确定的,Spark就可以根据基于启发式的规则或策略甚至动态运行时的信息优化DataFrame的计算过程。
那么负责DataFrame的算子优化是谁来做的呢?正是SparkSQL。
Spark Core和Spark SQL的关系
我们可以用一句话描述这个关系: Spark SQL正是在Spark Core的执行引擎基础上针对结构化数据处理进行优化和改进。
上图揭示了Spark Core体系和Spark SQL体系的关系。在上图中,Spark Core作为整个Spark系统的底层执行引擎。负责了所有的任务调度、数据存储、Shuffle等核心能力。
而Spark SQL正是基于如此强大的Spark Core底层能力,形成一套独立的优化引擎系统。
简单的说,Spark SQL的计算任务通过SQL的形式最终转换成了RDD的计算。Spark SQL会对代码事先进行优化。
DataFrame的创建方式
Spark 本身支持种类丰富的数据源与数据格式,DataFrame的创建方式更是多种多样。
这里我们列举三类最常用的Spark DataFrame的创建方式。
createDataFrame & toDF
createDataFrame方法
在SqlContext中使用createDataFrame也可以创建DataFrame。数据可以来源于RDD或者自己创建的数组。
代码语言:javascript复制import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("birthday", DateType, nullable = false)
))
val rdd = spark.sparkContext.parallelize(Seq(
Row("小明", 18, java.sql.Date.valueOf("1990-01-01")),
Row("小芳", 20, java.sql.Date.valueOf("1999-02-01"))
))
val df = spark.createDataFrame(rdd, schema)
df.show()
createDataFrame 方法有两个参数,第一个参数是RDD,第二个参数就是Schema信息。createDataFrame需要的RDD的类型必须是 RDD[Row]
,其中的 Row 是 org.apache.spark.sql.Row
,因此,对于类型为 RDD[(String, Int)]
的 rdd,我们需要把它转换为RDD[Row]
。
df.show()
函数可以将数据进行输出:
-------------- ------------- -----------
|name |age |birthday |
-------------- ------------- -----------
|小明 | 18| 1990-01-01|
|小芳 | 20| 1999-02-01|
-------------- ------------- -----------
toDF方法
我们可以通过导入spark.implicits, 然后通过在 RDD 之上调用 toDF 就能轻松创建 DataFrame。只要这些数据的内容能指定数据类型即可。
代码语言:javascript复制import spark.implicits._
val df = Seq(
("小明", 18, java.sql.Date.valueOf("1990-01-01")),
("小芳", 20, java.sql.Date.valueOf("1999-02-01"))
).toDF("name", "age", "birthday")
df.show()
打印出来的结果为:
代码语言:javascript复制 -------------- ------------- -----------
|name |age |birthday |
-------------- ------------- -----------
|小明 | 18| 1990-01-01|
|小芳 | 20| 1999-02-01|
-------------- ------------- -----------
同样,我们可以将一个RDD转化为df:
代码语言:javascript复制val rdd = spark.sparkContext.parallelize(List(1,2,3,4,5))
val df = rdd.map(x=>(x,x^2)).toDF("a","b")
df.show()
通过文件系统创建DataFrame
Spark支持非常多的文件格式,例如CSV、JSON、ORC、Parquet等。支持的文件列表你可以参考这里:
https://docs.databricks.com/data/data-sources/index.html
我们以CSV文件举例,假设我们的文件数据为:
代码语言:javascript复制小明,18
小芳,20
代码语言:javascript复制val spark = SparkSession.builder()
.appName("csv reader")
.master("local")
.getOrCreate()
val result = spark.read.format("csv")
.option("delimiter", ",")
.option("header", "true")
.option("nullValue", "\N")
.option("inferSchema", "true")
.load("path/demo.csv")
result.show()
result.printSchema()
当然,不同的文件格式有非常多的可选项,你可以参考上面给出的官网连接。
通过其他数据源创建DataFrame
我们可以通过指定连接参数例如数据库地址、用户名、密码等连接其他数据源。我们以MySQL为例:
代码语言:javascript复制val url = "jdbc:mysql://localhost:3306/test"
val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", "test")
.option("user", "admin")
.option("password", "admin")
.load()
df.show()
代码语言:javascript复制 --- ---- ----
| id|user|age|
--- ---- ----
| 1| 小明| 18|
| 2| 小芳| 20|
--- ---- ----
常用语法和算子
Spark SQL支持的算子列表非常非常多。
你可以在这里看到所有的算子列表:
https://spark.apache.org/docs/3.2.0/api/sql/index.html
我们举几个最常用的语法演示给大家看。
- 单行查询
var userDF = List((1, "张三", true, 18, 15000, 1))
.toDF("id", "name", "sex", "age", "salary", "dept")
userDF.createTempView("t_employee")
val sql = "select * from t_employee where name = '张三'"
spark.sql(sql).show()
- 分组查询
var userDF= List( (1,"张三",true,18,15000,1),
(2,"李四",false,18,12000,1),
(3,"王五",false,18,16000,2)
) .toDF("id","name","sex","age","salary","dept")
//构建视图
userDF.createTempView("t_employee")
val sql=
"""
|select dept ,avg(salary) as avg_slalary from t_employee
|group by dept order by avg_slalary desc
""".stripMargin
spark.sql(sql).show()
代码语言:javascript复制 ---- -----------
|dept|avg_slalary|
---- -----------
| 2| 16000.0|
| 1| 13500.0|
---- -----------
- 开窗函数
// 开窗函数
var df=List(
(1,"zs",true,1,15000),
(2,"ls",false,2,18000),
(3,"ww",false,2,14000),
(4,"zl",false,1,18000),
(5,"win7",false,1,16000)
).toDF("id","name","sex","dept","salary")
df.createTempView("t_employee")
val sql=
"""
|select id,name,salary,dept,
|count(id) over(partition by dept order by salary desc) as rank,
|(count(id) over(partition by dept order by salary desc rows between current row and unbounded following) - 1) as low_than_me,
|avg(salary) over(partition by dept rows between unbounded preceding and unbounded following) as avg_salary,
|avg(salary) over() as all_avg_salary
|from t_employee t1 order by dept desc
""".stripMargin
spark.sql(sql).show()
你可以参考这篇博客,这个博客中的例子非常多:
https://mask0407.blog.csdn.net/article/details/106716575
总结
本章我们讲解了Spark SQL的来源,Spark DataFrame创建的方式以及常用的算子。下篇我们将讲解Spark SQL中的Catalyst优化器和Tungsten,以及Spark SQL的Join策略选择。