在之前的一篇文章中,我们引入了一种新的名为clustering的表服务,它可以重组数据,从而在不影响写入速度的情况下提高查询性能。 我们学习了如何设置inline clustering。 在这篇文章中,我们将讨论自那以后发生的变化,并看看如何使用HoodieClusteringJob和DeltaStreamer实用工具来设置异步clustering。
总览
在较高的层次上,集群基于可配置的策略创建计划,根据特定的标准对符合条件的文件进行分组,然后执行计划。 Hudi支持多写入器,它在多个表服务之间提供快照隔离,从而允许写入器在后台运行clustering时继续输入。 要了解clustering架构的更详细概述,请查看之前的博客文章。
Clustering 策略
如前所述,clustering计划和执行都依赖于可配置策略。 这些策略大致可分为三类:clustering计划策略、执行策略和更新策略。
计划策略
此策略在创建clustering计划时发挥作用。 它有助于决定应该对哪些文件组进行clustering。 让我们看看Hudi的不同计划策略。 注意,使用这个配置,这些策略都是插件式的。
SparkSizeBasedClusteringPlanStrategy:它根据基本文件的小文件限制选择文件片,并创建clustering组,每个组的最大文件大小为允许的最大文件大小。 最大大小可以使用这个配置来指定。 这种策略对于将中等大小的文件拼接到更大的文件中,以减少大量文件在冷分区上的传播非常有用。
SparkRecentDaysClusteringPlanStrategy:它回顾以前的“N”天分区,并创建一个计划,将这些分区中的“小”文件片clustering起来。 这是默认策略。 当工作负载是可预测的并且数据是按时间划分的时候,它会很有用。
SparkSelectedPartitionsClusteringPlanStrategy:如果您只想在一个范围内clustering特定的分区,无论这些分区是旧的还是新的,那么这个策略可能是有用的。 要使用这个策略,需要额外设置以下两个配置(开始分区和结束分区都包括在内):
代码语言:javascript复制hoodie.clustering.plan.strategy.cluster.begin.partition
hoodie.clustering.plan.strategy.cluster.end.partitionCopy
所有的策略都是分区感知的,后两个策略仍然受第一个策略的大小限制。
执行策略
在规划阶段构建clustering组之后,Hudi对每个组应用执行策略,主要基于排序列和大小。 可以使用此配置指定策略。
SparkSortAndSizeExecutionStrategy是默认策略。 当使用此配置进行clustering时,用户可以指定要对数据进行排序的列。 除此之外,我们还可以为clustering生成的parquet文件设置最大文件大小。 该策略使用大容量插入将数据写入新文件,在这种情况下,Hudi隐式地使用分区程序根据指定的列进行排序。 通过这种方式,该策略改变了数据布局,不仅提高了查询性能,还自动平衡了重写开销。
现在,这个策略可以作为单个spark作业执行,也可以作为多个作业执行,这取决于在规划阶段创建的clustering组的数量。 默认情况下,Hudi将提交多个spark作业并合并结果。 如果你想强制Hudi使用单个spark作业,设置执行策略类配置为SingleSparkJobExecutionStrategy。
更新策略
目前,clustering只能被调度到没有接收到任何并发更新的表/分区。 默认情况下,更新策略的配置设置为SparkRejectUpdateStrategy。 如果某个文件组在集群期间有更新,那么它将拒绝更新并抛出异常。 然而,在某些用例中,更新非常稀疏,并且不涉及大多数文件组。 简单地拒绝更新的默认策略似乎不公平。 在这些用例中,用户可以将配置设置为SparkAllowUpdateStrategy。
我们讨论了关键的有用配置。 这里列出了与cluster相关的所有其他配置。 在这个列表中,一些非常有用的配置是:
Config key | Remarks | Default |
---|---|---|
hoodie.clustering.async.enabled | 启用clustering服务的运行,当写入发生在表上时异步运行。 | False |
hoodie.clustering.async.max.commits | 通过指定应该触发多少提交clustering,来控制异步clustering的频率。 | 4 |
hoodie.clustering.preserve.commit.metadata | 重写数据时,保留已有的_hoodie_commit_time。 这意味着用户可以在集群数据上运行增量查询,而不会产生任何副作用。 | False |
异步Clustering
在前面,我们已经看到了用户如何设置inline cluster。 此外,用户可以利用HoodieClusteringJob来设置两步异步clustering。
HoodieClusteringJob
随着Hudi 0.9.0版本的发布,我们可以在同一个步骤中调度和执行clustering。 我们只需要指定-mode或-m选项。 有三种模式:
- schedule:制定clustering计划。 这提供了一个可以在执行模式中传递的瞬间。
- execute:在给定的瞬间执行clustering计划,这意味着这里需要指定–instant-time。
- scheduleAndExecute:首先制定一个clustering计划,然后立即执行。
注意,要在原始写入器仍在运行时运行此作业,请启用多写入:
代码语言:javascript复制hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProviderCopy
设置HoodieClusteringJob的spark-submit命令示例如下:
代码语言:javascript复制spark-submit
--class org.apache.hudi.utilities.HoodieClusteringJob
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar
--props /path/to/config/clusteringjob.properties
--mode scheduleAndExecute
--base-path /path/to/hudi_table/basePath
--table-name hudi_table_schedule_clustering
--spark-memory 1gCopy
clusteringjob.properties如下
代码语言:javascript复制hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2Copy
HoodieDeltaStreamer
这就引出了用户在Hudi中最喜欢的实用程序。 现在,我们可以用DeltaStreamer触发异步clustering。 只需将hoodie.cluster .async.enabled配置设置为true,并在属性文件中指定其他clustering配置,这些配置的位置可以在启动deltastreamer时作为-props传递(就像HoodieClusteringJob的情况一样)。
设置HoodieDeltaStreamer的spark-submit命令示例如下:
代码语言:javascript复制spark-submit
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar
--props /path/to/config/clustering_kafka.properties
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource
--source-ordering-field impresssiontime
--table-type COPY_ON_WRITE
--target-base-path /path/to/hudi_table/basePath
--target-table impressions_cow_cluster
--op INSERT
--hoodie-conf hoodie.clustering.async.enabled=true
--continuousCopy
Spark Structured Streaming
我们还可以使用Spark结构化的流接收器启用异步clustering,如下所示。
代码语言:javascript复制val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
def getAsyncClusteringOpts(isAsyncClustering: String,
clusteringNumCommit: String,
executionStrategy: String):Map[String, String] = {
commonOpts (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy
)
}
def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {
val streamingInput = // define the source of streaming
Future {
println("streaming starting")
streamingInput
.writeStream
.format("org.apache.hudi")
.options(hudiOptions)
.option("checkpointLocation", basePath "/checkpoint")
.mode(Append)
.start()
.awaitTermination(10000)
println("streaming ends")
}
}
def structuredStreamingWithClustering(): Unit = {
val df = //generate data frame
val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
val f1 = initStreamingWriteFuture(hudiOptions)
Await.result(f1, Duration.Inf)
}Copy
结论与未来工作
在这篇文章中,我们讨论了不同的clustering策略以及如何建立异步cluster。 故事还没有结束,未来的工作包括:
- 支持updates clustering。
- CLI工具支持clustering。
请跟随本JIRA了解更多关于这个问题的积极发展。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://cloud.tencent.com/developer/article/1936501