1. 介绍
Hudi支持上层Hive/Presto/Spark查询引擎,其中使用Spark读取Hudi数据集方法非常简单,在spark-shell或应用代码中,通过 spark.sqlContext.read.format("org.apache.hudi").load
便可加载Hudi数据集,本篇文章分析具体的实现。
2. 分析
2.1 源码梳理
Spark支持用户自定义的format来读取或写入文件,只需要实现对应的(RelationProvider、SchemaRelationProvider)等接口即可。而Hudi也自定义实现了 org.apache.hudi
/ hudi
来实现Spark对Hudi数据集的读写,Hudi中最重要的一个相关类为 DefaultSource
,其实现了 CreatableRelationProvider#createRelation
接口,并实现了读写逻辑。其中读逻辑实现的方法核心代码如下。
override def createRelation(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// 合并参数
val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) translateViewTypesToQueryTypes(optParams)
// 必须提供path
val path = parameters.get("path")
if (path.isEmpty) {
throw new HoodieException("'path' must be specified.")
}
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
// 添加HoodieROTablePathFilter
sqlContext.sparkContext.hadoopConfiguration.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter]);
// 解析Relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = parameters)
.resolveRelation()
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
// 增量Relation
new IncrementalRelation(sqlContext, path.get, optParams, schema)
} else {
throw new HoodieException("Invalid query type :" parameters(QUERY_TYPE_OPT_KEY))
}
}
可以看到,对于读优化视图(ReadOptmized),会添加 HoodieROTablePathFilter
,其用于过滤Hudi数据集中的文件。而过滤主要逻辑在 HoodieROTablePathFilter#accept
方法中, HoodieROTablePathFilter
会处理Hudi数据集和非Hudi数据集,对于Hudi数据集而言,会选取分区路径下最新的提交的parquet文件。
接着通过 DataSource#resolveRelation
方法来解析parquet文件,关键逻辑如下
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
getOrInferFileFormatSchema(format, Some(index))
继续通过 DataSource#getOrInferFileFormatSchema
方法解析,其中一段关键代码如下
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
tempFileIndex.allFiles())
此时会根据不同的文件类型,如Orc/Text/Parquet类型来继续推导schema,其中tempFileIndex.allFiles获取到之前通过 HoodieROTableFilter
过滤出的所有最新提交的parquet文件, inferSchema
方法的关键代码如下
val filesToTouch =
if (shouldMergeSchemas) { // 是否需要合并文件的schema,不需要默认取第一个文件,否则合并多个文件
val needMerged: Seq[FileStatus] =
if (mergeRespectSummaries) {
Seq.empty
} else {
filesByType.data
}
needMerged filesByType.metadata filesByType.commonMetadata
} else {
filesByType.commonMetadata.headOption
.orElse(filesByType.metadata.headOption)
.orElse(filesByType.data.headOption)
.toSeq
}
// 获取schema
ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
可以看到,当不需要合并schema时,是否需要需要合并schema可通过 mergeSchema
参数控制,当不需要时,默认获取的第一个文件,需要合并时,会 把所有文件的schema合并。其会影响spark查询结果,下面通过示例说明。
2.2 示例展示
2.2.1 schema配置
第一次插入时的schema如下
代码语言:javascript复制{
"type":"record",
"name":"Person",
"fields":[{
"name": "name",
"type": "string"
}, {
"name": "age",
"type": "int"
}, {
"name": "ts",
"type": "string"
}, {
"name": "location",
"type": "string"
}
]}
第二次更新时的schema如下(新增了sex列)
代码语言:javascript复制{
"type":"record",
"name":"Person",
"fields":[{
"name": "name",
"type": "string"
}, {
"name": "age",
"type": "int"
}, {
"name": "ts",
"type": "string"
}, {
"name": "location",
"type": "string"
}, {
"name": "sex",
"type": "string"
}
]}
Hudi使用MOR模式。
2.2.2 插入/更新核心配置
写记录核心配置如下
代码语言:javascript复制df.write().format("org.apache.hudi").
option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.table.name", "hudi_mor_table").
mode(Overwrite).
save("D:/hudi_mor_table");
更新记录核心配置如下
代码语言:javascript复制df.write().format("org.apache.hudi").
option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), "MERGE_ON_READ").
option("hoodie.insert.shuffle.parallelism", "10").
option("hoodie.upsert.shuffle.parallelism", "10").
option("hoodie.delete.shuffle.parallelism", "10").
option("hoodie.bulkinsert.shuffle.parallelism", "10").
option("hoodie.datasource.write.recordkey.field", "name").
option("hoodie.datasource.write.partitionpath.field", "location").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.keep.max.commits", "5").
option("hoodie.keep.min.commits", "4").
option("hoodie.cleaner.commits.retained", "3").
option("hoodie.table.name", "hudi_mor_table").
mode(Append).
save("D:/hudi_mor_table");
2.2.3 插入/更新实际数据设置
第一次插入实际数据为 {"name":"yuan1", "ts": "1574297893837", "age": 1, "location": "beijing"}
当第二次更新实际数据为 {"name":"yuan1", "ts": "1574297893837", "age": 1, "location": "beijing", "sex": "male"}
即第二次会更新一次写入的数据,那么使用如下代码显示数据时
代码语言:javascript复制spark.sqlContext().read().format("org.apache.hudi").load("D:/hudi_mor_table" "/*").show();
那么会发现结果包含了新增的sex列,未更新的值为null。
当第二次更新实际数据为 {"name":"yuan1", "ts": "1574297893837", "age": 1, "location": "beijing1", "sex": "male"}
即第二次会写入不同的分区,即不会更新第一次写入的数据,那么查询数据时,会发现查询的结果不会出现新增的sex列。
当使用如下代码显示数据时,设置合并schema参数,即会合并多个分区下的最新的parquet的schema。
代码语言:javascript复制spark.sqlContext().read().format("org.apache.hudi").option("mergeSchema", "true").load("D:/hudi_mor_table" "/*").show();
会发现查询的结果出现了新增的sex列。
3. 总结
当使用Spark查询Hudi数据集时,当数据的schema新增时,会获取单个分区的parquet文件来推导出schema,若变更schema后未更新该分区数据,那么新增的列是不会显示,否则会显示该新增的列;若未更新该分区的记录时,那么新增的列也不会显示,可通过 mergeSchema
来控制合并不同分区下parquet文件的schema,从而可达到显示新增列的目的。