[SPARK][CORE] 面试问题之 3.2新的特性Push-based Shuffle源码解析

2022-06-15 18:19:30 浏览数 (1)

Spark 3.2为spark shuffle带来了重大的改变,其中新增了push-based shuffle机制。但其实在push-based shuffle 之前,业界也有人提出了remote shuffle service的实践,不过由于它们是依赖于外部组件实现的所以一直不被社区所接收。

在上一讲我们先来了解push-based shuffle机制的实现原理,这里我们来通过源码分析下其实现的过程。

首先,Push-based shuffle机制是不依赖于外部组件的方案,但使用升级版的ESS进行shuffle data的合并,所以PBS(Push-based shuffle)只支持Yarn方式的实现。

其次,引入PBS新特性的主要原因是为了解决大shuffle的场景存在的问题:

  • 第一个挑战是可靠性问题。由于计算节点数据量大和 shuffle 工作负载的规模,可能会导致 shuffle fetch 失败,从而导致昂贵的 stage 重试。
  • 第二个挑战是效率问题。由于 reducer 的 shuffle fetch 请求是随机到达的,因此 shuffle 服务也会随机访问 shuffle 文件中的数据。如果单个 shuffle 块大小较小,则 shuffle 服务产生的小随机读取会严重影响磁盘吞吐量,从而延长 shuffle fetch 等待时间。
  • 第三个挑战是扩展问题。由于 external shuffle service 是我们基础架构中的共享服务,因此一些对 shuffle services 错误调优的作业也会影响其他作业。当一个作业错误地配置导致产生许多小的 shuffle blocks 将会给 shuffle 服务带来压力时,它不仅会给自身带来性能下降,还会使共享相同 shuffle 服务的所有相邻作业的性能下降。这可能会导致原本正常运行的作业出现不可预测的运行时延迟,尤其是在集群高峰时段。

此外,PBS不仅适用于大shuffle的场景,对于大量小shuffle文件,这种严重影响磁盘IO性能的情况下, 也有很好的性能提升。push-based shuffle并不是来替换sort-based shuffle, 它是通过补充的方式来优化shuffle。

接下来我们将从以下shuffle service 准备、Map端push shuffle数据、shuffle service merge数据、更新MergeStatues和reducer拉取merge shuffle 数据五部分进行分析代码的实现。

shuffle service 准备

push-based shuffle依赖于driver节点的行为,并将其作为中心的协调节点,为其协调资源、记录mergeLocs信息和记录mergeStatues等。

push-based shuffle虽然有很多的性能的提升,但是社区在其使用上还是比较保守,默认pbs是关闭的。如果要开启它还需要满足比较严格的条件,下面我们首先了解下开启PBS需要满足什么。

我们从DAGScheduler类中pushBasedShuffleEnabled可以看出,开启pbs需要满足以下条件限制:

代码语言:javascript复制
// 标志开启push-based shuffle, push based shuffle 只能在以下的情况下开启
private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf, isDriver = true)

