Spark Source Code Analysis: Storage Implementation

2022-03-30 17:26:15

Overview

BlockManager is Spark's storage subsystem. RDD data, shuffle data, broadcast variables, and so on are all stored and retrieved through the BlockManager. Data in Spark exists in the form of blocks, and blocks can be replicated and synchronized between BlockManagers. BlockManagers exchange block metadata and block availability over Spark's Netty-based RPC layer.

BlockManager follows a master/worker architecture: the driver side acts as the master and the executor side as the worker. Each executor-side BlockManager registers itself with the driver, so the driver knows about every executor-side BlockManager, including its address and resource status such as memory. Executor-side BlockManagers also report all of the blocks they manage to the driver-side BlockManager, so the driver can serve metadata queries for every block in the application.

In turn, the driver-side BlockManager can instruct executor-side BlockManagers to perform storage-related operations such as RemoveBlock, RemoveRdd, RemoveShuffle, DecommissionBlockManager, RemoveBroadcast, GetMatchingBlockIds, ReplicateBlock, and so on. Handling a ReplicateBlock request means synchronizing block data to a peer BlockManager; in other words, BlockManagers can communicate with each other directly to replicate, upload, or download blocks.
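
For reference, these commands travel as messages defined in org.apache.spark.storage.BlockManagerMessages; a few of them are sketched below (abridged, following Spark 3.x; field lists may differ slightly between versions).

// Abridged sketch of a few BlockManagerMessages (not the complete set)
import org.apache.spark.storage.{BlockId, BlockManagerId}

case class RemoveBlock(blockId: BlockId)
case class RemoveRdd(rddId: Int)
case class RemoveShuffle(shuffleId: Int)
case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
// ask a BlockManager to replicate one of its blocks to other peers
case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)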

Startup Flow

BlockManager is initialized as a field of SparkEnv when SparkEnv is instantiated. Since SparkEnv exists on both the driver and the executors, BlockManager naturally exists on both sides as well, but it plays different roles. The driver side acts as the master: it collects, maintains, and serves queries about all BlockManagers and blocks in the application. The executor-side BlockManager focuses on maintaining the block data held by that executor and supporting reads and writes, and it also registers its own BlockManagerInfo with the driver-side BlockManager. The executor-side BlockManager plays a role similar to a DataNode in HDFS (it supports block reads/writes and replication, reports block status, and so on).

During startup, BlockManager uses RpcEnv to set up its network endpoints so that other BlockManagers can reach it.
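
As a rough orientation, this setup and registration happens in BlockManager.initialize(appId), which both the driver and the executors call after constructing the BlockManager. The sketch below is abridged from the Spark 3.x code and omits shuffle-service registration, topology mapping, and re-registration details.

// org.apache.spark.storage.BlockManager#initialize -- abridged sketch, not the full method
def initialize(appId: String): Unit = {
  // start the Netty transfer service so peer BlockManagers can fetch/upload blocks from us
  blockTransferService.init(this)
  externalBlockStoreClient.foreach(_.init(appId))

  // identity of this BlockManager: executor id + transfer-service host/port
  val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

  // register with the driver-side BlockManagerMasterEndpoint, handing over our storage RPC
  // endpoint so the driver can later send commands (RemoveRdd, ReplicateBlock, ...) back to us
  blockManagerId = master.registerBlockManager(
    id, diskBlockManager.localDirsString, maxOnHeapMemory, maxOffHeapMemory, storageEndpoint)
}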

BlockManager Functionality

The BlockManager class is the access point to Spark's entire storage system and provides almost all data storage and access functionality. The three most important kinds of data it handles are:

  1. RDD data
  2. Shuffle data
  3. Broadcast variables

In terms of interactions, an executor-side BlockManager is responsible for (a usage sketch follows the list):

  • Registering the current BlockManager with the driver
  • Reporting the blocks it manages to the driver
  • Reading serialized/deserialized blocks from local storage
  • Saving blocks to local storage
  • Asking the driver which nodes in the cluster hold a given block
  • Fetching blocks from other nodes
  • Registering tasks and acquiring/releasing locks on blocks
  • Replicating the blocks it holds to other nodes
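
To make these responsibilities concrete, the hedged sketch below shows how calling code typically uses the BlockManager API; the method names follow Spark 3.x, and the surrounding SparkEnv is assumed to be fully initialized.

// Hypothetical usage sketch of the executor-side BlockManager API
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}

val blockManager = SparkEnv.get.blockManager
val blockId: BlockId = TestBlockId("demo")        // TestBlockId used purely for illustration

// store a value locally and (tellMaster = true) report the new block to the driver
blockManager.putSingle(blockId, "some value", StorageLevel.MEMORY_AND_DISK, tellMaster = true)

// read it back from local storage (memory first, then disk)
val localValues = blockManager.getLocalValues(blockId)   // Option[BlockResult]

// fetch a block held by another executor; the driver is consulted for its locations
val remoteBytes = blockManager.getRemoteBytes(blockId)   // Option[ChunkedByteBuffer]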

BlockManager's Underlying Components

BlockManager provides the external entry point, but the actual work is done by a set of underlying components, including:

  1. Networking foundation: RpcEnv
  2. RPC implementation: blockManagerMaster. The driver constructs BlockManagerMasterEndpoint and BlockManagerMasterHeartbeatEndpoint; executors construct the clients for these two endpoints.
  3. Serialization/deserialization/compression/encryption of data: SerializerManager
  4. Memory management (on-heap/off-heap partitioning, allocation, and release): UnifiedMemoryManager
  5. Metadata management for map-side shuffle output: mapOutputTracker. Also master/worker: the driver-side MapOutputTrackerMaster maintains the location (BlockManagerId) and size of every map output of every shuffle, while the executor-side MapOutputTrackerWorker queries MapOutputTrackerMaster for the metadata of a particular shuffle's map output.
  6. Writing (map side) and reading (reduce side) of shuffle data: ShuffleManager. Reads may have to fetch over the network from other BlockManagers, and the read/write paths also implement encryption/decryption, compression, sorting, and combiner logic.
  7. Transfer service used between BlockManagers to replicate and fetch block data: NettyBlockTransferService
  8. Service for uploading/downloading RDD and shuffle block data to/from an external service: ExternalBlockStoreClient. Both NettyBlockTransferService and ExternalBlockStoreClient are subclasses of BlockStoreClient, so they are functionally similar and differ mainly in which server they connect to.
  9. The actual block storage backends: MemoryStore and DiskStore. MemoryStore relies on MemoryManager to manage memory space; DiskStore relies on DiskBlockManager to map block data to on-disk file paths.
  10. Executor-side block-management RPC endpoint: BlockManagerStorageEndpoint, handling RemoveBlock/RemoveRdd/RemoveShuffle/DecommissionBlockManager/RemoveBroadcast/GetBlockStatus and so on. It works together with the BlockManagerMasterEndpoint from item 2: it receives those command requests from BlockManagerMasterEndpoint and carries out the actual block-management work.

Most of these components are passed to BlockManager as constructor arguments when SparkEnv builds it:

    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

    val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
      Some(new ExternalBlockStoreClient(transConf, securityManager,
        securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)))
    } else {
      None
    }

    // Mapping from block manager id to the block manager's information.
    val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]()
    val blockManagerMaster = new BlockManagerMaster(
      registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
        new BlockManagerMasterEndpoint(
          rpcEnv,
          isLocal,
          conf,
          listenerBus,
          if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
            externalShuffleClient
          } else {
            None
          }, blockManagerInfo,
          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),
      registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
        new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
      conf,
      isDriver)

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(
      executorId,
      rpcEnv,
      blockManagerMaster,
      serializerManager,
      conf,
      memoryManager,
      mapOutputTracker,
      shuffleManager,
      blockTransferService,
      securityManager,
      externalShuffleClient)

The following sections walk through a few of the key components.

RPC Endpoints

The RPC handling logic in BlockManager mainly involves items 2 and 10 in the component list above.

  • Driver-side endpoint: BlockManagerMasterEndpoint
  • Executor-side endpoint: BlockManagerStorageEndpoint

BlockManagerMasterEndpoint receives requests from all executor-side BlockManagers and from SparkContext; see the code:

// org.apache.spark.storage.BlockManagerMasterEndpoint#receiveAndReply

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Called when an executor-side BlockManager starts up or re-registers
    case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) =>
      context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint))
    // Reported to the master when an executor-side BlockManager updates a block
    case _updateBlockInfo @
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
      val isSuccess = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
      context.reply(isSuccess)
      // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
      // returns false since the block info would be updated again later.
      if (isSuccess) {
        listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
      }

    case GetLocations(blockId) =>
      context.reply(getLocations(blockId))

    case GetLocationsAndStatus(blockId, requesterHost) =>
      context.reply(getLocationsAndStatus(blockId, requesterHost))

    case GetLocationsMultipleBlockIds(blockIds) =>
      context.reply(getLocationsMultipleBlockIds(blockIds))

    case GetPeers(blockManagerId) =>
      context.reply(getPeers(blockManagerId))

    case GetExecutorEndpointRef(executorId) =>
      context.reply(getExecutorEndpointRef(executorId))

    case GetMemoryStatus =>
      context.reply(memoryStatus)

    case GetStorageStatus =>
      context.reply(storageStatus)

    case GetBlockStatus(blockId, askStorageEndpoints) =>
      context.reply(blockStatus(blockId, askStorageEndpoints))

    case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
      context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))

    case RemoveShufflePushMergerLocation(host) =>
      context.reply(removeShufflePushMergerLocation(host))

    case IsExecutorAlive(executorId) =>
      context.reply(blockManagerIdByExecutor.contains(executorId))

    case GetMatchingBlockIds(filter, askStorageEndpoints) =>
      context.reply(getMatchingBlockIds(filter, askStorageEndpoints))
    // Called internally from SparkContext.unpersistRDD()
    case RemoveRdd(rddId) =>
      context.reply(removeRdd(rddId))

    case RemoveShuffle(shuffleId) =>
      context.reply(removeShuffle(shuffleId))

    case RemoveBroadcast(broadcastId, removeFromDriver) =>
      context.reply(removeBroadcast(broadcastId, removeFromDriver))

    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      context.reply(true)

    case RemoveExecutor(execId) =>
      removeExecutor(execId)
      context.reply(true)

    case DecommissionBlockManagers(executorIds) =>
      // Mark corresponding BlockManagers as being decommissioning by adding them to
      // decommissioningBlockManagerSet, so they won't be used to replicate or migrate blocks.
      // Note that BlockManagerStorageEndpoint will be notified about decommissioning when the
      // executor is notified(see BlockManager.decommissionSelf), so we don't need to send the
      // notification here.
      val bms = executorIds.flatMap(blockManagerIdByExecutor.get)
      logInfo(s"Mark BlockManagers (${bms.mkString(", ")}) as being decommissioning.")
      decommissioningBlockManagerSet ++= bms
      context.reply(true)

    case GetReplicateInfoForRDDBlocks(blockManagerId) =>
      context.reply(getReplicateInfoForRDDBlocks(blockManagerId))

    case StopBlockManagerMaster =>
      context.reply(true)
      stop()
  }

Following the code further, many block-related operations such as RemoveRdd ultimately call into the BlockManagerStorageEndpoint of each executor. Take the handling of RemoveRdd as an example:

private def removeRdd(rddId: Int): Future[Seq[Int]] = {
    // First remove the metadata for the given RDD, and then asynchronously remove the blocks
    // from the storage endpoints.

    // The message sent to the storage endpoints to remove the RDD
    val removeMsg = RemoveRdd(rddId)

    // Find all blocks for the given RDD, remove the block from both blockLocations and
    // the blockManagerInfo that is tracking the blocks and create the futures which asynchronously
    // remove the blocks from storage endpoints and gives back the number of removed blocks
    val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
    val blocksToDeleteByShuffleService =
      new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]

    blocks.foreach { blockId =>
      val bms: mutable.HashSet[BlockManagerId] = blockLocations.remove(blockId)

      // Split the RDD block's locations into two groups: built-in BlockManagers vs. the external shuffle service
      val (bmIdsExtShuffle, bmIdsExecutor) = bms.partition(_.port == externalShuffleServicePort)
      val liveExecutorsForBlock = bmIdsExecutor.map(_.executorId).toSet
      bmIdsExtShuffle.foreach { bmIdForShuffleService =>
        // if the original executor is already released then delete this disk block via
        // the external shuffle service
        if (!liveExecutorsForBlock.contains(bmIdForShuffleService.executorId)) {
          val blockIdsToDel = blocksToDeleteByShuffleService.getOrElseUpdate(bmIdForShuffleService,
            new mutable.HashSet[RDDBlockId]())
          blockIdsToDel += blockId
          blockStatusByShuffleService.get(bmIdForShuffleService).foreach { blockStatus =>
            blockStatus.remove(blockId)
          }
        }
      }
      // Remove the block from the in-memory bookkeeping structures
      bmIdsExecutor.foreach { bmId =>
        blockManagerInfo.get(bmId).foreach { bmInfo =>
          bmInfo.removeBlock(blockId)
        }
      }
    }
    // Ask each executor-side storage endpoint to remove the RDD's blocks
    val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
      bmInfo.storageEndpoint.ask[Int](removeMsg).recover {
        // use 0 as default value means no blocks were removed
        handleBlockRemovalFailure("RDD", rddId.toString, bmInfo.blockManagerId, 0)
      }
    }.toSeq
    // Ask the external shuffle service to delete block data
    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
        Future[Int] {
          val numRemovedBlocks = shuffleClient.removeBlocks(
            bmId.host,
            bmId.port,
            bmId.executorId,
            blockIds.map(_.toString).toArray)
          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
        }
      }
    }.getOrElse(Seq.empty)
    // combine all removal futures into the Future[Seq[Int]] returned to the caller
    Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
  }

DiskStore and DiskBlockManager

DiskStore provides the block-level read and write interface.

Write path: given a blockId, DiskStore asks DiskBlockManager for the file that backs the block, then writes the data using the caller-supplied writeFunc. After the write, it updates in-memory structures such as blockSizes.
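
As a hedged illustration (assuming the Spark 3.x signature put(blockId)(writeFunc), where writeFunc receives a WritableByteChannel), a write could look like this; the diskStore instance and blockId are assumed to exist:

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// writeFunc decides how to produce the bytes; DiskStore only supplies the destination channel
diskStore.put(blockId) { channel: WritableByteChannel =>
  val bytes = ByteBuffer.wrap("payload".getBytes("UTF-8"))
  while (bytes.hasRemaining) {
    channel.write(bytes)
  }
}
// after writeFunc returns, DiskStore records the resulting file length in its blockSizes map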

Read path: reads are lazy. Given a BlockId, DiskStore first resolves the file path and blockSize, then builds a DiskBlockData or EncryptedBlockData object from the file and block size. No data is actually read at this point; the read happens later when the caller invokes methods such as toChunkedByteBuffer.

// org.apache.spark.storage.DiskStore

  def getBytes(blockId: BlockId): BlockData = {
    getBytes(diskManager.getFile(blockId.name), getSize(blockId))
  }

  def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
    case Some(key) =>
      // Encrypted blocks cannot be memory mapped; return a special object that does decryption
      // and provides InputStream / FileRegion implementations for reading the data.
      new EncryptedBlockData(f, blockSize, conf, key)

    case _ =>
      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize)
  }

DiskBlockManager does not touch block data itself; it only maintains the executor's localDirs and the mapping from a BlockId to a file path.
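
The mapping is hash-based. The simplified sketch below mirrors the idea behind DiskBlockManager.getFile; the directory layout matches my reading of the Spark code, but treat it as an approximation.

import java.io.File

// Pick a local dir and a sub-directory from a hash of the block's file name,
// so blocks spread evenly across the spark.local.dir entries.
def getFileSketch(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
  val hash = filename.hashCode & Integer.MAX_VALUE   // Spark uses Utils.nonNegativeHash
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
}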

MemoryStore and MemoryManager

MemoryStore provides block-level read and write interfaces, functionally analogous to DiskStore.

MemoryManager is responsible for memory management, including dividing memory into regions and allocating/releasing memory within each region. It delegates the management of each individual region to a MemoryPool. The regions are (a sizing sketch follows the list):

  • onHeapStorageMemoryPool: (systemMemory - 300 MB reserved) * spark.memory.fraction * spark.memory.storageFraction, where systemMemory is the JVM heap size (overridable via spark.testing.memory)
  • onHeapExecutionMemoryPool: (systemMemory - 300 MB reserved) * spark.memory.fraction * (1 - spark.memory.storageFraction)
  • offHeapStorageMemoryPool: spark.memory.offHeap.size * spark.memory.storageFraction
  • offHeapExecutionMemoryPool: spark.memory.offHeap.size * (1 - spark.memory.storageFraction)
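
A hedged sketch of how these sizes are derived, following the UnifiedMemoryManager companion object as I understand it (the config keys are real; the code is a simplification):

import org.apache.spark.SparkConf

val conf = new SparkConf()   // defaults assumed for illustration

// usable heap = JVM heap (or spark.testing.memory in tests) minus a 300 MB reserve
val systemMemory  = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val usableMemory  = systemMemory - 300L * 1024 * 1024
val maxHeapMemory = (usableMemory * conf.getDouble("spark.memory.fraction", 0.6)).toLong

val storageFraction       = conf.getDouble("spark.memory.storageFraction", 0.5)
val onHeapStorageRegion   = (maxHeapMemory * storageFraction).toLong   // onHeapStorageMemoryPool
val onHeapExecutionRegion = maxHeapMemory - onHeapStorageRegion        // onHeapExecutionMemoryPool

val offHeapSize            = conf.getLong("spark.memory.offHeap.size", 0L)
val offHeapStorageRegion   = (offHeapSize * storageFraction).toLong    // offHeapStorageMemoryPool
val offHeapExecutionRegion = offHeapSize - offHeapStorageRegion        // offHeapExecutionMemoryPool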

MemoryManager uses the MemoryPools of these four regions to implement allocation and release within each of them.

When an allocation does not fit, onHeapStorageMemoryPool and onHeapExecutionMemoryPool can borrow space from each other.

The same applies to offHeapStorageMemoryPool and offHeapExecutionMemoryPool.

Storage can freely borrow whatever free memory Execution has, but Execution can only take from Storage the portion that exceeds the storage region size (StorageRegionSize).

If Storage still does not have enough space after borrowing, block eviction kicks in: it evicts blocks that do not belong to the RDD of the block currently being stored. Eviction only means the block leaves memory; depending on the evicted block's StorageLevel, it may be removed from the BlockManager entirely or merely moved to the DiskStore.
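
A hedged sketch of the selection rule used by MemoryStore.evictBlocksToFreeSpace (simplified; the real method also handles write locks and calls back into the MemoryManager):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.storage.BlockId

// A cached block is evictable iff it lives in the requested memory mode and does not
// belong to the same RDD as the block we are currently trying to store.
def blockIsEvictable(candidateId: BlockId, candidateMode: MemoryMode,
                     requestedMode: MemoryMode, rddBeingStored: Option[Int]): Boolean = {
  candidateMode == requestedMode &&
    (rddBeingStored.isEmpty || !candidateId.asRDDId.map(_.rddId).contains(rddBeingStored.get))
}
// Spark walks the entries LinkedHashMap in insertion order, collecting evictable blocks until
// the candidate sizes cover the requested amount, then drops each one via
// blockEvictionHandler.dropFromMemory, which spills to disk when the StorageLevel allows it.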

The following uses an on-heap storage memory allocation as an example of how this is implemented.

The idea:

  • Use the MemoryMode to determine whether the relevant Storage and Execution regions are on-heap or off-heap.
// org.apache.spark.memory.UnifiedMemoryManager#acquireStorageMemory

override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    // Determine which memory regions participate
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapStorageMemory)
    }
    // maxMemory is all of the on-heap/off-heap space except what Execution has already used.
    // In other words, everything other than Execution's used space can be considered for eviction.
    if (numBytes > maxMemory) {
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our "  
        s"memory limit ($maxMemory bytes)")
      return false
    }
    // Borrow from the Execution region if needed
    if (numBytes > storagePool.memoryFree) {
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
        numBytes - storagePool.memoryFree)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    // Allocate within the Storage region; this may trigger block eviction
    storagePool.acquireMemory(blockId, numBytes)
  }
  
   def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    if (numBytesToFree > 0) {
      // Pick blocks to evict
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }
    // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
    // back into this StorageMemoryPool in order to free memory. Therefore, these variables
    // should have been updated.
    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire
    }
    enoughMemory
  }
  • If the requested amount exceeds Storage's free space, try to borrow from the free space of the Execution region.
  • Then allocate within the Storage region; if there is still not enough after borrowing, start the block eviction process until enough space is freed.

Now let's look at MemoryStore.

Data structures:

  • entries: a LinkedHashMap[BlockId, MemoryEntry[_]], which is where the block data actually lives; each block maps to one MemoryEntry (the entry types are sketched after this list).
  • onHeapUnrollMemoryMap/offHeapUnrollMemoryMap: map a task attempt id to the amount of unroll memory it holds. Unroll memory is temporary storage allocated incrementally while iterating over the RDD records being written; it also comes from the Storage region, and once iteration finishes it is promoted to regular block space.
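
For reference, a MemoryEntry comes in two flavors, sketched below (abridged from the Spark 3.x source as I recall it; modifiers and field order may differ):

import scala.reflect.ClassTag
import org.apache.spark.memory.MemoryMode
import org.apache.spark.util.io.ChunkedByteBuffer

sealed trait MemoryEntry[T] {
  def size: Long                 // size of the entry in bytes
  def memoryMode: MemoryMode     // ON_HEAP or OFF_HEAP
  def classTag: ClassTag[T]
}
// block cached as deserialized Java objects (always on-heap)
case class DeserializedMemoryEntry[T](
    value: Array[T], size: Long, classTag: ClassTag[T]) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
// block cached as serialized bytes (on-heap or off-heap)
case class SerializedMemoryEntry[T](
    buffer: ChunkedByteBuffer, memoryMode: MemoryMode, classTag: ClassTag[T]) extends MemoryEntry[T] {
  def size: Long = buffer.size
}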

MemoryStore's key methods, which are also the API it exposes:

def getBytes(blockId: BlockId): Option[ChunkedByteBuffer]

def getValues(blockId: BlockId): Option[Iterator[_]]

private def putIterator[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T],
      memoryMode: MemoryMode,
      valuesHolder: ValuesHolder[T]): Either[Long, Long]

// Straightforward: build a MemoryEntry from the _bytes parameter and put it into entries
def putBytes[T: ClassTag](
      blockId: BlockId,
      size: Long,
      memoryMode: MemoryMode,
      _bytes: () => ChunkedByteBuffer): Boolean

Now a brief analysis of putIterator. The idea:

  • Request an initial amount of storage space and record it in the unroll map. This goes through memoryManager.acquireUnrollMemory(), which in UnifiedMemoryManager delegates to the acquireStorageMemory() path analyzed above.
  • Then iterate over the values iterator, putting each value into valuesHolder (a temporary container). Every spark.storage.unrollMemoryCheckPeriod values, estimate whether the current size of valuesHolder has exceeded the memory reserved so far; if so, request another chunk sized by the growth factor.
  • Once all values are in valuesHolder, build an entryBuilder from it and compute the precise size of all values; if the memory reserved so far is not enough, make one final request.
  • With enough space reserved, build an entry with the entryBuilder and put it into the entries map.

The code:

// org.apache.spark.storage.memory.MemoryStore#putIterator

private def putIterator[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T],
      memoryMode: MemoryMode,
      valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of "  
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      valuesHolder.storeValue(values.next())
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        val currentSize = valuesHolder.estimatedSize()
        // If our vector's size has exceeded the threshold, request more memory
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    // Make sure that we have enough memory to store the block. By this point, it is possible that
    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    // perform one final call to attempt to allocate additional memory if necessary.
    if (keepUnrolling) {
      val entryBuilder = valuesHolder.getBuilder()
      val size = entryBuilder.preciseSize
      if (size > unrollMemoryUsedByThisBlock) {
        val amountToRequest = size - unrollMemoryUsedByThisBlock
        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
      }

      if (keepUnrolling) {
        val entry = entryBuilder.build()
        // Synchronize so that transfer is atomic
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
          val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
          assert(success, "transferring unroll memory to storage memory failed")
        }

        entries.synchronized {
          entries.put(blockId, entry)
        }

        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
          Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        Right(entry.size)
      } else {
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
        Left(unrollMemoryUsedByThisBlock)
      }
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
      Left(unrollMemoryUsedByThisBlock)
    }
  }

ExternalBlockStoreClient and NettyBlockTransferService

Both implement BlockStoreClient; both support pushing or uploading block data to a remote end and fetching blocks from it.
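
As a hedged illustration of the shared API (the fetchBlocks signature below reflects my reading of the Spark 3.x BlockStoreClient; the blockTransferService instance, host/port, and block id are made up for the example):

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.BlockFetchingListener

// Asynchronously fetch blocks from a remote BlockManager (or external shuffle service);
// the listener gets one callback per block as data arrives.
blockTransferService.fetchBlocks(
  "remote-host",                 // host of the remote BlockManager
  7337,                          // its transfer-service port
  "3",                           // executor id that owns the blocks
  Array("rdd_5_0"),              // block ids to fetch
  new BlockFetchingListener {
    override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
      println(s"fetched $blockId, ${data.size()} bytes")
    override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
      exception.printStackTrace()
  },
  null)                          // optional DownloadFileManager for spilling large fetches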

ExternalBlockStoreClient is mainly used for transferring shuffle data. From the class comment:

  Client for reading both RDD blocks and shuffle blocks which points to an external
  (outside of executor) server. This is instead of reading blocks directly from other executors
  (via BlockTransferService), which has the downside of losing the data if we lose the executors.

MapOutputTracker

MapOutputTracker also follows a master/worker architecture, split into MapOutputTrackerMaster and MapOutputTrackerWorker.

MapOutputTrackerMaster runs on the driver; the DAGScheduler registers each shuffle's map output statuses with it. ShuffleMapStage uses it to track which map outputs are available or missing, and hence which tasks need to be rerun.

/**
 * Driver-side class that keeps track of the location of the map output of a stage.
 *
 * The DAGScheduler uses this class to (de)register map output statuses and to look up statistics
 * for performing locality-aware reduce task scheduling.
 *
 * ShuffleMapStage uses this class for tracking available / missing outputs in order to determine
 * which tasks need to be run.
 */

Besides local method calls, the master also provides a query service to the workers; the handling logic runs in dedicated MessageLoop threads:

private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = mapOutputRequests.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              mapOutputRequests.offer(PoisonPill)
              return
            }
            val context = data.context
            val shuffleId = data.shuffleId
            val hostPort = context.senderAddress.hostPort
            logDebug("Handling request to send map output locations for shuffle "   shuffleId  
              " to "   hostPort)
            val shuffleStatus = shuffleStatuses.get(shuffleId).head
            context.reply(
              shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast,
                conf))
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

MapOutputTrackerWorker runs on the executors as a client of the master; its sole purpose is to query the shuffle map output statuses maintained on the master side.

/**
 * Executor-side client for fetching map output info from the driver's MapOutputTrackerMaster.
 * Note that this is not used in local-mode; instead, local-mode Executors access the
 * MapOutputTrackerMaster directly (which is possible because the master and worker share a common
 * superclass).
 */

MapOutputTrackerWorker's main method:

// org.apache.spark.MapOutputTrackerWorker#getMapSizesByExecutorId
// Queries the master for the blocks produced by a range of map tasks of the given shuffle,
// restricted to a range of reduce partitions
override def getMapSizesByExecutorId(
      shuffleId: Int,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
    logDebug(s"Fetching outputs for shuffle $shuffleId")
    val statuses = getStatuses(shuffleId, conf)
    try {
      val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex
      logDebug(s"Convert map statuses for shuffle $shuffleId, "  
        s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition")
      MapOutputTracker.convertMapStatuses(
        shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex)
    } catch {
      case e: MetadataFetchFailedException =>
        // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
        mapStatuses.clear()
        throw e
    }
  }
