一、Shuffle机制
在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。
二、什么是Shuffle
Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。
在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。
首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:
- 保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。
- 每个分片对应一个文件(现在Spark采用的方式,以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。
因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。
spark的shuffleManager是负责shuffle过程的执行、计算和处理的组件。shuffleManager是trait,Spark中有两种Shuffle管理类型,HashShuffleManager和SortShuffleManager,Spark1.2之前是HashShuffleManager,Spark1.2引入SortShuffleManager,在Spark2.0 版本中已经将HashShuffleManager丢弃。
三、HashShuffleManager
shuffle write阶段,默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据。
这种方式操作数据简单,但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数据)大数据的随机磁盘I/O操作且会形成大量的Memory(极易造成OOM)。所以产生了以下两个问题:
- 不能够处理大规模的数据。
- Spark不能够运行在大规模的分布式集群上。
改进方案:通过设置spark.shuffle.consolidateFiles 该参数默认值为false,将其设置为true即可开启优化的Consolidate机制。
Consolidate机制来将Shuffle时候产生的文件数量减少到C*R个(C代表在Mapper端,同时能够使用的cores数量,R代表Reducer中所有的并行任务数量)。但是此时如果Reducer端的并行数据分片过多的话则C*R可能已经过大,也有打开过多文件的问题。Consolidate并没有降低并行度,只是降低了临时文件的数量,此时Mapper端的内存消耗就会变少,所以OOM也就会降低,另外一方面磁盘的性能也会变得更好。
开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number, R是Reducer的个数。
四、SortShuffleManager
HashShuffle写数据的时候,内存有一个bucket缓冲区,同时在本地磁盘有对应的本地文件,如果本地有文件,那么在内存应该也有文件句柄也是需要耗费内存的。也就是说,从内存的角度考虑,即有一部分存储数据,一部分管理文件句柄。如果Mapper分片数量为1000,Reduce分片数量为1000,那么总共就需要1000000个小文件。所以就会有很多内存消耗,频繁IO以及GC频繁或者出现内存溢出。而且Reducer端读取Map端数据时,Mapper有这么多小文件,就需要打开很多网络通道读取,很容易造成Reducer(下一个stage)通过driver去拉取上一个stage数据的时候,说文件找不到,其实不是文件找不到而是程序不响应,因为正在GC。
为了缓解Shuffle过程产生文件数过多和Writer缓存开销过大的问题,spark引入了类似于hadoop Map-Reduce的shuffle机制。该机制每一个ShuffleMapTask不会为后续的任务创建单独的文件,而是会将所有的Task结果写入同一个文件,并且对应生成一个索引文件。以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer缓存所占用的内存大小,而且同时避免GC的风险和频率。
SortShuffle的运行机制主要分成两种:普通运行机制和bypass运行机制。
普通运行机制:
执行流程:
- map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M。
- 在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
- 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
- 在溢写之前内存结构中的数据会进行排序分区。
- 然后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据。
- map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。
- reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。
总结:这种机制产生磁盘小文件的个数:2*M(map task的个数)。
byPass运行机制:
bypass运行机制的触发条件如下:
1.shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认是200)。
这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。这个参数的默认值是200个分区,如果内存GC问题严重,可以降低这个值。
2.不是聚合类的shuffle算子(比如reduceByKey)。
此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
总结:这种机制产生的磁盘的小文件为:2*M(map task的个数)。