def isPushBasedShuffleEnabled(conf: SparkConf,
      isDriver: Boolean,
      checkSerializer: Boolean = true): Boolean = {
    // [1] spark.shuffle.push.enabled 设置为true
    val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
    if (pushBasedShuffleEnabled) {
      val canDoPushBasedShuffle = {
        val isTesting = conf.get(IS_TESTING).getOrElse(false)
        // [2] spark.shuffle.service.enabled 必须设置为true, shuffle merge 就是在ess上进行合并的
        // [3] 目前resource manager资源管理的方式,只支持yarn模式
        val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
            conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
        // [4] 序列化程序支持对象重定位relocation
        lazy val serializerIsSupported = {
          if (checkSerializer) {
            Option(SparkEnv.get)
              .map(_.serializer)
              .filter(_ != null)
              .getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
              .supportsRelocationOfSerializedObjects
          } else {
            // if no need to check Serializer, always set serializerIsSupported as true
            true
          }
        }
        // [5] spark.io.encryption.enabled 需要关闭
        // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
        val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
        (isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
      }
      if (!canDoPushBasedShuffle) {
        logWarning("Push-based shuffle can only be enabled when the application is submitted "  
          "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, "  
          "and relocation of serialized objects supported.")
      }

      canDoPushBasedShuffle
    } else {
      false
    }
  }

从上述代码可以看出,开启push-based shuffle 需要满足以下条件:

  • [1] spark.shuffle.push.enabled 设置为true
  • [2] spark.shuffle.service.enabled 必须设置为true, shuffle merge 就是在ess上进行合并的
  • [3] 目前resource manager资源管理的方式,只支持yarn模式
  • [4] 序列化程序支持对象重定位relocation
  • [5] spark.io.encryption.enabled 需要关闭

如果以上条件满足并开启了PBS,那么在Driver节点会发生哪些行为呢?这些行为的作用是什么?

我们回到DAGScheduler中,在DAGScheduler 进行submitTasks时会为pbs做以下准备:

代码语言:javascript复制
// DAGScheduler
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  stage match {
    // 在提交shuffleMapTask节点会
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions- 1)
      // Only generate merger location for a given shuffle dependency once.
      // [1] 如果shuffle merge 开启,同时shuffle merge没有完成,准备shuffleservice为ShuffleMapStage
      if (s.shuffleDep.shuffleMergeEnabled) {
        if (!s.shuffleDep.shuffleMergeFinalized) {
          prepareShuffleServicesForShuffleMapStage(s)
        } else {
          s.shuffleDep.setShuffleMergeEnabled(false)
          logInfo("Push-based shuffle disabled for $stage (${stage.name}) since it"  
            " is already shuffle merge finalized")
        }
      }
// [2] 在prepareShuffleServicesForShuffleMapStage,通过schedulerBackend获取ShufflePushMergerLocations
// prepareShuffleServicesForShuffleMapStage
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
        stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

// [3] 向主节点块管理器(BlockManagerMasterEndpoint 类)发送GetShufflePushMergerLocations消息
def getShufflePushMergerLocations(
      numMergersNeeded: Int,
      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
    driverEndpoint.askSync[Seq[BlockManagerId]](
      GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
  }

从上面可以看出,当DAGScheduler 进行submitTasks时,如果stage是ShuffleMapStage,同时shuffle merge没有完成,那么shuffleMerge会向resource manager资源管理器后端询问可用于shuffle merge services的Executor列表。

在返回用于托管 shuffle merge 服务的可用节点后,DAGScheduler 将它们记录在ShuffleDependency的mergerLocs 属性中。下面是其详细的过程:

  • [1] 如果shuffle merge 开启,同时shuffle merge没有完成,准备shuffleService为ShuffleMapStage。
  • [2] 在prepareShuffleServicesForShuffleMapStage,通过schedulerBackend获取ShufflePushMergerLocations,返回mergerLocs。
  • [3] 向主节点块管理器(BlockManagerMasterEndpoint 类)发送GetShufflePushMergerLocations消息

现在我们进入BlockManagerMasterEndpoint类的getShufflePushMergerLocations方法中,进一步看看shuffleMerge是如何获取足够的可用于合并的Executor列表的。

代码语言:javascript复制
// BlockManagerMasterEndpoint
// 获取足够的executor进行合并
private def getShufflePushMergerLocations(
    numMergersNeeded: Int,
    hostsToFilter: Set[String]): Seq[BlockManagerId] = {
  // [1] 通过blockManagerIdByExecutor过滤非driver的Executor如果满足numMergersNeeded则直接返回
  val blockManagerHosts = blockManagerIdByExecutor
.filterNot(_._2.isDriver).values.map(_.host).toSet
  val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
  val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _,externalShuffleServicePort))
  // Enough mergers are available as part of active executors list
  if (filteredMergersWithExecutors.size >= numMergersNeeded) {
    filteredMergersWithExecutors.toSeq
  } else {
    // [2] 否则需要激活过去使用的Executor(最多 500 个)用于进行合并
    // Delta mergers added from inactive mergers list to the active mergers list
    val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
    val filteredMergersWithoutExecutors =shuffleMergerLocations.values
      .filterNot(x => hostsToFilter.contains(x.host))
      .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
    val randomFilteredMergersLocations =
      if (filteredMergersWithoutExecutors.size >
        numMergersNeeded - filteredMergersWithExecutors.size) {
        Utils.randomize(filteredMergersWithoutExecutors)
          .take(numMergersNeeded - filteredMergersWithExecutors.size)
      } else {
        filteredMergersWithoutExecutors
      }
    filteredMergersWithExecutors.toSeq    randomFilteredMergersLocations
  }
}

