数据倾斜?Spark 3.0 AQE专治各种不服(下)

2021-02-23 16:09:02 浏览数 (1)

Spark3.0AQE在FreeWheel的应用与实践

FreeWheel团队通过高效的敏捷开发赶在 2020 年圣诞广告季之前在生产环境顺利发布上线,整体性能提升高达 40%(对于大 batch)的数据,AWS Cost 平均节省 25%~30%之间,大约每年至少能为公司节省百万成本。

主要升级改动

打开 Spark 3.0 AQE 的新特性,主要配置如下:

代码语言:javascript复制
"spark.sql.adaptive.enabled": true,
"spark.sql.adaptive.coalescePartitions.enabled": true,
"spark.sql.adaptive.coalescePartitions.minPartitionNum": 1,
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"

需要注意的是,AQE 特性只是在 reducer 阶段不用指定 reducer 的个数,但并不代表你不再需要指定任务的并行度了。因为 map 阶段仍然需要将数据划分为合适的分区进行处理,如果没有指定并行度会使用默认的 200,当数据量过大时,很容易出现 OOM。建议还是按照任务之前的并行度设置来配置参数spark.sql.shuffle.partitions和spark.default.parallelism。

我们来仔细看一下为什么升级到 3.0 以后可以减少运行时间,又能节省集群的成本。以 Optimus 数据建模里的一张表的运行情况为例:

  • 在 reduce 阶段从没有 AQE 的40320个 tasks 锐减到4580个 tasks,减少了一个数量级。
  • 下图里下半部分是没有 AQE 的 Spark 2.x 的 task 情况,上半部分是打开 AQE 特性后的 Spark 3.x 的情况。
  • 从更详细的运行时间图来看,shuffler reader后同样的 aggregate 的操作等时间也从4.44h到2.56h,节省将近一半。
  • 左边是 spark 2.x 的运行指标明细,右边是打开 AQE 后通过custom shuffler reader后的运行指标情况。
性能提升

AQE性能

AQE对于整体的 Spark SQL 的执行过程做了相应的调整和优化(如下图),它最大的亮点是可以根据已经完成的计划结点真实且精确的执行统计结果来不停的反馈并重新优化剩下的执行计划。

AQE 自动调整 reducer 的数量,减小 partition 数量。Spark 任务的并行度一直是让用户比较困扰的地方。如果并行度太大的话,会导致 task 过多,overhead 比较大,整体拉慢任务的运行。而如果并行度太小的,数据分区会比较大,容易出现 OOM 的问题,并且资源也得不到合理的利用,并行运行任务优势得不到最大的发挥。

而且由于 Spark Context 整个任务的并行度,需要一开始设定好且没法动态修改,这就很容易出现任务刚开始的时候数据量大需要大的并行度,而运行的过程中通过转化过滤可能最终的数据集已经变得很小,最初设定的分区数就显得过大了。AQE 能够很好的解决这个问题,在 reducer 去读取数据时,会根据用户设定的分区数据的大小(spark.sql.adaptive.advisoryPartitionSizeInBytes)来自动调整和合并(Coalesce)小的 partition,自适应地减小 partition 的数量,以减少资源浪费和 overhead,提升任务的性能。

由上面单张表可以看到,打开 AQE 的时候极大的降低了 task 的数量,除了减轻了 Driver 的负担,也减少启动 task 带来的 schedule,memory,启动管理等 overhead,减少 cpu 的占用,提升的 I/O 性能。

拿历史 Data Pipelines 为例,同时会并行有三十多张表在 Spark 里运行,每张表都有极大的性能提升,那么也使得其他的表能够获得资源更早更多,互相受益,那么最终整个的数据建模过程会自然而然有一个加速的结果。

大 batch(>200G)相对小 batch(< 100G )有比较大的提升,有高达 40%提升,主要是因为大 batch 本身数据量大,需要机器数多,设置并发度也更大,那么 AQE 展现特性的时刻会更多更明显。而小 batch 并发度相对较低,那么提升也就相对会少一些,不过也是有 27.5%左右的加速。

内存优化

除了因为 AQE 的打开,减少过碎的 task 对于 memory 的占用外,Spark 3.0 也在其他地方做了很多内存方面的优化,比如 Aggregate 部分指标瘦身、Netty 的共享内存 Pool 功能、Task Manager 死锁问题、避免某些场景下从网络读取 shuffle block等等,来减少内存的压力。一系列内存的优化加上 AQE 特性叠加从前文内存实践图中可以看到集群的内存使用同时有30%左右的下降。

实践成果

升级主要的实践成果如下:

性能提升明显

  • 历史数据 Pipeline 对于大 batch 的数据(200~400G/每小时)性能提升高达40%, 对于小 batch(小于 100G/每小时)提升效果没有大 batch 提升的那么明显,每天所有 batches平均提升水平27.5%左右。
  • 预测数据性能平均提升30%。由于数据输入源不一样,目前是分别两个 pipelines 在跑历史和预测数据,产生的表的数目也不太一样,因此做了分别的评估。

以历史数据上线后的端到端到运行时间为例(如下图),肉眼可见上线后整体 pipeline 的运行时间有了明显的下降,能够更快的输出数据供下游使用。

集群内存使用降低

集群内存使用对于大 batch 达降低30%左右,每天平均平均节省25%左右。

以历史数据上线后的运行时集群的 memory 在 ganglia 上的截图为例(如下图),整体集群的内存使用从 41.2T 降到 30.1T,这意味着我们可以用更少的机器花更少的钱来跑同样的 Spark 任务。

AWS Cost 降低

Pipelines 做了自动的 Scale In/Scale Out 策略: 在需要资源的时候扩集群的 Task 结点,在任务结束后自动去缩集群的 Task 结点,且会根据每次 batch 数据的大小通过算法学习得到最佳的机器数。通过升级到 Spark 3.0 后,由于现在任务跑的更快并且需要的机器更少,上线后统计 AWS Cost 每天节省30%左右,大约一年能为公司节省百万成本。

0 人点赞