[SPARK][CORE] 面试问题 之 Spark Shuffle概述

2022-05-20 09:06:42 浏览数 (1)

一提到shuffle, 我们犹如“谈虎色变”。shuffle是大数据中的性能杀手,其来源于大数据中的元老级的组件Hadoop。

代码语言:javascript复制
    在Hadoop中,map被定义为数据的初次拆分获取解析阶段, reduce被定义为负责最终数据的收集汇总阶段,除了业务
逻辑的功能外,其他的核心数据处理都是由shuffle来支持。

在Hadoop组件中定义的Shuffle包括了什么呢? 为什么Shuffle是资源和时间开销比较大的阶段呢?

简单来说,Shuffle中有三次数据排序。

  1. map端内存中的快速排序。shuffle的map端会在内存中开辟了一个缓冲区,当K-V数据从map出来后,分批进入缓冲区,对它们按K进行排序,并且按照map的逻辑进行分区,在出缓冲区落盘的时候,完成排序。
  2. map端分区文件的归并排序。一旦内存中缓冲区满了,就会被Hadoop写到file中,这个过程交spill, 所写的文件被叫做spill File(这些 spill file 存在 map 所在 host 的 local disk 上,而不是 HDFS)。 当 Map 结束时,这些 spill file 会被 merge 起来,按照分区进行归并排序合并为多个文件。
  3. reduce端文件归并排序。Reducer 内部有一个 thread 负责定期询问 map output 的位置。另一个 thread 会把拷贝过来的 map output file merge 成更大的 file。当一个 reduce task 所有的 map output 都被拷贝到一个它的 host上时,reduce 就要开始对他们排序了。

Spark中的Shuffle

Spark 中的shuffle, 经历了Hash、Sort 和 Tungsten-Sort 3个重要阶段。

在1.1之前Spark采用Hash Shuffle, 1.1 之后引入Sort Shuffle, 1.6 时引入Tungsten-Sort Shuffle, 2.0 版本所有的Shuffle被统一到了Sort Shuffle中,3.2 时引入push-based shuffle; 当然还有未被合入社区,但在各大厂被开源使用Remote Shuffle Service;除此以外还有将向量计算引入shuffle计算的实现。

由上面的介绍可知, Shuffle 就是将map阶段的拆分数据,通过设置的聚合方式按照分区进行聚合后,由reduce进行处理的过程。下面我们来总的认识下Spark中的shuffle方式:

1. Hash Shuffle

Hash Shuffle, 顾名思义,就是采取Hash的方式在Map的任务中为每个reduce端的任务生成一个文件。因此如果有M个map任务, R个reduce任务就会产生M x R个文件。巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到Core R个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

代码语言:javascript复制
    开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个
磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘
文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少
个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

    Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的
shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会
写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效
将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升
shuffle write的性能。

2. Sort Shuffle

Sort Shuffle 的引入是如何解决上述问题的呢?

首先,在Shuffle的map阶段会将所有数据进行排序,并将分区的数据写入同一个文件中,在创建数据文件的同时会产生索引文件,来记录分区的大小和偏移量。所以这里产生文件的数量和reduce分区就没有关系了,只会产生2 * M个临时文件。

下面我们先通过简单分析来对Spark Shuffle有个简单的了解,后面再详细介绍:

Spark 在启动时就会在SparkEnv中创建ShuffleManager来管理shuffle过程。

