概述
从Spark shuffle原理可知,Spark shuffle在计算与IO方面,都可能有较大开销,故,Spark shuffle调优就是优化这2个方面。 这里仅关注调参的调优方式,不关注应用代码层面的调优。
计算方面
计算方面可分为为Cpu、内存、算法这3个方面,其中Cpu一般是不用纳入shuffle优化范畴的,内存方面主要关注内存比例与堆外内存,算法指的是采用什么类型的ShuffleManager。
1、内存
spark.shuffle.io.preferDirectBufs 默认为true,即启用堆外内存,可以减少shuffle与block传输期间的gc,若堆外内存非常紧张,则可以考虑关闭这个选项 spark.yarn.executor.memoryoverhead 默认申请的堆外内存是Executor内存的10%,处理大数据的时候,这里会出现问题,导致spark作业反复崩溃,无法运行,故手动设置这个参数,到至少1G(1024M),甚至2G、4G spark.shuffle.memoryFraction 此参数属于spark静态内存模型中的参数,统一内存模型中不再生效 spark.memory.fraction 默认是0.6,即Spark存储内存与执行内存总共占用Executor内存的60%,一般不建议修改 spark.memory.storageFraction 默认是0.5,即Spark存储内存与执行内存各占spark.memory.fraction的50%,一般不建议修改
2、Shuffle算法
spark.shuffle.manager 默认是sort,即SortShuffleManager,hash则在spark2.0之后被废弃并移除 spark.shuffle.sort.bypassMergeThreshold 默认是200,即若shuffle read task的数量小于等于200,则在sortshufflemanager模式下,会启用bypass SortShuffleManager。 若为spark2.0以下版本,这里的调优建议是bypass SortShuffleManager的性能有时并不如开启“合并运行机制”的HashShuffleManager,故当shuffle read task的数量小于等于200时,可尝试“合并运行机制”的HashShuffleManager,与bypass SortShuffleManager进行性能对比。在实践中发现“合并运行机制”的HashShuffleManager的性能比bypass SortShuffleManager要高出10%~30% spark.shuffle.consolidateFiles 默认为false,即不使用“合并运行机制”的HashShuffleManager,该参数在spark2.0之后被废弃并移除
IO方面
1、缓存
spark.shuffle.file.buffer 默认值:32k 参数说明:用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。默认使用这么小的缓存,是希望在硬件较小的情况下也可以部署。 调优建议:若作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight 默认值:48m 参数说明:用于设置shuffle read task的buffer缓冲大小,这个buffer缓冲决定了每次能够拉取多少数据。 调优建议:若作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
2、压缩
spark.shuffle.compress 默认值:true 参数说明:判断是否对mapper端聚合输出进行压缩,当设置为true,表示在每个shuffle过程中都会对mapper端的输出进行压缩,减少shuffle过程中下一个stage向上一个stage抓取数据的网络开销,减轻shuffle的压力。 调优建议:压缩会消耗大量的CPU资源,故打开压缩选项会增加Map任务的执行时间,因此当CPU负载的影响远大于磁盘和网络带宽的影响时,可设置为false。 spark.io.compression.codec 默认值:spark2.2之前默认压缩方式snappy,spark2.2之后默认是lz4 参数说明:压缩内部数据,如RDD分区,广播变量和shuffle输出的数据,该参数的值有三个选项,分别是snappy,lz4和lzf。 调优建议:无。 spark.shuffle.spill.compress 默认值:true 参数说明:在shuffle过程中,是否压缩spill的数据 调优建议:压缩会消耗大量的CPU资源,故打开压缩选项会增加Map任务的执行时间,因此当CPU负载的影响远大于磁盘和网络带宽的影响时,可设置为false。
3、网络
spark.shuffle.io.maxRetries 默认值:3 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 调优建议:通常建议调节到8~10次,对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败,调节该参数可以大幅度提升稳定性。 spark.shuffle.io.retryWait 默认值:5s 参数说明:每次重试拉取数据的等待间隔 调优建议:通常建议加大时长,理由同上。 spark.shuffle.service.enabled 默认值:false 参数说明:设置客户端读取Executor上的shuffle文件的方式,默认值是false,表示使用BlockTransferService读取;设置为true,表示BlockManager实例生成时,需要读取spark.shuffle.service.port配置的端口,同时对应的BlockManager的shuffleclient不再是默认的BlockTransferService实例,而是ExternalShuffleClient实例。启用外部shuffle服务,这个服务会安全地保存shuffle过程中,executor写的磁盘文件,因此executor即使挂掉也不要紧,必须配合spark.dynamicAllocation.enabled属性设置为true,才能生效,而且外部shuffle服务必须进行安装和启动,才能启用这个属性。 调优建议:Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。 在NodeManager中启动External shuffle Service。在“yarn-site.xml”中添加如下配置项: <property> <name>yarn.nodemanager.aux-services</name> <value>spark_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property>