DataFrame
概述
DataFrame
可以翻译成数据框,让Spark具备了处理大规模结构化数据的能力。
- 比原有
RDD
转化方式更加简单,获得了更高的性能 - 轻松实现从
mysql
到DF
的转化,支持SQL
查询 DF
是一种以RDD
为基础的分布式数据集,提供了详细的结构信息。传统的RDD
是Java
对象集合
创建
从Spark2.0开始,spark使用全新的SparkSession
接口
- 支持不同的数据加载来源,并将数据转成
DF
DF
转成SQLContext
自身中的表,然后利用SQL
语句来进行操作- 启动进入pyspark后,pyspark 默认提供两个对象(交互式环境)
SparkContext:sc
SparkSession:spark
# 创建sparksession对象
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
读取数据
代码语言:javascript复制df = spark.read.text("people.txt")
df = spark.read.json("people.json")
df = spark.read.parquet("people.parquet")
df.show()
spark.read.format("text").load("people.txt")
保存
代码语言:javascript复制df.write.txt("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format("parquet").save("people.parquet")
DF 常见操作
代码语言:javascript复制df = spark.read.json("people.json")
df.printSchema() # 查看各种属性信息
df.select(df["name"], df["age"] 1).show() # 筛选出两个属性
df.filter(df["age"]>20).show() # 选择数据
df.groupBy("age").count().show() # 分组再进行统计
df.sort(df["age"].desc(), df["name"].asc()).show() # 先通过age降序,再通过name升序
RDD 转成DF
- 利用反射机制去推断
RDD
模式 - 用编程方式去定义
RDD
模式
# 反射机制
from pyspark.sql import Row
people = spark.sparkContext.textFile("...") # 读取文件
.map(lambda line:line.split(",")) # 将读取进来的每行数据按照逗号分隔
.map(lambda p: Row(name=p[0], age=int(p[1]))) # 生成行记录
schemaPeople=spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people") # 注册成为临时表
# 编程方式
from pyspark.sql.types import *
from pyspark.sql import Row
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
lines = spark.sparkContext.textFile(
" ")
spark读取mysql数据库
安装JDBC
驱动程序mysql-connector-java-5.1.4.tar.gz
# 存放位置
/usr/local/spark/jars
# 启动pyspark
cd /usr/local/spark
./bin/pyspark
>>> use spark;
>>> select * from student;
# 插入数据:见下图