PySpark 读写 CSV 文件到 DataFrame

2023-09-04 12:35:29 浏览数 (2)

本文中,云朵君将和大家一起学习如何将 CSV 文件、多个 CSV 文件和本地文件夹中的所有文件读取到 PySpark DataFrame 中,使用多个选项来更改默认行为并使用不同的保存选项将 CSV 文件写回 DataFrame。

PySpark 在 DataFrameReader 上提供了csv("path")将 CSV 文件读入 PySpark DataFrame 并保存或写入 CSV 文件的功能dataframeObj.write.csv("path"),在本文中,云朵君将和大家一起学习如何将本地目录中的单个文件、多个文件、所有文件读入 DataFrame,应用一些转换,最后使用 PySpark 示例将 DataFrame 写回 CSV 文件。

PySpark 支持读取带有竖线、逗号、制表符、空格或任何其他分隔符文件的 CSV 文件。

注意: 开箱即用的 PySpark 支持将 CSV、JSON 和更多文件格式的文件读取到 PySpark DataFrame 中。

目录

  • 读取多个 CSV 文件
  • 读取目录中的所有 CSV 文件
  • 读取 CSV 文件时的选项
    • 分隔符(delimiter)
    • 推断模式(inferschema)
    • 标题(header)
    • 引号(quotes)
    • 空值(nullValues)
    • 日期格式(dateformat)
  • 使用用户指定的模式读取 CSV 文件
  • 应用 DataFrame 转换
  • 将 DataFrame 写入 CSV 文件
    • 使用选项
    • 保存模式

将 CSV 文件读取到 DataFrame

使用DataFrameReader 的 csv("path") 或者 format("csv").load("path"),可以将 CSV 文件读入 PySpark DataFrame,这些方法将要读取的文件路径作为参数。当使用 format("csv") 方法时,还可以通过完全限定名称指定数据源,但对于内置源,可以简单地使用它们的短名称(csvjsonparquetjdbctext 等)。

请参阅 GitHub 上的数据集zipcodes.csv。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.csv)

代码语言:javascript复制
spark = SparkSession.builder().master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()

使用完全限定的数据源名称,也可以执行以下操作。

代码语言:javascript复制
df = spark.read.format("csv")
                  .load("/tmp/resources/zipcodes.csv")
# 或者
df = spark.read.format("org.apache.spark.sql.csv")
                  .load("/tmp/resources/zipcodes.csv")
df.printSchema()

此示例将数据读取到 DataFrame 列"_c0"中,用于第一列和"_c1"第二列,依此类推。默认情况下,所有这些列的数据类型都被视为字符串。

代码语言:javascript复制
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

1.1 使用标题记录作为列名

如果输入文件中有一个带有列名的标题,则需要使用不提及这一点明确指定标题选项 option("header", True),API 将标题视为数据记录。

你需要使用option("header", True)显式地为"header"选项指定为True,若不设置,则默认将 "header" 标题作为一个数据记录。

代码语言:javascript复制
df2 = spark.read.option("header",True) 
     .csv("/tmp/resources/zipcodes.csv")

# df2 = spark.read.csv("/tmp/resources/zipcodes.csv",header=True)

如前所述,PySpark 默认将所有列读取为字符串(StringType)。我将在后面学习如何从标题记录中读取 schema (inferschema) 并根据数据派生inferschema列类型。

1.2 读取多个 CSV 文件

使用read.csv()方法还可以读取多个 csv 文件,只需通过逗号分隔作为路径传递所有文件名,例如:

代码语言:javascript复制
df = spark.read.csv("path1,path2,path3")

1.3 读取目录中的所有 CSV 文件

只需将目录作为csv()方法的路径传递给该方法,我们就可以将目录中的所有 CSV 文件读取到 DataFrame 中。

代码语言:javascript复制
df = spark.read.csv("Folder path")

2. 读取 CSV 文件时的选项

PySpark 提供了多种处理 CSV 数据集文件的选项。以下是通过示例解释的一些最重要的选项。

可以使用链接 option(self, key, value) 来使用多个选项。该方法有个替代方法:options(self, **options),效果是一样的。

2.1 Delimiter

选项 delimiter 用于指定 CSV 文件的列分隔符。默认情况下,它是 逗号(,) 字符。可使用此选项将其设置为任何字符,例如管道(|)、制表符 (t)、空格。 这都需要根据实际的 CSV 数据集文件的具体形式设定。

代码语言:javascript复制
df3 = spark.read.options(delimiter=',') 
  .csv("C:/PyDataStudio/zipcodes.csv")

2.2 InferSchema

此选项的默认值是设置为 False,设置为 True 时,spark将自动根据数据推断列类型。

代码语言:javascript复制
df4 = spark.read.options(inferSchema='True',
                         delimiter=',') 
                .csv("PyDataStudio/zipcodes.csv")

或者,也可以通过链接option()方法来编写它。

代码语言:javascript复制
df4 = spark.read.option("inferSchema",True) 
                .option("delimiter",",") 
                .csv("PyDataStudio/zipcodes.csv")

2.3 Header

此选项用于读取 CSV 文件的第一行作为列名。默认情况下,此选项的值为 False ,并且所有列类型都假定为字符串。

