「困惑」
- spark sql 读取 parquet 文件,stage 生成任务 4 个 task,只有一个 task 处理数据,其它无
- spark 任务执行 apache iceberg rewriteDataFiles 合并小文件(parquet 文件),发现偶然无变化
「Parquet 文件详解」
一个 Parquet 文件是由一个 header 以及一个或多个 block 块组成,以一个 footer 结尾。
header 中只包含一个 4 个字节的数字 PAR1 用来识别整个 Parquet 文件格式。
文件中所有的 metadata 都存在于 footer 中。
footer 中的 metadata 包含了格式的版本信息,schema 信息、key-value paris 以及所有 block 中的 metadata 信息。
footer 中最后两个字段为一个以 4 个字节长度的 footer 的 metadata,以及同 header 中包含的一样的 PAR1。
Parquet 文件格式
上图展示了一个 Parquet 文件的结构
- 一个文件中可以存储多个行组,文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件。
- Footer length 存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的 Schema 信息。
- 每一页的开始都会存储该页的元数据,在 Parquet 中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引。
- 存储格式:Parquet 的存储模型主要由行组(Row Group 默认 128M)、列块(Column Chuck)、页(Page)组成。
- 支持数据嵌套模型:Parquet 支持嵌套的数据模型,类似于 Protocol Buffers。
可以看出在 Schema 中所有的基本类型字段都是叶子节点,在这个 Schema 中一共存在 6 个叶子节点,如果把这样的 Schema 转换成扁平式的关系模型,就可以理解为该表包含六个列。
❝Parquet 中没有 Map、Array 这样的复杂数据结构每一个数据模型的 schema 包含多个字段,每一个字段又可以包含多个字段,每一个字段有三个属性:重复数、数据类型和字段名, 重复数可以是以下三种:required(出现 1 次),repeated(出现 0 次或多次),optional(出现 0 次或 1 次)。每一个字段的数据类型可以分成两种:group(复杂类型)和 primitive(基本类型)。以上实现列式存储,但是无法将其恢复到原来的数据行的结构形式,Parquet 采用了 Dremel 中(R, D, V)模型 R,即 Repetition Level,用于表达一个列有重复,即有多个值的情况,其值为重复是在第几层上发生。D,即 Definition Level,用于表达某个列是否为空、在哪里为空,其值为当前列在第几层上有值 V,表示数据值 ❞
- 行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
- 列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
- 页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。
小结
- Parquet 是一种支持嵌套结构的列式存储格式,非常适用于 OLAP 场景,按列存储和扫描。
- 列存使得更容易对每个列使用高效的压缩和编码(一个页是最小的编码的单位),降低磁盘空间。
- 映射下推,这是列式存储最突出的优势,是指在获取数据时只需要扫描需要的列,不用全部扫描。
- 谓词下推,是指通过将一些过滤条件尽可能的在最底层执行以减少结果集。谓词就是指这些过滤条件,即返回。
实战
spark 2.4.0 读取 parquet 文件
代码语言:javascript复制❝spark.read.parquet("") ❞
org.apache.spark.sql.DataFrameReader.java
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
val nativeOrc = classOf[OrcFileFormat].getCanonicalName
val socket = classOf[TextSocketSourceProvider].getCanonicalName --->DataSourceV2
val rate = classOf[RateStreamProvider].getCanonicalName --->DataSourceV2
private def loadV1Source(paths: String*) = {
// Code path for data source v1.
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation
->getOrInferFileFormatSchema()
**Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list.**
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()
val parallelPartitionDiscoveryParallelism =
private[sql] def bulkListLeafFiles(
...
spark.sql.sources.parallelPartitionDiscovery.parallelism 默认10000
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
**设置并行度来防止下面的文件列表生成许多任务**
**in case of large defaultParallelism.**
**val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)**
val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
val description = paths.size match {
case 0 =>
s"Listing leaf files and directories 0 paths"
case 1 =>
s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
case s =>
s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sparkContext.setJobDescription(description)
sparkContext
.parallelize(serializedPaths, numParallelism)
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
(path, listLeafFiles(path, hadoopConf, filter, None))
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}
case _ =>
Array.empty[SerializableBlockLocation]
}
SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
}.collect()
...
)
真正读取数据是 DataSourceScanExec
代码语言:javascript复制❝注意:这里有 DataSourceV2ScanExec v2 版本,经上面代码分析,parquet,orc 使用的是 v1 版 org.apache.spark.sql.execution.DataSourceScanExec.scala ❞
Physical plan node for scanning data from HadoopFsRelations.
FileSourceScanExec
private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
}
private def createNonBucketedReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
128M
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
4M
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
上面代码sparkcontent设置的
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, "
s"open cost is considered as scanning $openCostInBytes bytes.")
切文件
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, file.getPath.toUri.toString, offset, size, hosts)
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(
partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
合并小文件,大文件就直接变为partition了。一路下来会以为会切大文件,然而并不会。
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
val newPartition =
FilePartition(
partitions.size,
currentFiles.toArray.toSeq) // Copy to a new Array.
partitions = newPartition
}
currentFiles.clear()
currentSize = 0
}
// Assign files to partitions using "Next Fit Decreasing"
splitFiles.foreach { file =>
这里遇到大文件直接放入partitions分区,小文件是几个大小达到maxSplitBytes,放入一个分区提高
if (currentSize file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize = file.length openCostInBytes
currentFiles = file
}
closePartition()
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
小结
- spark 2.4.0 读取 parquet,使用的是 loadV1Source
- spark 读取文件默认 task 任务数(分区数)最大 10000,最小是 path 的个数(注意并行度和任务数分区数区别)
- createNonBucketedReadRDD 中 Bucketed 理解,是指 hive 表中的分区下面的分桶
- rdd 分区数确认:合并小文件,大文件就直接变为 partition 了,注意大文件没有切,目的提高 cpu 利用率
FileScanRDD 和 parquetjar 本身提供的读写 api
代码语言:javascript复制org.apache.spark.sql.execution.datasources.FileScanRDD
private def readCurrentFile(): Iterator[InternalRow] = {
try {
readFunction(currentFile)
} catch {
case e: FileNotFoundException =>
throw new FileNotFoundException(
e.getMessage "n"
"It is possible the underlying files have been updated. "
"You can explicitly invalidate the cache in Spark by "
"running 'REFRESH TABLE tableName' command in SQL or "
"by recreating the Dataset/DataFrame involved.")
}
}
ParquetFileFormat.buildReaderWithPartitionValues(该方法上面有提)构造reader,
override def buildReaderWithPartitionValues(
...
if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion lister before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
} else {
...
reader.initialize(split, hadoopAttemptContext)
}
vectorizedReader.initialize(split, hadoopAttemptContext)
->SpecificParquetRecordReaderBase.initialize
->ParquetMetadata footer = readFooter(config, file, range(0, length));注意这里传入的range
->ParquetMetadataConverter.converter.readParquetMetadata(f, filter)
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
return readFileMetaData(from);
}
@Override
public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
return readFileMetaData(from, true);
}
@Override
public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
return filterFileMetaDataByStart(readFileMetaData(from), filter);
}
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}
});
LOG.debug("{}", fileMetaData);
ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
return parquetMetadata;
}
RangeMetadataFilter filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
long totalSize = 0;
long startIndex = getOffset(rowGroup.getColumns().get(0));
for (ColumnChunk col : rowGroup.getColumns()) {
totalSize = col.getMeta_data().getTotal_compressed_size();
}
long midPoint = startIndex totalSize / 2;
if (filter.contains(midPoint)) {
newRowGroups.add(rowGroup);
}
}
metaData.setRow_groups(newRowGroups);
return metaData;
}
到这里分割的关键点找到
现在假设我们有一个40m 的文件, 只有一个 row group, 10m 一分, 那么将会有4个 partitions
但是只有一个 partition 会占有这个 row group 的中点, 所以也只有这一个 partition 会有数据
小结
- spark 读取 parquet 文件默认用 enableVectorizedReader,向量读
- 根据 DataSourceScanExec 代码中划分的 partitions, 但不是所有 partitions 最后都会有数据
- 对于 parquet 文件,对于一个大的文件只含有一个 rowgroup,task 中谁拥有这个文件的中点谁处理这个 rowgroup,这样解决文章开头的疑惑!!!