【Parquet】Spark读取Parquet问题详解……

2022-05-17 15:41:02 浏览数 (1)

「困惑」

  1. spark sql 读取 parquet 文件,stage 生成任务 4 个 task,只有一个 task 处理数据,其它无
  2. spark 任务执行 apache iceberg rewriteDataFiles 合并小文件(parquet 文件),发现偶然无变化

「Parquet 文件详解」

一个 Parquet 文件是由一个 header 以及一个或多个 block 块组成,以一个 footer 结尾。

header 中只包含一个 4 个字节的数字 PAR1 用来识别整个 Parquet 文件格式。

文件中所有的 metadata 都存在于 footer 中。

footer 中的 metadata 包含了格式的版本信息,schema 信息、key-value paris 以及所有 block 中的 metadata 信息。

footer 中最后两个字段为一个以 4 个字节长度的 footer 的 metadata,以及同 header 中包含的一样的 PAR1。

Parquet 文件格式

上图展示了一个 Parquet 文件的结构

  • 一个文件中可以存储多个行组,文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件。
  • Footer length 存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的 Schema 信息。
  • 每一页的开始都会存储该页的元数据,在 Parquet 中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引。
  • 存储格式:Parquet 的存储模型主要由行组(Row Group 默认 128M)、列块(Column Chuck)、页(Page)组成。
  • 支持数据嵌套模型:Parquet 支持嵌套的数据模型,类似于 Protocol Buffers。

可以看出在 Schema 中所有的基本类型字段都是叶子节点,在这个 Schema 中一共存在 6 个叶子节点,如果把这样的 Schema 转换成扁平式的关系模型,就可以理解为该表包含六个列。

❝Parquet 中没有 Map、Array 这样的复杂数据结构每一个数据模型的 schema 包含多个字段,每一个字段又可以包含多个字段,每一个字段有三个属性:重复数、数据类型和字段名, 重复数可以是以下三种:required(出现 1 次),repeated(出现 0 次或多次),optional(出现 0 次或 1 次)。每一个字段的数据类型可以分成两种:group(复杂类型)和 primitive(基本类型)。以上实现列式存储,但是无法将其恢复到原来的数据行的结构形式,Parquet 采用了 Dremel 中(R, D, V)模型 R,即 Repetition Level,用于表达一个列有重复,即有多个值的情况,其值为重复是在第几层上发生。D,即 Definition Level,用于表达某个列是否为空、在哪里为空,其值为当前列在第几层上有值 V,表示数据值 ❞

  1. 行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
  2. 列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
  3. 页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。

小结

  • Parquet 是一种支持嵌套结构的列式存储格式,非常适用于 OLAP 场景,按列存储和扫描。
  • 列存使得更容易对每个列使用高效的压缩和编码(一个页是最小的编码的单位),降低磁盘空间。
  • 映射下推,这是列式存储最突出的优势,是指在获取数据时只需要扫描需要的列,不用全部扫描。
  • 谓词下推,是指通过将一些过滤条件尽可能的在最底层执行以减少结果集。谓词就是指这些过滤条件,即返回。

实战

spark 2.4.0 读取 parquet 文件

❝spark.read.parquet("") ❞

代码语言:javascript复制
    org.apache.spark.sql.DataFrameReader.java

    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
    val json = classOf[JsonFileFormat].getCanonicalName
    val parquet = classOf[ParquetFileFormat].getCanonicalName
    val csv = classOf[CSVFileFormat].getCanonicalName
    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
    val nativeOrc = classOf[OrcFileFormat].getCanonicalName
    val socket = classOf[TextSocketSourceProvider].getCanonicalName   --->DataSourceV2
    val rate = classOf[RateStreamProvider].getCanonicalName     --->DataSourceV2
private def loadV1Source(paths: String*) = {
    // Code path for data source v1.
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation
->getOrInferFileFormatSchema()
**Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list.**
  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
    new InMemoryFileIndex(
      sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
  }
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()
val parallelPartitionDiscoveryParallelism =

private[sql] def bulkListLeafFiles(
...
spark.sql.sources.parallelPartitionDiscovery.parallelism 默认10000
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
    **设置并行度来防止下面的文件列表生成许多任务**
    **in case of large defaultParallelism.**
    **val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)**
    val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
    val statusMap = try {
      val description = paths.size match {
        case 0 =>
          s"Listing leaf files and directories 0 paths"
        case 1 =>
          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
        case s =>
          s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
      }
      sparkContext.setJobDescription(description)
      sparkContext
        .parallelize(serializedPaths, numParallelism)
        .mapPartitions { pathStrings =>
          val hadoopConf = serializableConfiguration.value
          pathStrings.map(new Path(_)).toSeq.map { path =>
            (path, listLeafFiles(path, hadoopConf, filter, None))
          }.iterator
        }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }

            case _ =>
              Array.empty[SerializableBlockLocation]
          }

          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      }.collect()
...
)

真正读取数据是 DataSourceScanExec

❝注意:这里有 DataSourceV2ScanExec v2 版本,经上面代码分析,parquet,orc 使用的是 v1 版 org.apache.spark.sql.execution.DataSourceScanExec.scala ❞

代码语言:javascript复制
Physical plan node for scanning data from HadoopFsRelations.
FileSourceScanExec
 private lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    relation.bucketSpec match {
      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
      case _ =>
        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    }
  }