代码语言:javascript复制
df5 = spark.read.options(header='True', 
                         inferSchema='True', 
                         delimiter=',') 
    .csv("PyDataStudio/zipcodes.csv")

2.4 Quotes

当有一列带有用于拆分列的分隔符时,使用 quotes 选项指定引号字符,默认情况下它是'',并且引号内的分隔符将被忽略。但使用此选项,可以设置任何字符。

2.5 NullValues

使用 nullValues 选项,可以将 CSV 中的字符串指定为空。例如,如果将"1900-01-01"在 DataFrame 上将值设置为 null 的日期列。

2.6 DateFormat

选项 dateFormat 用于设置输入 DateTypeTimestampType 列的格式的选项。支持所有 java.text.SimpleDateFormat 格式。

注意: 除了上述选项,PySpark CSV API 还支持许多其他选项,可以查阅PySpark官方文档。

3. 使用用户自定义架构读取 CSV 文件

如果事先知道文件的架构并且不想使用inferSchema选项来指定列名和类型,请使用指定的自定义列名schema并使用schema选项键入。

代码语言:javascript复制
schema = StructType() 
      .add("RecordNumber",IntegerType(),True) 
      .add("Zipcode",IntegerType(),True) 
      .add("ZipCodeType",StringType(),True) 
      .add("City",StringType(),True) 
      .add("State",StringType(),True) 
      .add("LocationType",StringType(),True) 
      .add("Lat",DoubleType(),True) 
      .add("Long",DoubleType(),True) 
      .add("Xaxis",IntegerType(),True) 
      .add("Yaxis",DoubleType(),True) 
      .add("Zaxis",DoubleType(),True) 
      .add("WorldRegion",StringType(),True) 
      .add("Country",StringType(),True) 
      .add("LocationText",StringType(),True) 
      .add("Location",StringType(),True) 
      .add("Decommisioned",BooleanType(),True) 
      .add("TaxReturnsFiled",StringType(),True) 
      .add("EstimatedPopulation",IntegerType(),True) 
      .add("TotalWages",IntegerType(),True) 
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") 
      .option("header", True) 
      .schema(schema) 
      .load("/PyDataStudio/zipcodes.csv")

4. 应用 DataFrame 转换

从 CSV 文件创建 DataFrame 后,可以应用 DataFrame 支持的所有转换和操作。

5. 将 DataFrame 写入 CSV 文件

使用PySpark DataFrameWriter 对象的write()方法将 PySpark DataFrame 写入 CSV 文件。

代码语言:javascript复制
df.write.option("header",True) 
 .csv("/PyDataStudio/spark_output/zipcodes")

5.1 Options

在编写 CSV 文件时,可以使用多个选项。例如,设置 headerTrue 将 DataFrame 列名作为标题记录输出,并用 delimiter在 CSV 输出文件中指定分隔符。

代码语言:javascript复制
df2.write.options(header='True',
                  delimiter=',') 
         .csv("/PyDataStudio/spark_output/zipcodes")

其他可用选项 quote, escape, nullValue, dateFormat, quoteMode。具体可以查看官方文档。

5.2 保存mode

PySpark DataFrameWriter 还有一个 mode() 方法来指定保存模式。

  • overwrite– 模式用于覆盖现有文件。
  • append– 将数据添加到现有文件。
  • ignore– 当文件已经存在时忽略写操作。
  • error– 这是一个默认选项,当文件已经存在时,它会返回错误。
代码语言:javascript复制
df2.write.mode('overwrite') 
         .csv("/PyDataStudio/spark_output/zipcodes")
# 你也可以这样写
df2.write.format("csv") 
         .mode('overwrite') 
         .save("/PyDataStudio/spark_output/zipcodes")

6. PySpark 读取 CSV 完整示例

代码语言:javascript复制
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

df = spark.read.csv("/PyDataStudio/zipcodes.csv")

df.printSchema()

df2 = spark.read.option("header",True) 
     .csv("/PyDataStudio/zipcodes.csv")
df2.printSchema()
   
df3 = spark.read.options(header='True', delimiter=',') 
  .csv("/PyDataStudio/zipcodes.csv")
df3.printSchema()

schema = StructType() 
      .add("RecordNumber",IntegerType(),True) 
      .add("Zipcode",IntegerType(),True) 
      .add("ZipCodeType",StringType(),True) 
      .add("City",StringType(),True) 
      .add("State",StringType(),True) 
      .add("LocationType",StringType(),True) 
      .add("Lat",DoubleType(),True) 
      .add("Long",DoubleType(),True) 
      .add("Xaxis",IntegerType(),True) 
      .add("Yaxis",DoubleType(),True) 
      .add("Zaxis",DoubleType(),True) 
      .add("WorldRegion",StringType(),True) 
      .add("Country",StringType(),True) 
      .add("LocationText",StringType(),True) 
      .add("Location",StringType(),True) 
      .add("Decommisioned",BooleanType(),True) 
      .add("TaxReturnsFiled",StringType(),True) 
      .add("EstimatedPopulation",IntegerType(),True) 
      .add("TotalWages",IntegerType(),True) 
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") 
      .option("header", True) 
      .schema(schema) 
      .load("/PyDataStudio/zipcodes.csv")
df_with_schema.printSchema()

df2.write.option("header",True) 
         .csv("/PyDataStudio/spark_output/zipcodes123")

0 人点赞