代码语言:javascript复制
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames =Map(
  "sort" ->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" ->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

从上面的代码可以看出,Spark目前只有唯一一种ShuffleManager的实现方式,就是SortShuffleManager。

下面我们可以看下ShuffleManager这个接口类:

代码语言:javascript复制
private[spark] trait ShuffleManager {

/**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   *向shuffleManager注册shuffle,并返回handle
   */
def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

/** Get a writer for a given partition. Called on executors by map tasks. */
def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

/**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]
...
}
  • registerShuffle()方法用于注册一种shuffle机制,并返回对应的ShuffleHandle(类似于句柄),handle内会存储shuffle依赖信息。根据该handle可以进一步确定采用ShuffleWriter/ShuffleReader的种类。
  • getWriter()方法用于获取ShuffleWriter。它是executor执行map任务时调用的。
  • getReader()方法用于获取ShuffleReader。它是executor执行reduce任务时调用的。

我们都知道在Spark的DAG中,顶点是一个个 RDD,边则是 RDD 之间通过 dependencies 属性构成的父子关系。dependencies 又分为宽依赖和窄依赖,分别对应ShuffleDependency和NarrowDependency。当RDD间的依赖关系为ShuffleDependency时,RDD会通过其SparkEnv向ShuffleManager注册一个shuffle, 并返回当前处理当前shuffle所需要的句柄。

Spark有2个类型的Task: ShuffleMapTask和ResultTask,在Spark中stage是以Pipeline运行的, 除了最后一个Stage对应的是ResultStage,其余的Stage对应的都是ShuffleMapStage。

ShuffleMapStage中的每个Task,叫做ShuffleMapTask,getWriter()方法的调用主要是在ShuffleMapTask中进行。调用getWriter方法会返回一个ShuffleWriter的trait。

除了需要从外部存储读取数据和RDD已经做过cache或者checkpoint的Task,一般Task的开始都是从ShuffledRDD的调用getReader()。调用getReader()会返回一个ShuffleReader的trait。

微信截图_20220519215721.png

综上,Spark的Shuffle模块主要有ShuffleManager、ShuffleWriter和ShuffleReader。ShuffleManager目前在社区版中只有SortShuffleManager一种实现,ShuffleReader也只有BlockStoreShuffleReader一种实现,但是ShuffleWriter目前有BypassMergerSortShuffleWriter, SortShuffleWriter和UnsafeShuffleWriter三种实现。

3. Push-Based Shuffle

push-based shuffle方案,会在mapper执行后并且会被自动合并数据,然后将数据移动到下游的reducer。目前只支持Yarn方式的实现。

在中大规模的Spark shuffle中,Shuffle依然是很多的性能问题:

  • 第一个挑战是可靠性问题。由于计算节点数据量大和 shuffle 工作负载的规模,可能会导致 shuffle fetch 失败,从而导致昂贵的 stage 重试。
  • 第二个挑战是效率问题。由于 reducer 的 shuffle fetch 请求是随机到达的,因此 shuffle 服务也会随机访问 shuffle 文件中的数据。如果单个 shuffle 块大小较小,则 shuffle 服务产生的小随机读取会严重影响磁盘吞吐量,从而延长 shuffle fetch 等待时间。
  • 第三个挑战是扩展问题。由于 external shuffle service 是我们基础架构中的共享服务,因此一些对 shuffle services 错误调优的作业也会影响其他作业。当一个作业错误地配置导致产生许多小的 shuffle blocks 将会给 shuffle 服务带来压力时,它不仅会给自身带来性能下降,还会使共享相同 shuffle 服务的所有相邻作业的性能下降。这可能会导致原本正常运行的作业出现不可预测的运行时延迟,尤其是在集群高峰时段。

push-based shuffle 利用共享的ESS服务,在map阶段时将溢写的数据文件,通过推送的方式推送到reduce对应的ESS节点,并在其中对小文件进行合并。其中push-based shuffle 实现了一个magnet shuffle服务,是一个增强的Spark ESS,它可以接受远程推送的shuffle block,它会将block合并到每一个唯一shuffle分区文件。

push merge shuffle采用的push-merge shuffle机制。Mapper生成的shuffle数据被推送到远程的magnet shuffle服务,并按照每个shuffle合并。Magnet在此期间可以将小的shuffle块的随机读取转换为MB大小的顺序读取。这个push操作与map任务完全解耦,所以无需添加到执行map任务的运行时中,一旦推送失败就会导致maptask失败。

尽可能地执行push,Magnet无需所有的shuffle都是完美的push成功。通过push-merge shuffle,Magnet复制shuffle数据,Reducer可以获取合并后的、或者是没有合并的shuffle数据作为任务输入。也就是,即使没有合并也可以读取。

那么Spark是如何选择Sort-based ShuffleWriter的具体实现方式呢?

ShuffleWriter方式的选择

代码语言:javascript复制
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}

可以看出,根据条件的不同,会返回3种不同的handle,对应3种shuffle机制。从上到下来分析一下:

1. 检查是否符合SortShuffleWriter.shouldBypassMergeSort()方法的条件:

代码语言:javascript复制
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}

判断是否符合bypassMergeSort的条件主要有以下两个:

  • 该shuffle依赖中没有map端聚合操作(如groupByKey()算子)
  • 分区数不大于参数spark.shuffle.sort.bypassMergeThreshold规定的值(默认200)

那么会返回BypassMergeSortShuffleHandle,启用bypass merge-sort shuffle机制。

2. 如果不启用上述bypass机制,那么继续检查是否符合canUseSerializedShuffle()方法的条件:

代码语言:javascript复制
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(/*...*/)
      false
    } else if (dependency.aggregator.isDefined) {
      log.debug(/*...*/)
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(/*...*/)
      false
    } else {
      log.debug(/*...*/)
      true
    }
  }
}