[1] 通过blockManagerIdByExecutor过滤非driver的Executor如果满足numMergersNeeded则直接返回

[2] 否则需要激活过去使用的Executor(最多 500 个)用于进行合并。

从中可以看出,如果executor数不满足numMergersNeeded,会从过去使用executor中选择进行激活,直到获取到足够的可用于合并的Executor列表。

这时shuffle merge service已经准备好了,同时其被记录在shuffleDependency的mergerLocs 属性中。

总而言之,这个阶段在Driver的DAGScheduler中主要做了两件事:

  1. 获取足够的可用于shuffle merge services的Executor列表。
  2. 将它们记录在ShuffleDependency的mergerLocs属性中。

那么shuffle data 是如何被push到shuffle service中的呢?

Map端push shuffle数据

乍一看,shuffle Writer中的代码并没有变化,没有增加一种新的shuffle Writer。但PBS的实现主要是shuffle data生成后推送出去进行合并。

还记的在介绍getWriter时(参考bypass的文章),在ShuffleWriteProcessor.write 中,在 ShuffleWriter.write 成功后, 曾有段shuffleMerge处理的代码。

下面我们来详细介绍下push-based shuffle 是怎样处理getWriter返回的结果数据的。

代码语言:javascript复制
def write(
    ...
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
      dep.shuffleHandle,
      mapId,
      context,
      createMetricsReporter(context))
    writer.write(
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    val mapStatus = writer.stop(success = true)
    if (mapStatus.isDefined) {
      // 创建了一个ShuffleBlockPusher实例并调用了它的initialBlockPush方法。
      // 在该方法内部,推送器通过获取连续的 shuffle 数据块来创建 shuffle 合并请求
      if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
        manager.shuffleBlockResolver match {
          case resolver: IndexShuffleBlockResolver =>
            val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
            new ShuffleBlockPusher(SparkEnv.get.conf)
              .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
          case _ =>
        }
      }
    }
    mapStatus.get
  } catch {
  ...
  }
}

从上面的代码可见,在执行完map端的writer后,会判断shuffleMergeEnabled是否开启, 要求dependency中MergerLocs不为空,其次就是shuffleMerge还未执行完成。如果满足这些条件,则会创建ShuffleBlockPusher类,并调用其initiateBlockPush方法。

看来具体的push实现位于initiateBlockPush方法中。

代码语言:javascript复制
private[shuffle] def initiateBlockPush(
    dataFile: File,
    partitionLengths: Array[Long],
    dep: ShuffleDependency[_, _, _],
    mapIndex: Int): Unit = {
  val numPartitions = dep.partitioner.numPartitions
  val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
  // [1] 将map task的shuffle数据转换为PushRequest请求
  val requests = prepareBlockPushRequests(numPartitions, mapIndex, dep.shuffleId,
    dep.shuffleMergeId, dataFile, partitionLengths, dep.getMergerLocs, transportConf)
  // [2] 将PushRequest请求列表变为随机请求,这样不同的mapper同时推送块不会推送相同范围的 shuffle 分区
  // Randomize the orders of the PushRequest, so different mappers pushing blocks at the same
  // time won't be pushing the same ranges of shuffle partitions.
pushRequests  = Utils.randomize(requests)

  // [3] 尽力而为的push shuffle数据到ess
  submitTask(() => {
    tryPushUpToMax()
  })
}

从上面的代码可以看出,在initiateBlockPush主要做了以下三步工作:

  • [1] 将map task的shuffle数据转换为PushRequest请求
  • [2] 将PushRequest请求列表变为随机请求,这样不同的mapper同时推送块不会推送相同范围的 shuffle 分区
  • [3] 尽力而为的push shuffle数据到shuffle merge service

