PySpark SQL 提供 read.json("path")
将单行或多行(多行)JSON 文件读取到 PySpark DataFrame 并 write.json("path")
保存或写入 JSON 文件的功能,在本教程中,您将学习如何读取单个文件、多个文件、目录中的所有文件进入 DataFrame 并使用 Python 示例将 DataFrame 写回 JSON 文件。
注意: 开箱即用的 PySpark API 支持将 JSON 文件和更多文件格式读取到 PySpark DataFrame 中。
使用 read.json("path")
或者 read.format("json").load("path")
方法将文件路径作为参数,可以将 JSON 文件读入 PySpark DataFrame。
与读取 CSV 不同,默认情况下,来自输入文件的 JSON 数据源推断模式。
此处使用的 zipcodes.json 文件可以从 GitHub 项目下载。
代码语言:javascript复制传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.json
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()
当使用 format("json")
方法时,还可以通过其完全限定名称指定数据源,如下所示。
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json')
.load("PyDataStudio/zipcodes.json")
从多行读取 JSON 文件
PySpark JSON 数据源在不同的选项中提供了多个读取文件的选项,使用multiline
选项读取分散在多行的 JSON 文件。默认情况下,多行选项设置为 false。
下面是我们要读取的输入文件,同样的文件也可以在Github上找到。
代码语言:javascript复制传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/multiline-zipcode.json
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
使用read.option("multiline","true")
# Read multiline json file
multiline_df = spark.read.option("multiline","true")
.json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()
一次读取多个文件
还可以使用read.json()
方法从不同路径读取多个 JSON 文件,只需通过逗号分隔传递所有具有完全限定路径的文件名,例如
# Read multiple files
df2 = spark.read.json(
['resources/zipcode1.json',
'resources/zipcode2.json'])
df2.show()
读取目录中的所有文件
只需将目录作为json()
方法的路径传递给该方法,我们就可以将目录中的所有 JSON 文件读取到 DataFrame 中。
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()
使用用户自定义架构读取文件
PySpark Schema 定义了数据的结构,换句话说,它是 DataFrame 的结构。PySpark SQL 提供 StructType 和 StructField 类以编程方式指定 DataFrame 的结构。
如果事先知道文件的架构并且不想使用inferSchema
选项来指定列名和类型,请使用指定的自定义列名schema并使用schema
选项键入。
使用 PySpark StructType 类创建自定义 Schema,下面我们启动这个类并使用添加方法通过提供列名、数据类型和可为空的选项向其添加列。
代码语言:javascript复制# Define custom schema
schema = StructType([
StructField("RecordNumber",IntegerType(),True),
StructField("Zipcode",IntegerType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State",StringType(),True),
StructField("LocationType",StringType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long",DoubleType(),True),
StructField("Xaxis",IntegerType(),True),
StructField("Yaxis",DoubleType(),True),
StructField("Zaxis",DoubleType(),True),
StructField("WorldRegion",StringType(),True),
StructField("Country",StringType(),True),
StructField("LocationText",StringType(),True),
StructField("Location",StringType(),True),
StructField("Decommisioned",BooleanType(),True),
StructField("TaxReturnsFiled",StringType(),True),
StructField("EstimatedPopulation",IntegerType(),True),
StructField("TotalWages",IntegerType(),True),
StructField("Notes",StringType(),True)
])
df_with_schema = spark.read.schema(schema)
.json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
使用 PySpark SQL 读取 JSON 文件
PySpark SQL 还提供了一种读取 JSON 文件的方法,方法是使用 spark.sqlContext.sql(“将 JSON 加载到临时视图”) 直接从读取文件创建临时视图
代码语言:javascript复制spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS"
" (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode").show()
读取 JSON 文件时的选项
NullValues
使用 nullValues
选项,可以将 JSON 中的字符串指定为 null。例如,如果想考虑一个值为 1900-01-01
的日期列,则在 DataFrame 上设置为 null。
DateFormat
选项 dateFormat
用于设置输入 DateType 和 TimestampType 列的格式的选项。支持所有 java.text.SimpleDateFormat
格式。
注意:除了上述选项外,PySpark JSON 数据集还支持许多其他选项。
应用 DataFrame 转换
从 JSON 文件创建 PySpark DataFrame 后,可以应用 DataFrame 支持的所有转换和操作。
将 PySpark DataFrame 写入 JSON 文件
在 DataFrame 上使用 PySpark DataFrameWriter 对象 write
方法写入 JSON 文件。
df2.write.json("/PyDataStudio/spark_output/zipcodes.json")
编写 JSON 文件时的 PySpark 选项
在编写 JSON 文件时,可以使用多个选项。如 nullValue
,dateFormat
PySpark 保存模式
PySpark DataFrameWriter 还有一个方法 mode()
来指定 SaveMode;此方法的参数采用overwrite
, append
, ignore
, errorifexists
.
overwrite
– 模式用于覆盖现有文件append
– 将数据添加到现有文件ignore
– 当文件已经存在时忽略写操作errorifexists
或error
– 这是文件已存在时的默认选项,它返回错误
df2.write.mode('Overwrite')
.json("/PyDataStudio/spark_output/zipcodes.json")
源代码供参考
此示例也可在GitHub PySpark 示例项目中获得以供参考。
代码语言:javascript复制# https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-read-json.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
spark = SparkSession.builder
.master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()
# Read multiline json file
multiline_df = spark.read.option("multiline","true")
.json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()
#Read multiple files
df2 = spark.read.json(
['PyDataStudio/zipcode2.json','PyDataStudio/zipcode1.json'])
df2.show()
#Read All JSON files from a directory
df3 = spark.read.json("PyDataStudio/*.json")
df3.show()
# Define custom schema
schema = StructType([
StructField("RecordNumber",IntegerType(),True),
StructField("Zipcode",IntegerType(),True),
StructField("ZipCodeType",StringType(),True),
StructField("City",StringType(),True),
StructField("State",StringType(),True),
StructField("LocationType",StringType(),True),
StructField("Lat",DoubleType(),True),
StructField("Long",DoubleType(),True),
StructField("Xaxis",IntegerType(),True),
StructField("Yaxis",DoubleType(),True),
StructField("Zaxis",DoubleType(),True),
StructField("WorldRegion",StringType(),True),
StructField("Country",StringType(),True),
StructField("LocationText",StringType(),True),
StructField("Location",StringType(),True),
StructField("Decommisioned",BooleanType(),True),
StructField("TaxReturnsFiled",StringType(),True),
StructField("EstimatedPopulation",IntegerType(),True),
StructField("TotalWages",IntegerType(),True),
StructField("Notes",StringType(),True)
])
df_with_schema = spark.read.schema(schema)
.json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()
# Create a table from Parquet File
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS"
" (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode3").show()
# PySpark write Parquet File
df2.write.mode('Overwrite').json("/PyDataStudio/spark_output/zipcodes.json")
相关阅读: PySpark 读写 CSV 文件到 DataFrame