也就是说,如果同时满足以下三个条件:

  • 使用的序列化器支持序列化对象的重定位(如KryoSerializer)
  • shuffle依赖中完全没有聚合操作
  • 分区数不大于常量MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值(最大分区ID号 1,即2^24=16777216)

那么会返回SerializedShuffleHandle,启用序列化sort shuffle机制(也就是tungsten-sort)。

3. 如果既不用bypass也不用tungsten-sort,那么就返回默认的BaseShuffleHandle,采用基本的sort shuffle机制。

Untitled.png

目前spark中只有一种Shuffle的实现方式,即sort-Shuffle, 但是它包括了使用Tungsten实现的unsafeSortShuffle, 不需要排序的BypassMergeSortShuffle和baseSortShuffle, 并通过实现ShuffleWriter的方式根据不通过的情况进行选择,可以更好的处理不同情况的数据任务。

总结,Spark为不同情况实现不同有趣的Shuffle机制。Spark的shuffle是通过将中间文件物化到spark.local.dir的本地临时文件中,来增强spark的容错性,但是也造成了shuffle时的性能压力。在传统的Hash shuffle中,可以直接将map端的数据shuffle归类为对应的reduce 分区的数据,但是也造成了产生MxN(M是map Task, N 是Reduce Task)数量级的中间文件, 即使通过重用buffer, 将不同批次的Task的写入文件进行重用,减少了一定量的数据文件,但是并不能从根本上减少文件的数量级。采用Sort-based Shuffle 主要是使用在数据量比较大的情况下,通过将map端的数据进行排序,并生成文件索引,那么就可以通过读取文件的偏移量来区别不同的reduce应该拉取那部分的数据,产生的中间文件数据也变成了2 * M 个, 大大的减少了处理的文件数量。但是随着数据服务压力增加,大量的中间小文件会造成随机io, io的压力也会导致fetchfail的发生几率的上升,push-based shuffle 主要是将map端的数据push到共享ESS进行合并,进一步的减少小文件的数量,将随机io变为顺序io, 同时减少中间文件的数量,提升集群的稳定性。

今天就先到这里,通过上面的介绍,我们也留下些面试题?

  1. Sort-based Shuffle, push- based Shuffle 和 Remote Shuffle Service 那么现在社区或业界提供这么多有趣的shuffle可供选择,那么应用中应该如何选择具体的Shuffle方式?他们的适用范围是什么?
  2. 如果你要实现一种新的ShuffleManage应该怎么在Spark实现配置?
  3. 既然是Sort-based Shuffle 那么Shuffle后的数据是否是有序的?
  4. Remote Shuffle Service 和 Push-based Shuffle 他们的优劣分别是什么?

0 人点赞