顾名思义,prepareBlockPushRequests方法的作用是将map端生成的shuffle data封装转换为PushRequest请求。不过除此以外还会将shuffle中连续的块分到同一个请求中,可以允许更有效的数据读取。如下图所示:

代码语言:javascript复制
// prepareBlockPushRequests 方法
for (reduceId <-0 until numPartitions) {
// 分区块进行合并并非直接按分区发送,而是通过以下公式
  val mergeId = math.min(math.floor(reduceId *1.0 / numPartitions * numMergers),
          numMergers -1 ).asInstanceOf[Int ]

在prepareBlockPushRequests,分区块进行合并按照chunk进行发送,通过上面的公式进行划分合并块的,同时会跳过空的分区块和超过maxBlockSizeToPush,从而避免数据倾斜。这里已经在上一讲讲过了,就不再过多赘述了,具体可以看上一讲push-based shuffle初探。

在封装好PushRequest请求后,最后通过调用tryPushUpToMax方法将数据块推送出去。

tryPushUpToMax方法调用的是我们在shuffle read中使用的pushUpToMax方法。这个方法在shuffle reader文章中也已经介绍过了,这里只简单总结下。这里的工作就是将shuffle data push到对应的shuffle merge service。在发送时将数据封装为PushBlockStream,push 的时候使用的是 streamUpload 的方式,通过 OneForOneBlockPusher ,利用 RetryingBlockFetcher 进行发送。

在Map端push data的阶段,主要做了三件事:

  1. 将相同分区shuffle data block合并拆分到chunk中,并将其封装为PushRequest;
  2. 随机打乱PushRequest,避免顺序的构造push chunk,导致热点和严重的争用冲突;
  3. 通过pushUpToMax方法,将数据封装为PushBlockStream,通过 OneForOneBlockPusher,利用 RetryingBlockFetcher 进行发送。

Shuffle Service merge数据

shuffle service 上使用 TransportRequestHandler.processStreamUpload处理上传的shuffle数据块流。一个 block 的数据会被拆成若干个 bytebuffer 进行处理,最后 onComplete 的时候进行合并。

代码语言:javascript复制
if (msgObj instanceof PushBlockStream) {
  PushBlockStream message = (PushBlockStream) msgObj;
  checkAuth(client, message.appId);
  return mergeManager.receiveBlockDataAsStream(message);
....
}

如上代码所示:在合并过程中,会调用ExternalBlockHandler.receiveStream方法中操作请求。它将推送消息传递给RemoteBlockPushResolver的receiveBlockDataAsStream方法。

那么shuffle data具体是如何被合并的,这里涉及到一个重要的数据结构AppShufflePartitionInfo。

在ShuffleService上会保存appId到AppShuffleInfo的map映射,每个AppShuffleInfo内会保存shuffleId到AppShuffleMergePartitionsInfo 的map映射,在appShuffleMergePartitionsInfo内会保存reduceId到AppShufflePartitionInfo 的映射,最终在AppShufflePartitionInfo 内部会保存三个File;

代码语言:javascript复制
public static class AppShufflePartitionInfo {

  private final String appId;
  private final int shuffleId;
  private final int shuffleMergeId;
  private final int reduceId;
  private final File dataFile;
  // The merged shuffle data file channel
  public final FileChannel dataChannel;
  // The index file for a particular merged shuffle contains the chunk offsets.
  private final MergeShuffleFile indexFile;
  // The meta file for a particular merged shuffle contains all the map indices that belong to
  // every chunk. The entry per chunk is a serialized bitmap.
  private final MergeShuffleFile metaFile;
...
}

如上代码所示,AppShufflePartitionInfo 中包含 3 个 FileChannel,分别用于 data/index/meta 信息的保存。

当shuffle service接收到 block 块时,在尝试添加到对应的 shuffle 合并文件之前,它首先要检索相应的 Shuffle 分区元数据。保存的元数据可以帮助shuffle service正确处理一些潜在的异常场景。

在onData进行数据处理时,对于 streamUpload 过来的 ByteBuffer,只会对 AppShufflePartitionInfo 进行加锁,如果当前 ByteBuffer 的数据不属于currentMergingMapId 的,则加入到一个列表中。在写当前正在处理的 ByteBuffer 前,会将前面列表中的数据都写入到数据文件中。

最终的合并时在onComplete进行的,下面我们详细看下合并要满足的条件:

代码语言:javascript复制
@Override
public void onComplete(String streamId) throws IOException {
  synchronized (partitionInfo) {
logger.trace("{} onComplete invoked", partitionInfo);

    AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
    // [1] 如果shuffle map任务终止(或者说reducers已经启动),表明太迟了,则不会发生合并
    if (isTooLate(info, partitionInfo.reduceId)) {
     ...
    }
    // [2] stage的状态不确定,(stage在重试中), 也不会进行合并
    if (isStale(info, partitionInfo.shuffleMergeId)) {
      ...
    }

    // [3] 校验给定 reducer 的一个映射流只能与现有文件合并
    // Check if we can commit this block
    if (allowedToWrite()) {
      // [4] 如果是推测任务执行中发送重复的reducer数据,则直接返回 
      if (isDuplicateBlock()) {
        deferredBufs = null;
        return;
      }
      if (partitionInfo.getCurrentMapIndex() < 0) {
        ...
      }
      // [5] 执行buffer合并
      long updatedPos = partitionInfo.getDataFilePos()   length;
      boolean indexUpdated = false;
      if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) {
        try {
          partitionInfo.updateChunkInfo(updatedPos, mapIndex);
          indexUpdated = true;
        } catch (IOException ioe) {
          incrementIOExceptionsAndAbortIfNecessary();
          // If the above doesn't throw a RuntimeException, then we do not propagate the
          // IOException to the client. This may increase the chunk size however the increase is
          // still limited because of the limit on the number of IOExceptions for a
          // particular shuffle partition.
        }
      }
      partitionInfo.setDataFilePos(updatedPos);
      partitionInfo.setCurrentMapIndex(-1);

      // update merged results
      partitionInfo.blockMerged(mapIndex);
      if (indexUpdated) {
        partitionInfo.resetChunkTracker();
      }
    } else {
      deferredBufs = null;
      throw new BlockPushNonFatalFailure(
        new BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
          .toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
            streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
    }
  }
  isWriting = false;
}

可见在合并前也需要满足PushBlockStreamCallback定义的条件:

  • [1] 如果shuffle map任务终止(或者说reducers已经启动),表明太迟了,则不会发生合并
  • [2] stage的状态不确定,(stage在重试中), 也不会进行合并
  • [3] 校验给定 reducer 的一个映射流只能与现有文件合并
  • [4] 如果是推测任务执行中发送重复的reducer数据,则直接返回

最后再进行合并时,会将 shuffle 字节添加到数据文件后,合并器首先将合并后的偏移量写入索引文件,然后才将映射器信息添加到元文件中。

这里的逻辑有点复杂,为了避免错误,只总结下要点:

  1. shuffle service 上使用 TransportRequestHandler.processStreamUpload处理上传的shuffle数据块流。一个 block 的数据会被拆成若干个 bytebuffer 进行处理。
  2. 在onData进行数据处理时,对于 streamUpload 过来的 ByteBuffer,只会对 AppShufflePartitionInfo 进行加锁,如果当前 ByteBuffer 的数据不属于currentMergingMapId 的,则加入到一个列表中。在写当前正在处理的 ByteBuffer 前,会将前面列表中的数据都写入到数据文件中。
  3. 在onComplete进行合并时,会先判断是否满足合并的条件。合并时,会将 shuffle 字节append到数据data文件后,合并器首先将合并后的偏移量写入索引index文件,然后才将映射器信息添加到元meta文件中。

获取更新MergeStatues

当每个 ShuffleMapTask 结束的时候,DAGScheduler都会去判断 ShuffleMapStage 是否 pending partitions 为空,如果为空说明 stage 结束了,此时开始向 shuffle service 上发送 finalize 信息,并将信息返回给 driver 并添加到 merge statuses 信息中。

代码语言:javascript复制
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
     ...
        if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
                shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                scheduleShuffleMergeFinalize(shuffleStage)
              } else {
                processShuffleMapStageCompletion(shuffleStage)
              }
            }
     ...