private def createNonBucketedReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Seq[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
128M
    val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
4M
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
上面代码sparkcontent设置的
    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen   openCostInBytes)).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, "  
      s"open cost is considered as scanning $openCostInBytes bytes.")
切文件
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        val blockLocations = getBlockLocations(file)
        if (fsRelation.fileFormat.isSplitable(
            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
          (0L until file.getLen by maxSplitBytes).map { offset =>
            val remaining = file.getLen - offset
            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
            val hosts = getBlockHosts(blockLocations, offset, size)
            PartitionedFile(
              partition.values, file.getPath.toUri.toString, offset, size, hosts)
          }
        } else {
          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
          Seq(PartitionedFile(
            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
        }
      }
    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions = new ArrayBuffer[FilePartition]
    val currentFiles = new ArrayBuffer[PartitionedFile]
    var currentSize = 0L

    /** Close the current partition and move to the next. */
合并小文件,大文件就直接变为partition了。一路下来会以为会切大文件,然而并不会。
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) {
        val newPartition =
          FilePartition(
            partitions.size,
            currentFiles.toArray.toSeq) // Copy to a new Array.
        partitions  = newPartition
      }
      currentFiles.clear()
      currentSize = 0
    }

    // Assign files to partitions using "Next Fit Decreasing"
    splitFiles.foreach { file =>
这里遇到大文件直接放入partitions分区,小文件是几个大小达到maxSplitBytes,放入一个分区提高
      if (currentSize   file.length > maxSplitBytes) {
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize  = file.length   openCostInBytes
      currentFiles  = file
    }
    closePartition()

    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
  }

小结

  1. spark 2.4.0 读取 parquet,使用的是 loadV1Source
  2. spark 读取文件默认 task 任务数(分区数)最大 10000,最小是 path 的个数(注意并行度和任务数分区数区别)
  3. createNonBucketedReadRDD 中 Bucketed 理解,是指 hive 表中的分区下面的分桶
  4. rdd 分区数确认:合并小文件,大文件就直接变为 partition 了,注意大文件没有切,目的提高 cpu 利用率

FileScanRDD 和 parquetjar 本身提供的读写 api

代码语言:javascript复制
org.apache.spark.sql.execution.datasources.FileScanRDD
 private def readCurrentFile(): Iterator[InternalRow] = {
        try {
          readFunction(currentFile)
        } catch {
          case e: FileNotFoundException =>
            throw new FileNotFoundException(
              e.getMessage   "n"  
                "It is possible the underlying files have been updated. "  
                "You can explicitly invalidate the cache in Spark by "  
                "running 'REFRESH TABLE tableName' command in SQL or "  
                "by recreating the Dataset/DataFrame involved.")
        }
      }

ParquetFileFormat.buildReaderWithPartitionValues(该方法上面有提)构造reader,
override def buildReaderWithPartitionValues(
...
if (enableVectorizedReader) {
        val vectorizedReader = new VectorizedParquetRecordReader(
          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
        val iter = new RecordReaderIterator(vectorizedReader)
        // SPARK-23457 Register a task completion lister before `initialization`.
        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
        vectorizedReader.initialize(split, hadoopAttemptContext)
        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
        if (returningBatch) {
          vectorizedReader.enableReturningBatches()
        }
      } else {
...
        reader.initialize(split, hadoopAttemptContext)
}

vectorizedReader.initialize(split, hadoopAttemptContext)
->SpecificParquetRecordReaderBase.initialize
 ->ParquetMetadata footer = readFooter(config, file, range(0, length));注意这里传入的range
->ParquetMetadataConverter.converter.readParquetMetadata(f, filter)
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
      @Override
      public FileMetaData visit(NoFilter filter) throws IOException {
        return readFileMetaData(from);
      }

      @Override
      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
        return readFileMetaData(from, true);
      }

      @Override
      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
        return filterFileMetaDataByStart(readFileMetaData(from), filter);
      }

      @Override
      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
      }
    });
    LOG.debug("{}", fileMetaData);
    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
    if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
    return parquetMetadata;
  }

RangeMetadataFilter  filterFileMetaDataByMidpoint(readFileMetaData(from), filter);

 static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
    List<RowGroup> rowGroups = metaData.getRow_groups();
    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
    for (RowGroup rowGroup : rowGroups) {
      long totalSize = 0;
      long startIndex = getOffset(rowGroup.getColumns().get(0));
      for (ColumnChunk col : rowGroup.getColumns()) {
        totalSize  = col.getMeta_data().getTotal_compressed_size();
      }
      long midPoint = startIndex   totalSize / 2;
      if (filter.contains(midPoint)) {
        newRowGroups.add(rowGroup);
      }
    }
    metaData.setRow_groups(newRowGroups);
    return metaData;
  }
到这里分割的关键点找到
现在假设我们有一个40m 的文件, 只有一个 row group, 10m 一分, 那么将会有4个 partitions
但是只有一个 partition 会占有这个 row group 的中点, 所以也只有这一个 partition 会有数据

小结

  1. spark 读取 parquet 文件默认用 enableVectorizedReader,向量读
  2. 根据 DataSourceScanExec 代码中划分的 partitions, 但不是所有 partitions 最后都会有数据
  3. 对于 parquet 文件,对于一个大的文件只含有一个 rowgroup,task 中谁拥有这个文件的中点谁处理这个 rowgroup,这样解决文章开头的疑惑!!!

0 人点赞