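A DataFrame is a distributed dataset organized into columns, where each column has a name, a type, and values. Because every column is named, it is roughly equivalent to a table in a relational database.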
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. In the Java API, users need to use Dataset<Row> to represent a DataFrame.
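To make the type alias concrete, here is a minimal sketch that builds a small DataFrame from an in-memory Seq instead of a file and then treats it as a Dataset[Row]. The object name `DataFrameAliasSketch`, the helper `buildPeople`, and the sample rows are assumptions made for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameAliasSketch {
  // Hypothetical helper: builds a tiny DataFrame from in-memory tuples.
  // Option[Long] lets a row carry a missing (null) age.
  def buildPeople(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(String, Option[Long])](
      ("Michael", None),
      ("Andy", Some(30L)),
      ("Justin", Some(19L))
    ).toDF("name", "age")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameAliasSketch")
      .master("local[*]")
      .getOrCreate()

    val peopleDF: DataFrame = buildPeople(spark)

    // Compiles without a cast because DataFrame is a type alias of Dataset[Row].
    val asDataset: Dataset[Row] = peopleDF

    // Each element is a generic Row; fields are read by name or by position.
    val names = asDataset.collect().map(row => row.getAs[String]("name"))
    println(names.mkString(", "))

    spark.stop()
  }
}
```

Because a Row is untyped, field access such as `getAs[String]("name")` is only checked at runtime, which is the main trade-off compared with a typed Dataset of case classes.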
API operations
printSchema
Prints the schema as a tree.
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameApp")
      .master("local[*]")
      .getOrCreate()
    // The schema (column names and types) is inferred from the JSON records.
    val peopleDF: DataFrame = spark.read.json("resources/people.json")
    peopleDF.printSchema()
    spark.stop()
  }
}
Output:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
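In the output above, `age` is reported as `long` because the JSON reader infers types from the data. When a different type is wanted, a schema can be supplied up front. The sketch below is one way to do that, under stated assumptions: the object name `ExplicitSchemaSketch`, the helper `readPeople`, and the choice of `IntegerType` are illustrative, and the inline JSON lines stand in for `resources/people.json`, mirroring the rows shown in this article's outputs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExplicitSchemaSketch {
  // Assumed schema: same columns as the example, but age declared as an integer.
  val peopleSchema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))

  // Hypothetical helper: parses inline JSON lines using the schema above.
  def readPeople(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val jsonLines = Seq(
      """{"name":"Michael"}""",
      """{"name":"Andy", "age":30}""",
      """{"name":"Justin", "age":19}"""
    ).toDS()
    // With an explicit schema, Spark skips type inference.
    spark.read.schema(peopleSchema).json(jsonLines)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExplicitSchemaSketch")
      .master("local[*]")
      .getOrCreate()

    // printSchema now reports the declared types instead of inferred ones.
    readPeople(spark).printSchema()

    spark.stop()
  }
}
```

Supplying the schema also avoids the extra pass over the data that inference needs, which can matter for large inputs.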
show
Shows the first 20 rows by default; pass an argument to set the number of rows to display.
package cn.bx.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameApp")
      .master("local[*]")
      .getOrCreate()
    val peopleDF: DataFrame = spark.read.json("resources/people.json")
    // Display only the first row.
    peopleDF.show(1)
    spark.stop()
  }
}
Output:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+
only showing top 1 row
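Besides the row count, `show` also controls how wide each cell is printed: by default, string values longer than 20 characters are cut off. The following sketch contrasts the common overloads. The object name `ShowOptionsSketch`, the helper `buildNotes`, and the sample data are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ShowOptionsSketch {
  // Hypothetical sample data: the first note is longer than 20 characters.
  def buildNotes(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("Michael", "a note that is clearly longer than twenty characters"),
      ("Andy", "short note"),
      ("Justin", "another short note")
    ).toDF("name", "note")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ShowOptionsSketch")
      .master("local[*]")
      .getOrCreate()

    val notesDF = buildNotes(spark)

    notesDF.show()         // up to 20 rows, long strings truncated
    notesDF.show(2)        // at most 2 rows, long strings truncated
    notesDF.show(2, false) // at most 2 rows, full cell contents

    spark.stop()
  }
}
```

Note that `show` only prints and returns `Unit`; to work with the rows in code, use something like `take` or `collect` instead.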
select
Selects the columns to output.
package cn.bx.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameApp")
      .master("local[*]")
      .getOrCreate()
    val peopleDF: DataFrame = spark.read.json("resources/people.json")
    // Columns appear in the order they are listed here.
    peopleDF.select("name", "age").show()
    spark.stop()
  }
}
Output:
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+
Modifying column values
peopleDF.select(peopleDF.col("name"), peopleDF.col("age") + 1).show()
Output:
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
The $ syntactic sugar
package cn.bx.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameApp")
      .master("local[*]")
      .getOrCreate()
    val peopleDF: DataFrame = spark.read.json("resources/people.json")
    // Brings the $"..." string interpolator into scope; $"name" yields a Column.
    import spark.implicits._
    peopleDF.select($"name", $"age" + 1).show()
    spark.stop()
  }
}
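A computed column gets an auto-generated header such as `(age + 1)`, which is awkward to reference later. One possible refinement, sketched here, is to give it a name with `as`. The object name `AliasSketch`, the helper names, the alias `age_next_year`, and the in-memory sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object AliasSketch {
  // Hypothetical in-memory stand-in for resources/people.json.
  def buildPeople(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(String, Option[Long])](
      ("Michael", None),
      ("Andy", Some(30L)),
      ("Justin", Some(19L))
    ).toDF("name", "age")
  }

  // Adds 1 to age and names the resulting column explicitly.
  def withNextYearAge(spark: SparkSession, peopleDF: DataFrame): DataFrame = {
    import spark.implicits._
    peopleDF.select($"name", ($"age" + 1).as("age_next_year"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AliasSketch")
      .master("local[*]")
      .getOrCreate()

    withNextYearAge(spark, buildPeople(spark)).show()

    spark.stop()
  }
}
```

Arithmetic on a null value stays null, so a row with a missing age keeps a null in the renamed column.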
filter
Filters rows by a condition.
package cn.bx.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameApp")
      .master("local[*]")
      .getOrCreate()
    val peopleDF: DataFrame = spark.read.json("resources/people.json")
    // Keep only the rows whose age is greater than 19.
    peopleDF.filter(peopleDF.col("age") > 19).show()
    spark.stop()
  }
}
Output:
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
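Michael is missing from the result because his age is null: the comparison `null > 19` evaluates to null, and `filter` keeps only rows where the condition is true. The sketch below shows the same filter written three ways, including one that keeps null ages on purpose. The object name `FilterSketch`, its helper names, and the in-memory sample rows are illustrative assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object FilterSketch {
  // Hypothetical in-memory stand-in for resources/people.json.
  def buildPeople(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(String, Option[Long])](
      ("Michael", None),
      ("Andy", Some(30L)),
      ("Justin", Some(19L))
    ).toDF("name", "age")
  }

  // Column-expression form: rows with a null age are dropped.
  def olderThan19(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._
    df.filter($"age" > 19)
  }

  // SQL-string form of the same condition.
  def olderThan19Sql(df: DataFrame): DataFrame =
    df.filter("age > 19")

  // Explicitly keeps rows whose age is unknown.
  def olderThan19OrUnknown(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._
    df.filter(($"age" > 19) || $"age".isNull)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FilterSketch")
      .master("local[*]")
      .getOrCreate()

    val peopleDF = buildPeople(spark)
    olderThan19(spark, peopleDF).show()
    olderThan19Sql(peopleDF).show()
    olderThan19OrUnknown(spark, peopleDF).show()

    spark.stop()
  }
}
```

`where` is an alias of `filter`, so either name can be used depending on which reads better next to the surrounding code.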
groupBy
package cn.bx.spark

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameApp")
      .master("local[*]")
      .getOrCreate()
    val peopleDF: DataFrame = spark.read.json("resources/people.json")
    // Group rows by age and count the rows in each group; null forms its own group.
    peopleDF.groupBy(peopleDF.col("age")).count().show()
    spark.stop()
  }
}
Output:
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
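`count()` is only one of the aggregations available after `groupBy`, and the rows of a grouped result come back in no guaranteed order, as the output above suggests. The following sketch computes several aggregates at once with `agg` and sorts the result. The object name `GroupBySketch`, its helpers, the extra `city` column, and the sample rows are hypothetical and not part of the original `people.json`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, count, lit}

object GroupBySketch {
  // Hypothetical sample data with an extra city column to group on.
  def buildPeople(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(String, Option[Long], String)](
      ("Michael", None, "Boston"),
      ("Andy", Some(30L), "Boston"),
      ("Justin", Some(19L), "Seattle")
    ).toDF("name", "age", "city")
  }

  // Per city: number of people and average age (avg ignores null ages).
  def statsByCity(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._
    df.groupBy($"city")
      .agg(count(lit(1)).as("people"), avg($"age").as("avg_age"))
      .orderBy($"city") // explicit sort for a stable row order
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupBySketch")
      .master("local[*]")
      .getOrCreate()

    statsByCity(spark, buildPeople(spark)).show()

    spark.stop()
  }
}
```

Counting `lit(1)` counts every row in the group, whereas `count($"age")` would skip rows whose age is null; which one is appropriate depends on the question being asked.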