上面是DAGScheduler.handleTaskCompletion中的代码,可以从中看出在TaskCompletion时,当DAGScheduler收到有关执行的最后一个map task的通知时, 它会向所有 shuffle 服务发送FinalizeShuffleMerge消息。服务拦截消息并从MergerShuffleFileManager完成合并过程。任何正在进行的合并都会被中断并取消,以避免合并文件中有部分数据。

同时DAGScheduler 等待spark.shuffle.push.result.timeout来获取响应。如果 shuffle 服务在此延迟内响应,DAGScheduler会拦截包含以下属性的响应:

代码语言:javascript复制
public class MergeStatuses extends BlockTransferMessage {
  /** Shuffle ID **/
  public final int shuffleId;
  /**
   * shuffleMergeId is used to uniquely identify merging process of shuffle by
   * an indeterminate stage attempt.
   */
  public final int shuffleMergeId;
  /**
   * Array of bitmaps tracking the set of mapper partition blocks merged for each
   * reducer partition
   */
  public final RoaringBitmap[] bitmaps;
  /** Array of reducer IDs **/
  public final int[] reduceIds;
  /**
   * Array of merged shuffle partition block size. Each represents the total size of all
   * merged shuffle partition blocks for one reducer partition.
   * **/
  public final long[] sizes;

获取MergeStatues的过程和MapStatus的过程类似,其是通过getPushBasedShuffleMapSizesByExecutorId进行获取的,具体可以参考shuffle reader 的文章。

在更新Merge Status阶段,主要做了下面的工作:

