一提到shuffle, 我们犹如“谈虎色变”。shuffle是大数据中的性能杀手,其来源于大数据中的元老级的组件Hadoop。
代码语言:javascript复制 在Hadoop中,map被定义为数据的初次拆分获取解析阶段, reduce被定义为负责最终数据的收集汇总阶段,除了业务
逻辑的功能外,其他的核心数据处理都是由shuffle来支持。
在Hadoop组件中定义的Shuffle包括了什么呢? 为什么Shuffle是资源和时间开销比较大的阶段呢?
简单来说,Shuffle中有三次数据排序。
- map端内存中的快速排序。shuffle的map端会在内存中开辟了一个缓冲区,当K-V数据从map出来后,分批进入缓冲区,对它们按K进行排序,并且按照map的逻辑进行分区,在出缓冲区落盘的时候,完成排序。
- map端分区文件的归并排序。一旦内存中缓冲区满了,就会被Hadoop写到file中,这个过程交spill, 所写的文件被叫做spill File(这些 spill file 存在 map 所在 host 的 local disk 上,而不是 HDFS)。 当 Map 结束时,这些 spill file 会被 merge 起来,按照分区进行归并排序合并为多个文件。
- reduce端文件归并排序。Reducer 内部有一个 thread 负责定期询问 map output 的位置。另一个 thread 会把拷贝过来的 map output file merge 成更大的 file。当一个 reduce task 所有的 map output 都被拷贝到一个它的 host上时,reduce 就要开始对他们排序了。
Spark中的Shuffle
Spark 中的shuffle, 经历了Hash、Sort 和 Tungsten-Sort 3个重要阶段。
在1.1之前Spark采用Hash Shuffle, 1.1 之后引入Sort Shuffle, 1.6 时引入Tungsten-Sort Shuffle, 2.0 版本所有的Shuffle被统一到了Sort Shuffle中,3.2 时引入push-based shuffle; 当然还有未被合入社区,但在各大厂被开源使用Remote Shuffle Service;除此以外还有将向量计算引入shuffle计算的实现。
由上面的介绍可知, Shuffle 就是将map阶段的拆分数据,通过设置的聚合方式按照分区进行聚合后,由reduce进行处理的过程。下面我们来总的认识下Spark中的shuffle方式:
1. Hash Shuffle
Hash Shuffle, 顾名思义,就是采取Hash的方式在Map的任务中为每个reduce端的任务生成一个文件。因此如果有M个map任务, R个reduce任务就会产生M x R个文件。巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到Core R个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。
代码语言:javascript复制 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个
磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘
文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少
个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的
shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会
写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效
将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升
shuffle write的性能。
2. Sort Shuffle
Sort Shuffle 的引入是如何解决上述问题的呢?
首先,在Shuffle的map阶段会将所有数据进行排序,并将分区的数据写入同一个文件中,在创建数据文件的同时会产生索引文件,来记录分区的大小和偏移量。所以这里产生文件的数量和reduce分区就没有关系了,只会产生2 * M个临时文件。
下面我们先通过简单分析来对Spark Shuffle有个简单的了解,后面再详细介绍:
Spark 在启动时就会在SparkEnv中创建ShuffleManager来管理shuffle过程。
代码语言:javascript复制// Let the user specify short names for shuffle managers
val shortShuffleMgrNames =Map(
"sort" ->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" ->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
从上面的代码可以看出,Spark目前只有唯一一种ShuffleManager的实现方式,就是SortShuffleManager。
下面我们可以看下ShuffleManager这个接口类:
代码语言:javascript复制private[spark] trait ShuffleManager {
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*向shuffleManager注册shuffle,并返回handle
*/
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** Get a writer for a given partition. Called on executors by map tasks. */
def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
...
}
- registerShuffle()方法用于注册一种shuffle机制,并返回对应的ShuffleHandle(类似于句柄),handle内会存储shuffle依赖信息。根据该handle可以进一步确定采用ShuffleWriter/ShuffleReader的种类。
- getWriter()方法用于获取ShuffleWriter。它是executor执行map任务时调用的。
- getReader()方法用于获取ShuffleReader。它是executor执行reduce任务时调用的。
我们都知道在Spark的DAG中,顶点是一个个 RDD,边则是 RDD 之间通过 dependencies 属性构成的父子关系。dependencies 又分为宽依赖和窄依赖,分别对应ShuffleDependency和NarrowDependency。当RDD间的依赖关系为ShuffleDependency时,RDD会通过其SparkEnv向ShuffleManager注册一个shuffle, 并返回当前处理当前shuffle所需要的句柄。
Spark有2个类型的Task: ShuffleMapTask和ResultTask,在Spark中stage是以Pipeline运行的, 除了最后一个Stage对应的是ResultStage,其余的Stage对应的都是ShuffleMapStage。
ShuffleMapStage中的每个Task,叫做ShuffleMapTask,getWriter()方法的调用主要是在ShuffleMapTask中进行。调用getWriter方法会返回一个ShuffleWriter的trait。
除了需要从外部存储读取数据和RDD已经做过cache或者checkpoint的Task,一般Task的开始都是从ShuffledRDD的调用getReader()。调用getReader()会返回一个ShuffleReader的trait。
微信截图_20220519215721.png
综上,Spark的Shuffle模块主要有ShuffleManager、ShuffleWriter和ShuffleReader。ShuffleManager目前在社区版中只有SortShuffleManager一种实现,ShuffleReader也只有BlockStoreShuffleReader一种实现,但是ShuffleWriter目前有BypassMergerSortShuffleWriter, SortShuffleWriter和UnsafeShuffleWriter三种实现。
3. Push-Based Shuffle
push-based shuffle方案,会在mapper执行后并且会被自动合并数据,然后将数据移动到下游的reducer。目前只支持Yarn方式的实现。
在中大规模的Spark shuffle中,Shuffle依然是很多的性能问题:
- 第一个挑战是可靠性问题。由于计算节点数据量大和 shuffle 工作负载的规模,可能会导致 shuffle fetch 失败,从而导致昂贵的 stage 重试。
- 第二个挑战是效率问题。由于 reducer 的 shuffle fetch 请求是随机到达的,因此 shuffle 服务也会随机访问 shuffle 文件中的数据。如果单个 shuffle 块大小较小,则 shuffle 服务产生的小随机读取会严重影响磁盘吞吐量,从而延长 shuffle fetch 等待时间。
- 第三个挑战是扩展问题。由于 external shuffle service 是我们基础架构中的共享服务,因此一些对 shuffle services 错误调优的作业也会影响其他作业。当一个作业错误地配置导致产生许多小的 shuffle blocks 将会给 shuffle 服务带来压力时,它不仅会给自身带来性能下降,还会使共享相同 shuffle 服务的所有相邻作业的性能下降。这可能会导致原本正常运行的作业出现不可预测的运行时延迟,尤其是在集群高峰时段。
push-based shuffle 利用共享的ESS服务,在map阶段时将溢写的数据文件,通过推送的方式推送到reduce对应的ESS节点,并在其中对小文件进行合并。其中push-based shuffle 实现了一个magnet shuffle服务,是一个增强的Spark ESS,它可以接受远程推送的shuffle block,它会将block合并到每一个唯一shuffle分区文件。
push merge shuffle采用的push-merge shuffle机制。Mapper生成的shuffle数据被推送到远程的magnet shuffle服务,并按照每个shuffle合并。Magnet在此期间可以将小的shuffle块的随机读取转换为MB大小的顺序读取。这个push操作与map任务完全解耦,所以无需添加到执行map任务的运行时中,一旦推送失败就会导致maptask失败。
尽可能地执行push,Magnet无需所有的shuffle都是完美的push成功。通过push-merge shuffle,Magnet复制shuffle数据,Reducer可以获取合并后的、或者是没有合并的shuffle数据作为任务输入。也就是,即使没有合并也可以读取。
那么Spark是如何选择Sort-based ShuffleWriter的具体实现方式呢?
ShuffleWriter方式的选择
代码语言:javascript复制override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
可以看出,根据条件的不同,会返回3种不同的handle,对应3种shuffle机制。从上到下来分析一下:
1. 检查是否符合SortShuffleWriter.shouldBypassMergeSort()方法的条件:
代码语言:javascript复制def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
false
} else {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
判断是否符合bypassMergeSort的条件主要有以下两个:
- 该shuffle依赖中没有map端聚合操作(如groupByKey()算子)
- 分区数不大于参数
spark.shuffle.sort.bypassMergeThreshold
规定的值(默认200)
那么会返回BypassMergeSortShuffleHandle,启用bypass merge-sort shuffle机制。
2. 如果不启用上述bypass机制,那么继续检查是否符合canUseSerializedShuffle()方法的条件:
代码语言:javascript复制def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(/*...*/)
false
} else if (dependency.aggregator.isDefined) {
log.debug(/*...*/)
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(/*...*/)
false
} else {
log.debug(/*...*/)
true
}
}
}
也就是说,如果同时满足以下三个条件:
- 使用的序列化器支持序列化对象的重定位(如KryoSerializer)
- shuffle依赖中完全没有聚合操作
- 分区数不大于常量
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE
的值(最大分区ID号 1,即2^24=16777216)
那么会返回SerializedShuffleHandle,启用序列化sort shuffle机制(也就是tungsten-sort)。
3. 如果既不用bypass也不用tungsten-sort,那么就返回默认的BaseShuffleHandle,采用基本的sort shuffle机制。
Untitled.png
目前spark中只有一种Shuffle的实现方式,即sort-Shuffle, 但是它包括了使用Tungsten实现的unsafeSortShuffle, 不需要排序的BypassMergeSortShuffle和baseSortShuffle, 并通过实现ShuffleWriter的方式根据不通过的情况进行选择,可以更好的处理不同情况的数据任务。
总结,Spark为不同情况实现不同有趣的Shuffle机制。Spark的shuffle是通过将中间文件物化到spark.local.dir的本地临时文件中,来增强spark的容错性,但是也造成了shuffle时的性能压力。在传统的Hash shuffle中,可以直接将map端的数据shuffle归类为对应的reduce 分区的数据,但是也造成了产生MxN(M是map Task, N 是Reduce Task)数量级的中间文件, 即使通过重用buffer, 将不同批次的Task的写入文件进行重用,减少了一定量的数据文件,但是并不能从根本上减少文件的数量级。采用Sort-based Shuffle 主要是使用在数据量比较大的情况下,通过将map端的数据进行排序,并生成文件索引,那么就可以通过读取文件的偏移量来区别不同的reduce应该拉取那部分的数据,产生的中间文件数据也变成了2 * M 个, 大大的减少了处理的文件数量。但是随着数据服务压力增加,大量的中间小文件会造成随机io, io的压力也会导致fetchfail的发生几率的上升,push-based shuffle 主要是将map端的数据push到共享ESS进行合并,进一步的减少小文件的数量,将随机io变为顺序io, 同时减少中间文件的数量,提升集群的稳定性。
今天就先到这里,通过上面的介绍,我们也留下些面试题?
- Sort-based Shuffle, push- based Shuffle 和 Remote Shuffle Service 那么现在社区或业界提供这么多有趣的shuffle可供选择,那么应用中应该如何选择具体的Shuffle方式?他们的适用范围是什么?
- 如果你要实现一种新的ShuffleManage应该怎么在Spark实现配置?
- 既然是Sort-based Shuffle 那么Shuffle后的数据是否是有序的?
- Remote Shuffle Service 和 Push-based Shuffle 他们的优劣分别是什么?