Spark 3.2为spark shuffle带来了重大的改变,其中新增了push-based shuffle机制。但其实在push-based shuffle 之前,业界也有人提出了remote shuffle service的实践,不过由于它们是依赖于外部组件实现的所以一直不被社区所接收。
在上一讲我们先来了解push-based shuffle机制的实现原理,这里我们来通过源码分析下其实现的过程。
首先,Push-based shuffle机制是不依赖于外部组件的方案,但使用升级版的ESS进行shuffle data的合并,所以PBS(Push-based shuffle)只支持Yarn方式的实现。
其次,引入PBS新特性的主要原因是为了解决大shuffle的场景存在的问题:
- 第一个挑战是可靠性问题。由于计算节点数据量大和 shuffle 工作负载的规模,可能会导致 shuffle fetch 失败,从而导致昂贵的 stage 重试。
- 第二个挑战是效率问题。由于 reducer 的 shuffle fetch 请求是随机到达的,因此 shuffle 服务也会随机访问 shuffle 文件中的数据。如果单个 shuffle 块大小较小,则 shuffle 服务产生的小随机读取会严重影响磁盘吞吐量,从而延长 shuffle fetch 等待时间。
- 第三个挑战是扩展问题。由于 external shuffle service 是我们基础架构中的共享服务,因此一些对 shuffle services 错误调优的作业也会影响其他作业。当一个作业错误地配置导致产生许多小的 shuffle blocks 将会给 shuffle 服务带来压力时,它不仅会给自身带来性能下降,还会使共享相同 shuffle 服务的所有相邻作业的性能下降。这可能会导致原本正常运行的作业出现不可预测的运行时延迟,尤其是在集群高峰时段。
此外,PBS不仅适用于大shuffle的场景,对于大量小shuffle文件,这种严重影响磁盘IO性能的情况下, 也有很好的性能提升。push-based shuffle并不是来替换sort-based shuffle, 它是通过补充的方式来优化shuffle。
接下来我们将从以下shuffle service 准备、Map端push shuffle数据、shuffle service merge数据、更新MergeStatues和reducer拉取merge shuffle 数据五部分进行分析代码的实现。
shuffle service 准备
push-based shuffle依赖于driver节点的行为,并将其作为中心的协调节点,为其协调资源、记录mergeLocs信息和记录mergeStatues等。
push-based shuffle虽然有很多的性能的提升,但是社区在其使用上还是比较保守,默认pbs是关闭的。如果要开启它还需要满足比较严格的条件,下面我们首先了解下开启PBS需要满足什么。
我们从DAGScheduler类中pushBasedShuffleEnabled可以看出,开启pbs需要满足以下条件限制:
代码语言:javascript复制// 标志开启push-based shuffle, push based shuffle 只能在以下的情况下开启
private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf, isDriver = true)
def isPushBasedShuffleEnabled(conf: SparkConf,
isDriver: Boolean,
checkSerializer: Boolean = true): Boolean = {
// [1] spark.shuffle.push.enabled 设置为true
val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
if (pushBasedShuffleEnabled) {
val canDoPushBasedShuffle = {
val isTesting = conf.get(IS_TESTING).getOrElse(false)
// [2] spark.shuffle.service.enabled 必须设置为true, shuffle merge 就是在ess上进行合并的
// [3] 目前resource manager资源管理的方式,只支持yarn模式
val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
// [4] 序列化程序支持对象重定位relocation
lazy val serializerIsSupported = {
if (checkSerializer) {
Option(SparkEnv.get)
.map(_.serializer)
.filter(_ != null)
.getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
.supportsRelocationOfSerializedObjects
} else {
// if no need to check Serializer, always set serializerIsSupported as true
true
}
}
// [5] spark.io.encryption.enabled 需要关闭
// TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
(isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
}
if (!canDoPushBasedShuffle) {
logWarning("Push-based shuffle can only be enabled when the application is submitted "
"to run in YARN mode, with external shuffle service enabled, IO encryption disabled, "
"and relocation of serialized objects supported.")
}
canDoPushBasedShuffle
} else {
false
}
}
从上述代码可以看出,开启push-based shuffle 需要满足以下条件:
- [1] spark.shuffle.push.enabled 设置为true
- [2] spark.shuffle.service.enabled 必须设置为true, shuffle merge 就是在ess上进行合并的
- [3] 目前resource manager资源管理的方式,只支持yarn模式
- [4] 序列化程序支持对象重定位relocation
- [5] spark.io.encryption.enabled 需要关闭
如果以上条件满足并开启了PBS,那么在Driver节点会发生哪些行为呢?这些行为的作用是什么?
我们回到DAGScheduler中,在DAGScheduler 进行submitTasks时会为pbs做以下准备:
代码语言:javascript复制// DAGScheduler
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
stage match {
// 在提交shuffleMapTask节点会
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions- 1)
// Only generate merger location for a given shuffle dependency once.
// [1] 如果shuffle merge 开启,同时shuffle merge没有完成,准备shuffleservice为ShuffleMapStage
if (s.shuffleDep.shuffleMergeEnabled) {
if (!s.shuffleDep.shuffleMergeFinalized) {
prepareShuffleServicesForShuffleMapStage(s)
} else {
s.shuffleDep.setShuffleMergeEnabled(false)
logInfo("Push-based shuffle disabled for $stage (${stage.name}) since it"
" is already shuffle merge finalized")
}
}
// [2] 在prepareShuffleServicesForShuffleMapStage,通过schedulerBackend获取ShufflePushMergerLocations
// prepareShuffleServicesForShuffleMapStage
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
// [3] 向主节点块管理器(BlockManagerMasterEndpoint 类)发送GetShufflePushMergerLocations消息
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}
从上面可以看出,当DAGScheduler 进行submitTasks时,如果stage是ShuffleMapStage,同时shuffle merge没有完成,那么shuffleMerge会向resource manager资源管理器后端询问可用于shuffle merge services的Executor列表。
在返回用于托管 shuffle merge 服务的可用节点后,DAGScheduler 将它们记录在ShuffleDependency的mergerLocs 属性中。下面是其详细的过程:
- [1] 如果shuffle merge 开启,同时shuffle merge没有完成,准备shuffleService为ShuffleMapStage。
- [2] 在prepareShuffleServicesForShuffleMapStage,通过schedulerBackend获取ShufflePushMergerLocations,返回mergerLocs。
- [3] 向主节点块管理器(BlockManagerMasterEndpoint 类)发送GetShufflePushMergerLocations消息
现在我们进入BlockManagerMasterEndpoint类的getShufflePushMergerLocations方法中,进一步看看shuffleMerge是如何获取足够的可用于合并的Executor列表的。
代码语言:javascript复制// BlockManagerMasterEndpoint
// 获取足够的executor进行合并
private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
// [1] 通过blockManagerIdByExecutor过滤非driver的Executor如果满足numMergersNeeded则直接返回
val blockManagerHosts = blockManagerIdByExecutor
.filterNot(_._2.isDriver).values.map(_.host).toSet
val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _,externalShuffleServicePort))
// Enough mergers are available as part of active executors list
if (filteredMergersWithExecutors.size >= numMergersNeeded) {
filteredMergersWithExecutors.toSeq
} else {
// [2] 否则需要激活过去使用的Executor(最多 500 个)用于进行合并
// Delta mergers added from inactive mergers list to the active mergers list
val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
val filteredMergersWithoutExecutors =shuffleMergerLocations.values
.filterNot(x => hostsToFilter.contains(x.host))
.filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
val randomFilteredMergersLocations =
if (filteredMergersWithoutExecutors.size >
numMergersNeeded - filteredMergersWithExecutors.size) {
Utils.randomize(filteredMergersWithoutExecutors)
.take(numMergersNeeded - filteredMergersWithExecutors.size)
} else {
filteredMergersWithoutExecutors
}
filteredMergersWithExecutors.toSeq randomFilteredMergersLocations
}
}
[1] 通过blockManagerIdByExecutor过滤非driver的Executor如果满足numMergersNeeded则直接返回
[2] 否则需要激活过去使用的Executor(最多 500 个)用于进行合并。
从中可以看出,如果executor数不满足numMergersNeeded,会从过去使用executor中选择进行激活,直到获取到足够的可用于合并的Executor列表。
这时shuffle merge service已经准备好了,同时其被记录在shuffleDependency的mergerLocs 属性中。
总而言之,这个阶段在Driver的DAGScheduler中主要做了两件事:
- 获取足够的可用于shuffle merge services的Executor列表。
- 将它们记录在ShuffleDependency的mergerLocs属性中。
那么shuffle data 是如何被push到shuffle service中的呢?
Map端push shuffle数据
乍一看,shuffle Writer中的代码并没有变化,没有增加一种新的shuffle Writer。但PBS的实现主要是shuffle data生成后推送出去进行合并。
还记的在介绍getWriter时(参考bypass的文章),在ShuffleWriteProcessor.write 中,在 ShuffleWriter.write 成功后, 曾有段shuffleMerge处理的代码。
下面我们来详细介绍下push-based shuffle 是怎样处理getWriter返回的结果数据的。
代码语言:javascript复制def write(
...
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
mapId,
context,
createMetricsReporter(context))
writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
val mapStatus = writer.stop(success = true)
if (mapStatus.isDefined) {
// 创建了一个ShuffleBlockPusher实例并调用了它的initialBlockPush方法。
// 在该方法内部,推送器通过获取连续的 shuffle 数据块来创建 shuffle 合并请求
if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
manager.shuffleBlockResolver match {
case resolver: IndexShuffleBlockResolver =>
val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
new ShuffleBlockPusher(SparkEnv.get.conf)
.initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
case _ =>
}
}
}
mapStatus.get
} catch {
...
}
}
从上面的代码可见,在执行完map端的writer后,会判断shuffleMergeEnabled是否开启, 要求dependency中MergerLocs不为空,其次就是shuffleMerge还未执行完成。如果满足这些条件,则会创建ShuffleBlockPusher类,并调用其initiateBlockPush方法。
看来具体的push实现位于initiateBlockPush方法中。
代码语言:javascript复制private[shuffle] def initiateBlockPush(
dataFile: File,
partitionLengths: Array[Long],
dep: ShuffleDependency[_, _, _],
mapIndex: Int): Unit = {
val numPartitions = dep.partitioner.numPartitions
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
// [1] 将map task的shuffle数据转换为PushRequest请求
val requests = prepareBlockPushRequests(numPartitions, mapIndex, dep.shuffleId,
dep.shuffleMergeId, dataFile, partitionLengths, dep.getMergerLocs, transportConf)
// [2] 将PushRequest请求列表变为随机请求,这样不同的mapper同时推送块不会推送相同范围的 shuffle 分区
// Randomize the orders of the PushRequest, so different mappers pushing blocks at the same
// time won't be pushing the same ranges of shuffle partitions.
pushRequests = Utils.randomize(requests)
// [3] 尽力而为的push shuffle数据到ess
submitTask(() => {
tryPushUpToMax()
})
}
从上面的代码可以看出,在initiateBlockPush主要做了以下三步工作:
- [1] 将map task的shuffle数据转换为PushRequest请求
- [2] 将PushRequest请求列表变为随机请求,这样不同的mapper同时推送块不会推送相同范围的 shuffle 分区
- [3] 尽力而为的push shuffle数据到shuffle merge service
顾名思义,prepareBlockPushRequests方法的作用是将map端生成的shuffle data封装转换为PushRequest请求。不过除此以外还会将shuffle中连续的块分到同一个请求中,可以允许更有效的数据读取。如下图所示:
代码语言:javascript复制// prepareBlockPushRequests 方法
for (reduceId <-0 until numPartitions) {
// 分区块进行合并并非直接按分区发送,而是通过以下公式
val mergeId = math.min(math.floor(reduceId *1.0 / numPartitions * numMergers),
numMergers -1 ).asInstanceOf[Int ]
在prepareBlockPushRequests,分区块进行合并按照chunk进行发送,通过上面的公式进行划分合并块的,同时会跳过空的分区块和超过maxBlockSizeToPush,从而避免数据倾斜。这里已经在上一讲讲过了,就不再过多赘述了,具体可以看上一讲push-based shuffle初探。
在封装好PushRequest请求后,最后通过调用tryPushUpToMax方法将数据块推送出去。
tryPushUpToMax方法调用的是我们在shuffle read中使用的pushUpToMax方法。这个方法在shuffle reader文章中也已经介绍过了,这里只简单总结下。这里的工作就是将shuffle data push到对应的shuffle merge service。在发送时将数据封装为PushBlockStream,push 的时候使用的是 streamUpload 的方式,通过 OneForOneBlockPusher ,利用 RetryingBlockFetcher 进行发送。
在Map端push data的阶段,主要做了三件事:
- 将相同分区shuffle data block合并拆分到chunk中,并将其封装为PushRequest;
- 随机打乱PushRequest,避免顺序的构造push chunk,导致热点和严重的争用冲突;
- 通过pushUpToMax方法,将数据封装为PushBlockStream,通过 OneForOneBlockPusher,利用 RetryingBlockFetcher 进行发送。
Shuffle Service merge数据
shuffle service 上使用 TransportRequestHandler.processStreamUpload处理上传的shuffle数据块流。一个 block 的数据会被拆成若干个 bytebuffer 进行处理,最后 onComplete 的时候进行合并。
代码语言:javascript复制if (msgObj instanceof PushBlockStream) {
PushBlockStream message = (PushBlockStream) msgObj;
checkAuth(client, message.appId);
return mergeManager.receiveBlockDataAsStream(message);
....
}
如上代码所示:在合并过程中,会调用ExternalBlockHandler.receiveStream方法中操作请求。它将推送消息传递给RemoteBlockPushResolver的receiveBlockDataAsStream方法。
那么shuffle data具体是如何被合并的,这里涉及到一个重要的数据结构AppShufflePartitionInfo。
在ShuffleService上会保存appId到AppShuffleInfo的map映射,每个AppShuffleInfo内会保存shuffleId到AppShuffleMergePartitionsInfo 的map映射,在appShuffleMergePartitionsInfo内会保存reduceId到AppShufflePartitionInfo 的映射,最终在AppShufflePartitionInfo 内部会保存三个File;
代码语言:javascript复制public static class AppShufflePartitionInfo {
private final String appId;
private final int shuffleId;
private final int shuffleMergeId;
private final int reduceId;
private final File dataFile;
// The merged shuffle data file channel
public final FileChannel dataChannel;
// The index file for a particular merged shuffle contains the chunk offsets.
private final MergeShuffleFile indexFile;
// The meta file for a particular merged shuffle contains all the map indices that belong to
// every chunk. The entry per chunk is a serialized bitmap.
private final MergeShuffleFile metaFile;
...
}
如上代码所示,AppShufflePartitionInfo 中包含 3 个 FileChannel,分别用于 data/index/meta 信息的保存。
当shuffle service接收到 block 块时,在尝试添加到对应的 shuffle 合并文件之前,它首先要检索相应的 Shuffle 分区元数据。保存的元数据可以帮助shuffle service正确处理一些潜在的异常场景。
在onData进行数据处理时,对于 streamUpload 过来的 ByteBuffer,只会对 AppShufflePartitionInfo 进行加锁,如果当前 ByteBuffer 的数据不属于currentMergingMapId 的,则加入到一个列表中。在写当前正在处理的 ByteBuffer 前,会将前面列表中的数据都写入到数据文件中。
最终的合并时在onComplete进行的,下面我们详细看下合并要满足的条件:
代码语言:javascript复制@Override
public void onComplete(String streamId) throws IOException {
synchronized (partitionInfo) {
logger.trace("{} onComplete invoked", partitionInfo);
AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
// [1] 如果shuffle map任务终止(或者说reducers已经启动),表明太迟了,则不会发生合并
if (isTooLate(info, partitionInfo.reduceId)) {
...
}
// [2] stage的状态不确定,(stage在重试中), 也不会进行合并
if (isStale(info, partitionInfo.shuffleMergeId)) {
...
}
// [3] 校验给定 reducer 的一个映射流只能与现有文件合并
// Check if we can commit this block
if (allowedToWrite()) {
// [4] 如果是推测任务执行中发送重复的reducer数据,则直接返回
if (isDuplicateBlock()) {
deferredBufs = null;
return;
}
if (partitionInfo.getCurrentMapIndex() < 0) {
...
}
// [5] 执行buffer合并
long updatedPos = partitionInfo.getDataFilePos() length;
boolean indexUpdated = false;
if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) {
try {
partitionInfo.updateChunkInfo(updatedPos, mapIndex);
indexUpdated = true;
} catch (IOException ioe) {
incrementIOExceptionsAndAbortIfNecessary();
// If the above doesn't throw a RuntimeException, then we do not propagate the
// IOException to the client. This may increase the chunk size however the increase is
// still limited because of the limit on the number of IOExceptions for a
// particular shuffle partition.
}
}
partitionInfo.setDataFilePos(updatedPos);
partitionInfo.setCurrentMapIndex(-1);
// update merged results
partitionInfo.blockMerged(mapIndex);
if (indexUpdated) {
partitionInfo.resetChunkTracker();
}
} else {
deferredBufs = null;
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
.toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
}
}
isWriting = false;
}
可见在合并前也需要满足PushBlockStreamCallback定义的条件:
- [1] 如果shuffle map任务终止(或者说reducers已经启动),表明太迟了,则不会发生合并
- [2] stage的状态不确定,(stage在重试中), 也不会进行合并
- [3] 校验给定 reducer 的一个映射流只能与现有文件合并
- [4] 如果是推测任务执行中发送重复的reducer数据,则直接返回
最后再进行合并时,会将 shuffle 字节添加到数据文件后,合并器首先将合并后的偏移量写入索引文件,然后才将映射器信息添加到元文件中。
这里的逻辑有点复杂,为了避免错误,只总结下要点:
- shuffle service 上使用 TransportRequestHandler.processStreamUpload处理上传的shuffle数据块流。一个 block 的数据会被拆成若干个 bytebuffer 进行处理。
- 在onData进行数据处理时,对于 streamUpload 过来的 ByteBuffer,只会对 AppShufflePartitionInfo 进行加锁,如果当前 ByteBuffer 的数据不属于currentMergingMapId 的,则加入到一个列表中。在写当前正在处理的 ByteBuffer 前,会将前面列表中的数据都写入到数据文件中。
- 在onComplete进行合并时,会先判断是否满足合并的条件。合并时,会将 shuffle 字节append到数据data文件后,合并器首先将合并后的偏移量写入索引index文件,然后才将映射器信息添加到元meta文件中。
获取更新MergeStatues
当每个 ShuffleMapTask 结束的时候,DAGScheduler都会去判断 ShuffleMapStage 是否 pending partitions 为空,如果为空说明 stage 结束了,此时开始向 shuffle service 上发送 finalize 信息,并将信息返回给 driver 并添加到 merge statuses 信息中。
代码语言:javascript复制private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
...
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
scheduleShuffleMergeFinalize(shuffleStage)
} else {
processShuffleMapStageCompletion(shuffleStage)
}
}
...
上面是DAGScheduler.handleTaskCompletion中的代码,可以从中看出在TaskCompletion时,当DAGScheduler收到有关执行的最后一个map task的通知时, 它会向所有 shuffle 服务发送FinalizeShuffleMerge消息。服务拦截消息并从MergerShuffleFileManager完成合并过程。任何正在进行的合并都会被中断并取消,以避免合并文件中有部分数据。
同时DAGScheduler 等待spark.shuffle.push.result.timeout来获取响应。如果 shuffle 服务在此延迟内响应,DAGScheduler会拦截包含以下属性的响应:
代码语言:javascript复制public class MergeStatuses extends BlockTransferMessage {
/** Shuffle ID **/
public final int shuffleId;
/**
* shuffleMergeId is used to uniquely identify merging process of shuffle by
* an indeterminate stage attempt.
*/
public final int shuffleMergeId;
/**
* Array of bitmaps tracking the set of mapper partition blocks merged for each
* reducer partition
*/
public final RoaringBitmap[] bitmaps;
/** Array of reducer IDs **/
public final int[] reduceIds;
/**
* Array of merged shuffle partition block size. Each represents the total size of all
* merged shuffle partition blocks for one reducer partition.
* **/
public final long[] sizes;
获取MergeStatues的过程和MapStatus的过程类似,其是通过getPushBasedShuffleMapSizesByExecutorId进行获取的,具体可以参考shuffle reader 的文章。
在更新Merge Status阶段,主要做了下面的工作:
- 向shuffle service上发送 FinalizeShuffleMerge 信息。注意:任何正在进行的合并都会被中断并取消,以避免合并文件中有部分数据。
- 等待spark.shuffle.push.result.timeout来获取响应,拦截获取MergeStatuses。
reducer拉取merge shuffle数据
reduce task 开始之后,从 driver 上获取 merge statuses 信息,并在数据划分的时候,如果是 merged block 则先向 shuffle service 上请求一次 meta 信息,获取到 meta 信息后,利用 shuffle service 上的 index 文件信息,读取文件中 block 数据。
通过get reader获取shuffle数据,这块已经在shuffle reader中讲过了,这里就不大段的贴代码了,只讲解涉及到pbs的地方,具体的可以再复习shuffle reader的文章。
代码语言:javascript复制// 在划分数据源的请求:本地、主机本地和远程块, 同时划分出pbs的blocks
val remoteRequests = partitionBlocksByFetchMode(
blocksByAddress, localBlocks, hostLocalBlocksByExecutor, pushMergedLocalBlocks)
我们具体的收集pbs的远程blocks地址的实现:
代码语言:javascript复制blockId match {
// 获取数据请求
case ShuffleBlockChunkId(_, _, _, _) =>
if (curRequestSize >= targetRemoteRequestSize ||
curBlocks.size >= maxBlocksInFlightPerAddress) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests, enableBatchFetch = false)
curRequestSize = curBlocks.map(_.size).sum
}
// 从forMergedMetas可以看出这里会为获取元数据构建单独的请求
case ShuffleMergedBlockId(_, _, _) =>
if (curBlocks.size >= maxBlocksInFlightPerAddress) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
}
case _ =>
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests, doBatchFetch)
curRequestSize = curBlocks.map(_.size).sum
}
}
从这里可以看出如果是 merged block 则先向 shuffle service 上请求一次 meta 信息。
接下来,我们再来看下发送请求的方法:
代码语言:javascript复制def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
if (request.forMergedMetas) {
pushBasedFetchHelper.sendFetchMergedStatusRequest(request)
} else {
sendRequest(request)
}
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) request.blocks.size
}
iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,
reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address))
在发送请求时,会区分请求元数据的请求和数据的获取请求。元数据请求返回PushMergedRemoteMetaFetchResult 。
最后在ShuffleBlockFetcherIterator.next读取数据时,将获取元数据进行模式匹配:
代码语言:javascript复制 case PushMergedRemoteMetaFetchResult(
shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps, address) =>
// ...
val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps)
val additionalRemoteReqs = new ArrayBuffer[FetchRequest]
collectFetchRequests(address, blocksToFetch.toSeq, additionalRemoteReqs)
fetchRequests = additionalRemoteReqs
// Set result to null to force another iteration.
result = null
从这里可以看出, 在获取取到 PushMergedRemoteMetaFetchResult 信息后,利用 shuffle service 上的 index 文件信息,再次发起读取文件中 block 数据的请求。这次的请求blockId是ShuffleBlockChunkId类型, 从上面的代码可以看出这意味着请求类型会将forMergedMetas标志设置为 false。
获取merge shuffle数据,主要有以下步骤:
- 从driver上获取 merge statuses 信息;
- merged block 则先向 shuffle service 上请求一次 meta 信息;
- 获取到 meta 信息后,利用shuffle service 上的 index 文件信息,读取文件中 block 数据。
在最后,我们再汇总下push-based shuffle的详细过程:
- 在Driver端,当dagScheduler提交ShuffleMapStage类型的任务时,会向资源管理器后端询问可用于shuffle merge services的Executor列表。在资源管理器返回用于托管 shuffle merge 服务的可用节点后,dagScheduler 将它们记录在shuffleDependency的mergerLocs属性中。
- 在 map端,当ShuffleWriter.write 成功后,会调用 ShuffleWriter.initiateBlockPush ,将已经落盘的 ShuffleBlock push 到远端的 shuffle service 上。将数据封装为PushBlockStream,push 的时候使用的是 streamUpload 的方式,通过 OneForOneBlockPusher,利用 RetryingBlockFetcher 进行发送。
- 在shuffle service中,使用TransportRequestHandler.processStreamUpload处理上传的shuffle数据块流。一个 block 的数据会被拆成若干个 bytebuffer 进行处理。在shuffle service中,每个reduceid会维护一个AppShufflePartitionInfo ,在其中包含 3 个 FileChannel,分别用于 data/index/meta 信息的保存。在onData进行数据处理时,对于 streamUpload 过来的 ByteBuffer,只会对 AppShufflePartitionInfo 进行加锁,如果当前 ByteBuffer 的数据不属于currentMergingMapId 的,则加入到一个列表中。最后 onComplete 的时候进行合并合并时,会将 shuffle 字节添加到数据文件后,合并器首先将合并后的偏移量写入索引文件,然后才将映射器信息添加到元文件中。
- 当每个 ShuffleMapTask 结束的时候,DAGScheduler都会去判断 ShuffleMapStage 是否 pending partitions 为空,如果为空说明 stage 结束了,此时开始向 shuffle service 上发送 finalize 信息,并将信息返回给 driver 并添加到 merge statuses 信息中。同时DAGScheduler 等待spark.shuffle.push.result.timeout来获取响应。
- reduce task 开始之后,从 driver 上获取 merge statuses 信息,并在数据划分的时候,如果是 merged block 则先向 shuffle service 上请求一次 meta 信息,获取到 meta 信息后,利用 shuffle service 上的 index 文件信息,读取文件中 block 数据。
再我们了解完Push-based shuffle代码后,我们来回答下以下几个问题:
- push-based shuffle是在shuffle write结束后追加了push与合并操作,那么是否只有在发生FetchFail的情况下(导致stage重试)push-base shuffle的性能更好?
- push-based shuffle 能否进行精简下?例如取消掉driver端的行为。