  1. 向shuffle service上发送 FinalizeShuffleMerge 信息。注意:任何正在进行的合并都会被中断并取消,以避免合并文件中有部分数据。
  2. 等待spark.shuffle.push.result.timeout来获取响应,拦截获取MergeStatuses。

reducer拉取merge shuffle数据

reduce task 开始之后,从 driver 上获取 merge statuses 信息,并在数据划分的时候,如果是 merged block 则先向 shuffle service 上请求一次 meta 信息,获取到 meta 信息后,利用 shuffle service 上的 index 文件信息,读取文件中 block 数据。

通过get reader获取shuffle数据,这块已经在shuffle reader中讲过了,这里就不大段的贴代码了,只讲解涉及到pbs的地方,具体的可以再复习shuffle reader的文章。

代码语言:javascript复制
// 在划分数据源的请求:本地、主机本地和远程块, 同时划分出pbs的blocks
val remoteRequests = partitionBlocksByFetchMode(
  blocksByAddress, localBlocks, hostLocalBlocksByExecutor, pushMergedLocalBlocks)

我们具体的收集pbs的远程blocks地址的实现:

代码语言:javascript复制
blockId match {
        // 获取数据请求
        case ShuffleBlockChunkId(_, _, _, _) =>
          if (curRequestSize >= targetRemoteRequestSize ||
            curBlocks.size >= maxBlocksInFlightPerAddress) {
            curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
              collectedRemoteRequests, enableBatchFetch = false)
            curRequestSize = curBlocks.map(_.size).sum
          }
       // 从forMergedMetas可以看出这里会为获取元数据构建单独的请求
        case ShuffleMergedBlockId(_, _, _) =>
          if (curBlocks.size >= maxBlocksInFlightPerAddress) {
            curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
              collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
          }
        case _ =>
          // For batch fetch, the actual block in flight should count for merged block.
          val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
          if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
            curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
              collectedRemoteRequests, doBatchFetch)
            curRequestSize = curBlocks.map(_.size).sum
          }
      }

从这里可以看出如果是 merged block 则先向 shuffle service 上请求一次 meta 信息。

接下来,我们再来看下发送请求的方法:

