PySpark 读写 JSON 文件到 DataFrame

2023-09-04 12:37:02 浏览数 (2)

本文中,云朵君将和大家一起学习了如何将具有单行记录和多行记录的 JSON 文件读取到 PySpark DataFrame 中,还要学习一次读取单个和多个文件以及使用不同的保存选项将 JSON 文件写回 DataFrame。

PySpark SQL 提供 read.json("path") 将单行或多行(多行)JSON 文件读取到 PySpark DataFrame 并 write.json("path") 保存或写入 JSON 文件的功能,在本教程中,您将学习如何读取单个文件、多个文件、目录中的所有文件进入 DataFrame 并使用 Python 示例将 DataFrame 写回 JSON 文件。

注意: 开箱即用的 PySpark API 支持将 JSON 文件和更多文件格式读取到 PySpark DataFrame 中。

使用 read.json("path") 或者 read.format("json").load("path") 方法将文件路径作为参数,可以将 JSON 文件读入 PySpark DataFrame。

与读取 CSV 不同,默认情况下,来自输入文件的 JSON 数据源推断模式。

此处使用的 zipcodes.json 文件可以从 GitHub 项目下载。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.json

代码语言:javascript复制
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()

当使用 format("json") 方法时,还可以通过其完全限定名称指定数据源,如下所示。

代码语言:javascript复制
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') 
        .load("PyDataStudio/zipcodes.json")

从多行读取 JSON 文件

PySpark JSON 数据源在不同的选项中提供了多个读取文件的选项,使用multiline选项读取分散在多行的 JSON 文件。默认情况下,多行选项设置为 false。

下面是我们要读取的输入文件,同样的文件也可以在Github上找到。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/multiline-zipcode.json

代码语言:javascript复制
[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

使用read.option("multiline","true")

代码语言:javascript复制
# Read multiline json file
multiline_df = spark.read.option("multiline","true") 
      .json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()    

一次读取多个文件

还可以使用read.json()方法从不同路径读取多个 JSON 文件,只需通过逗号分隔传递所有具有完全限定路径的文件名,例如

代码语言:javascript复制
# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json',
     'resources/zipcode2.json'])
df2.show()  

读取目录中的所有文件

只需将目录作为json()方法的路径传递给该方法,我们就可以将目录中的所有 JSON 文件读取到 DataFrame 中。

代码语言:javascript复制
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

使用用户自定义架构读取文件

PySpark Schema 定义了数据的结构,换句话说,它是 DataFrame 的结构。PySpark SQL 提供 StructType 和 StructField 类以编程方式指定 DataFrame 的结构。

如果事先知道文件的架构并且不想使用inferSchema选项来指定列名和类型,请使用指定的自定义列名schema并使用schema选项键入。

使用 PySpark StructType 类创建自定义 Schema,下面我们启动这个类并使用添加方法通过提供列名、数据类型和可为空的选项向其添加列。

代码语言:javascript复制
# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) 
        .json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

使用 PySpark SQL 读取 JSON 文件

PySpark SQL 还提供了一种读取 JSON 文件的方法,方法是使用 spark.sqlContext.sql(“将 JSON 加载到临时视图”) 直接从读取文件创建临时视图

代码语言:javascript复制
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS"   
      " (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode").show()

读取 JSON 文件时的选项

NullValues

使用 nullValues 选项,可以将 JSON 中的字符串指定为 null。例如,如果想考虑一个值为 1900-01-01 的日期列,则在 DataFrame 上设置为 null。

DateFormat

选项 dateFormat用于设置输入 DateType 和 TimestampType 列的格式的选项。支持所有 java.text.SimpleDateFormat 格式。

注意:除了上述选项外,PySpark JSON 数据集还支持许多其他选项。

应用 DataFrame 转换

从 JSON 文件创建 PySpark DataFrame 后,可以应用 DataFrame 支持的所有转换和操作。

将 PySpark DataFrame 写入 JSON 文件

在 DataFrame 上使用 PySpark DataFrameWriter 对象 write 方法写入 JSON 文件。

代码语言:javascript复制
df2.write.json("/PyDataStudio/spark_output/zipcodes.json")

编写 JSON 文件时的 PySpark 选项

在编写 JSON 文件时,可以使用多个选项。如 nullValuedateFormat

PySpark 保存模式

PySpark DataFrameWriter 还有一个方法 mode() 来指定 SaveMode;此方法的参数采用overwrite, append, ignore, errorifexists.

  • overwrite – 模式用于覆盖现有文件
  • append – 将数据添加到现有文件
  • ignore – 当文件已经存在时忽略写操作
  • errorifexistserror – 这是文件已存在时的默认选项,它返回错误
代码语言:javascript复制
 df2.write.mode('Overwrite') 
       .json("/PyDataStudio/spark_output/zipcodes.json")

源代码供参考

此示例也可在GitHub PySpark 示例项目中获得以供参考。

代码语言:javascript复制
# https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-read-json.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
spark = SparkSession.builder 
    .master("local[1]") 
    .appName("SparkByExamples.com") 
    .getOrCreate()

# Read JSON file into dataframe    
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()

# Read multiline json file
multiline_df = spark.read.option("multiline","true") 
      .json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()

#Read multiple files
df2 = spark.read.json(
    ['PyDataStudio/zipcode2.json','PyDataStudio/zipcode1.json'])
df2.show()    

#Read All JSON files from a directory
df3 = spark.read.json("PyDataStudio/*.json")
df3.show()

# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) 
        .json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

# Create a table from Parquet File
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS"   
      " (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode3").show()

# PySpark write Parquet File
df2.write.mode('Overwrite').json("/PyDataStudio/spark_output/zipcodes.json")

相关阅读: PySpark 读写 CSV 文件到 DataFrame

0 人点赞