Spark笔记12-DataFrame创建、保存

2021-03-02 15:42:52 浏览数 (1)

DataFrame

概述

DataFrame可以翻译成数据框,让Spark具备了处理大规模结构化数据的能力。

  • 比原有RDD转化方式更加简单,获得了更高的性能
  • 轻松实现从mysqlDF的转化,支持SQL查询
  • DF是一种以RDD为基础的分布式数据集,提供了详细的结构信息。传统的RDDJava对象集合
创建

从Spark2.0开始,spark使用全新的SparkSession接口

  • 支持不同的数据加载来源,并将数据转成DF
  • DF转成SQLContext自身中的表,然后利用SQL语句来进行操作
  • 启动进入pyspark后,pyspark 默认提供两个对象(交互式环境)
    • SparkContext:sc
    • SparkSession:spark
代码语言:javascript复制
# 创建sparksession对象
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
读取数据
代码语言:javascript复制
df = spark.read.text("people.txt")
df = spark.read.json("people.json")
df = spark.read.parquet("people.parquet")
df.show()

spark.read.format("text").load("people.txt")

保存
代码语言:javascript复制
df.write.txt("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")

df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format("parquet").save("people.parquet")

DF 常见操作

代码语言:javascript复制
df = spark.read.json("people.json")
df.printSchema()   #  查看各种属性信息
df.select(df["name"], df["age"] 1).show()   # 筛选出两个属性
df.filter(df["age"]>20).show()  # 选择数据
df.groupBy("age").count().show()  # 分组再进行统计
df.sort(df["age"].desc(), df["name"].asc()).show()   #  先通过age降序,再通过name升序

RDD 转成DF

  1. 利用反射机制去推断RDD模式
  2. 用编程方式去定义RDD模式
代码语言:javascript复制
# 反射机制
from pyspark.sql import Row
people = spark.sparkContext.textFile("...")    # 读取文件
.map(lambda line:line.split(","))      # 将读取进来的每行数据按照逗号分隔
.map(lambda p: Row(name=p[0], age=int(p[1])))  # 生成行记录
schemaPeople=spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")   # 注册成为临时表

# 编程方式
from pyspark.sql.types import * 
from pyspark.sql import Row 
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
lines = spark.sparkContext.textFile(
" ")

spark读取mysql数据库

安装JDBC驱动程序mysql-connector-java-5.1.4.tar.gz

代码语言:javascript复制
# 存放位置
/usr/local/spark/jars

# 启动pyspark
cd /usr/local/spark
./bin/pyspark

>>> use spark;
>>> select * from student;

# 插入数据:见下图

0 人点赞