代码语言:javascript复制
def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
  if (request.forMergedMetas) {
    pushBasedFetchHelper.sendFetchMergedStatusRequest(request)
  } else {
    sendRequest(request)
  }
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0)   request.blocks.size
}

iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,
            reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address))

在发送请求时,会区分请求元数据的请求和数据的获取请求。元数据请求返回PushMergedRemoteMetaFetchResult

最后在ShuffleBlockFetcherIterator.next读取数据时,将获取元数据进行模式匹配:

代码语言:javascript复制
 case PushMergedRemoteMetaFetchResult(
          shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps, address) =>
// ...
          val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
            shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps)
          val additionalRemoteReqs = new ArrayBuffer[FetchRequest]
          collectFetchRequests(address, blocksToFetch.toSeq, additionalRemoteReqs)
          fetchRequests   = additionalRemoteReqs
          // Set result to null to force another iteration.
          result = null

从这里可以看出, 在获取取到 PushMergedRemoteMetaFetchResult 信息后,利用 shuffle service 上的 index 文件信息,再次发起读取文件中 block 数据的请求。这次的请求blockId是ShuffleBlockChunkId类型, 从上面的代码可以看出这意味着请求类型会将forMergedMetas标志设置为 false。

获取merge shuffle数据,主要有以下步骤:

  1. 从driver上获取 merge statuses 信息;
  2. merged block 则先向 shuffle service 上请求一次 meta 信息;
  3. 获取到 meta 信息后,利用shuffle service 上的 index 文件信息,读取文件中 block 数据。

在最后,我们再汇总下push-based shuffle的详细过程:

  1. 在Driver端,当dagScheduler提交ShuffleMapStage类型的任务时,会向资源管理器后端询问可用于shuffle merge services的Executor列表。在资源管理器返回用于托管 shuffle merge 服务的可用节点后,dagScheduler 将它们记录在shuffleDependency的mergerLocs属性中。
  2. 在 map端,当ShuffleWriter.write 成功后,会调用 ShuffleWriter.initiateBlockPush ,将已经落盘的 ShuffleBlock push 到远端的 shuffle service 上。将数据封装为PushBlockStream,push 的时候使用的是 streamUpload 的方式,通过 OneForOneBlockPusher,利用 RetryingBlockFetcher 进行发送。
  3. 在shuffle service中,使用TransportRequestHandler.processStreamUpload处理上传的shuffle数据块流。一个 block 的数据会被拆成若干个 bytebuffer 进行处理。在shuffle service中,每个reduceid会维护一个AppShufflePartitionInfo ,在其中包含 3 个 FileChannel,分别用于 data/index/meta 信息的保存。在onData进行数据处理时,对于 streamUpload 过来的 ByteBuffer,只会对 AppShufflePartitionInfo 进行加锁,如果当前 ByteBuffer 的数据不属于currentMergingMapId 的,则加入到一个列表中。最后 onComplete 的时候进行合并合并时,会将 shuffle 字节添加到数据文件后,合并器首先将合并后的偏移量写入索引文件,然后才将映射器信息添加到元文件中。
  4. 当每个 ShuffleMapTask 结束的时候,DAGScheduler都会去判断 ShuffleMapStage 是否 pending partitions 为空,如果为空说明 stage 结束了,此时开始向 shuffle service 上发送 finalize 信息,并将信息返回给 driver 并添加到 merge statuses 信息中。同时DAGScheduler 等待spark.shuffle.push.result.timeout来获取响应。
  5. reduce task 开始之后,从 driver 上获取 merge statuses 信息,并在数据划分的时候,如果是 merged block 则先向 shuffle service 上请求一次 meta 信息,获取到 meta 信息后,利用 shuffle service 上的 index 文件信息,读取文件中 block 数据。

再我们了解完Push-based shuffle代码后,我们来回答下以下几个问题:

  1. push-based shuffle是在shuffle write结束后追加了push与合并操作,那么是否只有在发生FetchFail的情况下(导致stage重试)push-base shuffle的性能更好?
  2. push-based shuffle 能否进行精简下?例如取消掉driver端的行为。

0 人点赞