一、Shuffle原理
当使⽤reduceByKey、groupByKey、sortByKey、countByKey、join、cogroup等操作的时候,会发⽣shuffle操作。Spark在DAG调度阶段将job划分成多个stage,上游stage做map操作,下游stage做reduce操作,其本质还是MR计算架 构。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce的输⼊,这期间涉及到序列化和反序列化、跨节点⽹络IO和磁盘读写IO等,所以说shuffle是整个应⽤过程特别昂贵的阶段。
与MapReduce计算框架⼀样,spark的shuffle实现⼤致如下图所⽰,在DAG阶段以shuffle为界,划分stage,上游stage 做map task,每个map task将计算结果数据分成多份,每⼀份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程就叫做shuffle write;下游stage叫做reduce task,每个reduce task通过⽹络拉取指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。
举例:上游stage有100个map task,下游有1000个reduce task,那么这100个map task中每个maptask都会得到1000份数据,⽽这1000个reduce task中的每个reduce task都会拉取上游100个map task对应的那份数据,即第⼀个reduce task会拉取所有map task结果数据的第⼀份,以此类推。
在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有shuffle read过程,这个过程涉及到⽹络IO、反序列化等耗时操作。所以整个shuffle过程是极其昂贵的。
因为shuffle是⼀个涉及CPU(序列化反序列化)、⽹络IO(跨节点数据传输)以及磁盘IO(shuffle中间结果落地)的操作,所以应当考虑shuffle相关的调优,提升spark应⽤程序的性能。
二、Shuffle调优
2.1 程序调优;
⾸先,尽量减少shuffle次数;
代码语言:javascript复制//两次shuffle
rdd.map().repartition(1000).reduceByKey(_ _,3000)
//⼀次shuffle
Rdd.map().repartition(3000).reduceByKey(_ _)
然后必要时主动shuffle,通常⽤于改变并⾏度,提⾼后续分布式运⾏速度;
代码语言:javascript复制rdd.repartition(largerNumPartition).map()
最后,使⽤treeReduce&treeAggregate替换reduce&aggregate。数据量较⼤时,reduce&aggregate⼀次性聚 合,shuffle量太⼤,⽽treeReduce&treeAggregate是分批聚合,更为保险。
2.2 参数调优;
spark.shuffle.file.buffer : map task到buffer到磁盘 默认值:32K
参数说明:该参数⽤于设置shuffle write task的BufferedOutputStream的buffer缓冲⼤⼩。将数据写到磁盘⽂件之前,会先写⼊buffer缓冲中,待缓冲写满之后,才会溢写到磁盘;
调优建议:如果作业可⽤的内存资源较为充⾜的话,可以适当增加这个参数的⼤⼩(⽐如64k),从⽽减少shufflewrite过程中溢写磁盘⽂件的次数,也就可以减少磁盘IO次数,进⽽提升性能。在实践中发现,合理调节该参数,性能会有1到5%的提升。
spark.reducer.maxSizeFlight:reduce task去磁盘拉取数据 默认值:48m
参数说明:该参数⽤于设置shuffle read task的buffer缓冲⼤⼩,⽽这个buffer缓冲决定了每次能够拉取多少数据。调优建议:如果作业可⽤的内存资源较为充⾜的话,可以增加这个参数的⼤⼩(⽐如96M),从⽽减少拉取数据的次数,也就可以减少⽹络传输的次数,进⽽提升性能。在实践中发现,合理调节该参数,性能会有1到5%的提升。
Spark.shuffle.io.maxRetries 默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于⾃⼰的数据时,如果因为⽹络异常导致拉取失败,时会⾃动进⾏重试的。该参数就代表了可以重试的最⼤次数,如果在指定次数内拉取属于还是没有成功,就可能会导致作业执⾏失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最⼤次数(⽐如6次),可以避免由于JVM的full gc或者⽹络不稳定等因素导致的数据拉取失败。在实践中发现,对于超⼤数据量(数⼗亿到上百亿)的shuffle过程,调节该参数可以⼤幅度提升稳定性。
Spark.shuffle.io.retryWait 默认值:5s
参数说明:shuffle read task从shuffle write task所在节点拉取属于⾃⼰的数据时,如果拉取失败了每次重试拉取数据的等待时间间隔,默认是5s;
调优建议:建议加⼤时间间隔时长,⽐如60s,以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction 默认值:0.2
参数说明:该参数代表了executor内存中,分配给shuffle read task进⾏聚合操作的内存⽐例,默认是20%;
调优建议:如果内存充⾜,⽽且很少使⽤持久化操作,建议调⾼和这个⽐例,给shuffle read的聚合操作更多内存,以避免由于内存不⾜导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%。
Spark.shuffle.manager 默认值:sort
参数说明:该参数⽤于设置shuffleManager的类型。Spark1.5以后有三个可选项:hash、sort和tungsten- sort。Tungsten-sort与sort类似,但是使⽤了tungsten计划中的堆外内存管理机制,内存使⽤效率提⾼。
调优建议:由于sort shuffleManager默认会对数据进⾏排序,因此如果你的业务逻辑中需要该排序机制的话,则使⽤默认的sort ShuffleManager就可以;但是如果你的业务逻辑不需要对数据进⾏排序,那么建议参考后⾯的⼏个参数调优,通过bypass机制或优化的hash ShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这⾥要注意的是,tungsten-sort要慎⽤,因为之前发现了⼀些相应的bug。
Spark.shuffle.sort.bypassMergeThreshold 默认值:200
参数说明:当shuffleManager为sortshuffleManager时,如果shuffle read task的数量⼩于这个阈值,则shuffle write过程中不会进⾏排序操作,⽽是直接按照未经优化的hashShuffleManager的⽅式去写数据,但是最后会将每个task产⽣的所有临时磁盘⽂件都合并成⼀个⽂件,并会创建单独的索引⽂件。
调优建议:当你使⽤sortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调⼤⼀些,⼤于shuffleread task的数量,那么此时就会⾃动启⽤bupass机制,map-side就不会进⾏排序,减少了排序的性能开销。但是这种⽅式下,依然会产⽣⼤量的磁盘⽂件,因此shuffle write性能有待提⾼。
Spark.shuffle.consolidateFiles 默认值:false
参数说明:如果使⽤hashShuffleManager,该参数有效。如果设置为true,那么就会开启consilidate机制,会⼤幅度合并shuflle write的输出⽂件,对于shuffle read task数量特别多的情况下,这种⽅法可以极⼤地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要sortHashShuffle的排序机制,那么除了使⽤bypass机制,还可以尝试 将spark.shuffle.manager参数⼿动调节为hash,使⽤hashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能⽐开启了bypass机制的sortshuffleManager要⾼出10%到30%。