Spark读取变更Hudi数据集Schema实现分析

2021-04-13 11:05:01 浏览数 (1)

1. 介绍

Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load便可加载Hudi数据集,本篇文章分析具体的实现。

2. 分析

2.1 源码梳理

Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi/ hudi来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource,其实现了 CreatableRelationProvider#createRelation接口,并实现了读写逻辑。其中读逻辑实现的方法核心代码如下。

代码语言:javascript复制
override def createRelation(sqlContext: SQLContext,
                              optParams: Map[String, String],
                              schema: StructType): BaseRelation = {
    // 合并参数
    val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL)    translateViewTypesToQueryTypes(optParams)
    // 必须提供path
    val path = parameters.get("path")
    if (path.isEmpty) {
      throw new HoodieException("'path' must be specified.")
    }

    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
        // 添加HoodieROTablePathFilter
      sqlContext.sparkContext.hadoopConfiguration.setClass(
        "mapreduce.input.pathFilter.class",
        classOf[HoodieROTablePathFilter],
        classOf[org.apache.hadoop.fs.PathFilter]);
      // 解析Relation
      DataSource.apply(
        sparkSession = sqlContext.sparkSession,
        userSpecifiedSchema = Option(schema),
        className = "parquet",
        options = parameters)
        .resolveRelation()
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
      // 增量Relation
      new IncrementalRelation(sqlContext, path.get, optParams, schema)
    } else {
      throw new HoodieException("Invalid query type :"   parameters(QUERY_TYPE_OPT_KEY))
    }
  }

可以看到,对于读优化视图(ReadOptmized),会添加 HoodieROTablePathFilter,其用于过滤Hudi数据集中的文件。而过滤主要逻辑在 HoodieROTablePathFilter#accept方法中, HoodieROTablePathFilter会处理Hudi数据集和非Hudi数据集,对于Hudi数据集而言,会选取分区路径下最新的提交的parquet文件。

接着通过 DataSource#resolveRelation方法来解析parquet文件,关键逻辑如下

代码语言:javascript复制
val index = createInMemoryFileIndex(globbedPaths)
    val (resultDataSchema, resultPartitionSchema) =
        getOrInferFileFormatSchema(format, Some(index))

继续通过 DataSource#getOrInferFileFormatSchema方法解析,其中一段关键代码如下

代码语言:javascript复制
format.inferSchema(
        sparkSession,
        caseInsensitiveOptions,
        tempFileIndex.allFiles())

此时会根据不同的文件类型,如Orc/Text/Parquet类型来继续推导schema,其中tempFileIndex.allFiles获取到之前通过 HoodieROTableFilter过滤出的所有最新提交的parquet文件, inferSchema方法的关键代码如下

代码语言:javascript复制
val filesToTouch =
      if (shouldMergeSchemas) { // 是否需要合并文件的schema,不需要默认取第一个文件,否则合并多个文件
        val needMerged: Seq[FileStatus] =
          if (mergeRespectSummaries) {
            Seq.empty
          } else {
            filesByType.data
          }
        needMerged    filesByType.metadata    filesByType.commonMetadata
      } else {
        filesByType.commonMetadata.headOption
            .orElse(filesByType.metadata.headOption)
            .orElse(filesByType.data.headOption)
            .toSeq
      }
    // 获取schema
    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)

可以看到,当不需要合并schema时,是否需要需要合并schema可通过 mergeSchema参数控制,当不需要时,默认获取的第一个文件,需要合并时,会 把所有文件的schema合并。其会影响spark查询结果,下面通过示例说明。

2.2 示例展示
2.2.1 schema配置

第一次插入时的schema如下

代码语言:javascript复制
{
  "type":"record",
  "name":"Person",
  "fields":[{
     "name": "name",
     "type": "string"
  }, {
      "name": "age", 
      "type": "int"
  }, {
      "name": "ts", 
      "type": "string"
  }, {
      "name": "location",
      "type": "string"
  }
]}

第二次更新时的schema如下(新增了sex列)

代码语言:javascript复制
{
  "type":"record",
  "name":"Person",
  "fields":[{
     "name": "name",
     "type": "string"
  }, {
      "name": "age", 
      "type": "int"
  }, {
      "name": "ts", 
      "type": "string"
  }, {
      "name": "location",
      "type": "string"
  }, {
      "name": "sex",
      "type": "string"
  }
]}

Hudi使用MOR模式。

2.2.2 插入/更新核心配置

写记录核心配置如下

代码语言:javascript复制
df.write().format("org.apache.hudi").
        option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
        option("hoodie.insert.shuffle.parallelism", "10").
        option("hoodie.upsert.shuffle.parallelism", "10").
        option("hoodie.delete.shuffle.parallelism", "10").
        option("hoodie.bulkinsert.shuffle.parallelism", "10").
        option("hoodie.datasource.write.recordkey.field", "name").
        option("hoodie.datasource.write.partitionpath.field", "location").
        option("hoodie.datasource.write.precombine.field", "ts").
        option("hoodie.table.name", "hudi_mor_table").

        mode(Overwrite).
        save("D:/hudi_mor_table");

更新记录核心配置如下

代码语言:javascript复制
df.write().format("org.apache.hudi").
          option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
          option("hoodie.insert.shuffle.parallelism", "10").
          option("hoodie.upsert.shuffle.parallelism", "10").
          option("hoodie.delete.shuffle.parallelism", "10").
          option("hoodie.bulkinsert.shuffle.parallelism", "10").
          option("hoodie.datasource.write.recordkey.field", "name").
          option("hoodie.datasource.write.partitionpath.field", "location").
          option("hoodie.datasource.write.precombine.field", "ts").
          option("hoodie.keep.max.commits", "5").
          option("hoodie.keep.min.commits", "4").
          option("hoodie.cleaner.commits.retained", "3").
          option("hoodie.table.name", "hudi_mor_table").
          mode(Append).
          save("D:/hudi_mor_table");
2.2.3 插入/更新实际数据设置

第一次插入实际数据为 {"name":"yuan1", "ts": "1574297893837", "age": 1, "location": "beijing"}

当第二次更新实际数据为 {"name":"yuan1", "ts": "1574297893837", "age": 1, "location": "beijing", "sex": "male"}

即第二次会更新一次写入的数据,那么使用如下代码显示数据时

代码语言:javascript复制
spark.sqlContext().read().format("org.apache.hudi").load("D:/hudi_mor_table"   "/*").show();

那么会发现结果包含了新增的sex列,未更新的值为null

当第二次更新实际数据为 {"name":"yuan1", "ts": "1574297893837", "age": 1, "location": "beijing1", "sex": "male"}

即第二次会写入不同的分区,即不会更新第一次写入的数据,那么查询数据时,会发现查询的结果不会出现新增的sex列

当使用如下代码显示数据时,设置合并schema参数,即会合并多个分区下的最新的parquet的schema。

代码语言:javascript复制
spark.sqlContext().read().format("org.apache.hudi").option("mergeSchema", "true").load("D:/hudi_mor_table"   "/*").show();

会发现查询的结果出现了新增的sex列

3. 总结

当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的。

0 人点赞