【Spark重点难点】你的数据存在哪了?

2021-12-07 14:42:21 浏览数 (1)

  • 《我们在学习Spark的时候,到底在学习什么?》
  • 《我在B站读大学,大数据专业》

前言

在之前的课中我们讲了Spark的RDD以及整个Spark系统中的一些关键角色:《【Spark重点难点】你从未深入理解的RDD和关键角色》

以及Spark中非常重要的一个概念Shuffle:《【Spark重点难点】你以为的Shuffle和真正的Shuffle》

无论是在提交任务还是执行任务的过程中,Spark存储体系永远是绕不过去的坎。

Spark为了避免类似Hadoop读写磁盘的IO操作成为性能瓶颈,优先将配置信息、计算结果等数据存入内存,当内存存储不下的时候,可选择性的将计算结果输出到磁盘,为了保证性能,默认都是存储到内存的,这样极大的提高了Spark的计算效率。

我们先用一张图来概括一下Spark的存储体系:

整体体系中重要的角色包括:

  • BlockManager是整体存储体系中核心模块
  • DiskBlockManager磁盘管理器
  • MemoryStore内存存储
  • DiskStore磁盘存储

接下来我们依次看看这些角色都是用来做什么的。

BlockManager

BlockManager运行在每个节点上(包括Driver和Executor)。

他提供对本地或远端节点上的内存、磁盘及堆外内存中Block的管理。存储体系从狭义上来说指的就是BlockManager,从广义上来说,则包括整个Spark集群中的各个BlockManagerBlockInfoManagerDiskBlockManagerDiskStoreMemoryManagerMemoryStore、对集群中的所有BlockManager进行管理的BlockManagerMaster及各个节点上对外提供Block上传与下载服务的BlockTransferService

BlockManager的结构是Maser-Slave架构,Master就是Driver上的BlockManagerMaster,Slave就是每个Executor上的BlockManagerBlockManagerMaster负责接受Executor上的BlockManager的注册以及管理BlockManager的元数据信息。

工作原理

在DAGShceduler中有一个BlockManagerMaster对象,该对象的工作就是负责管理全局所有BlockManager的元数据,当集群中有BlockManager注册完成的时候,其会向BlockManagerMaster发送自己元数据信息;BlockManagerMaster会为BlockManager创建一个属于这个BlockManagerBlockManagerInfo,用于存放BlockManager的信息。

在创建SparkContext的时候,会调用SparkEnv.blockManager.initialize方法实例化BlockManager对象,在创建Executor对象的时候也会创建BlockManager

当我们的Spark程序启动的时候,首先会创建SparkContext对象,在创建SparkContext对象的时候就会调用_env.blockManager.initialize(_applicationId)创建BlockManager对象,这个BlockManager就是Driver上的BlockManager,它负责管理集群中Executor上的BlockManager

创建BlockManager的关键方法如下,完整的源代码你可以在BlockManager这个类中看到。

代码语言:javascript复制
def initialize(appId: String): Unit = {
    //初始化BlockTransferService,其实是它的子类NettyBlockTransferService是下了init方法,
    //该方法的作用就是初始化传输服务,通过传输服务可以从不同的节点上拉取Block数据
    blockTransferService.init(this)
    shuffleClient.init(appId)

    //设置block的复制分片策略,由spark.storage.replication.policy指定
    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    //根据给定参数为对对应的Executor封装一个BlockManagerId对象(块存储的唯一标识)
    //executorID:executor的Id,blockTransferService.hostName:传输Block数据的服务的主机名
    //blockTransferService.port:传输Block数据的服务的主机名
    val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    //调用BlockManagerMaster的registerBlockManager方法向Driver上的BlockManagerMaster注册
    val idFromMaster = master.registerBlockManager(
      id,
      maxMemory,
      slaveEndpoint)
    //更新BlockManagerId
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    //判断是否开了外部shuffle服务
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    // 如果开启了外部shuffle服务,并且该节点是Driver的话就调用registerWithExternalShuffleServer方法
    //将BlockManager注册在本地
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

那么BlockManager又是如何存储数据的呢?Spark存储系统提供了两种存储抽象:MemoryStoreDiskStoreBlockManager正是利用它们来分别管理数据在内存和磁盘中的存取。

MemoryStore

MemoryStore负责将Block存储到内存。Spark通过将广播数据、RDD、Shuffle数据存储到内存,减少了对磁盘I/O的依赖,提高了程序的读写效率。

MemoryStore类实现了一个简单的基于块数据的内存数据库,用来管理需要写入到内存中的块数据。可以按序列化或非序列化的形式存放块数据,存放这两种块数据的数据结构是不同的,但都必须实现MemoryEntry这个接口。也就是说:MemoryStore管理的是以MemoryEntry为父接口的内存对象。

MemoryEntryMemoryStore中的管理的成员结构。它是一个接口,有两种实现:一种是DeserializedMemoryEntry用来保存非序列化块数据;一种是SerializedMemoryEntry用来保存序列化块数据

MemoryStore如何管理这些MemoryEntry对象呢?在当前版本,MemoryStore通过一个LinkedHashMap结构来管理内存对象。也就是说,MemoryStore是一个MemoryEntry类型的LinkedHashMap。Spark选择LinkedHashMap作为内存管理的数据结构与内存块的淘汰机制有很大的关系。

MemoryStore的数据结构

MemoryStore通过以MemoryEntry对象为元素的LinkedHashMap来管理块数据。LinkedHashMap是一个有序的HashMap,这样可以按插入顺序来对元素进行管理,此时各个节点构成了一个双向链表。

MemoryStore使用LinkedHashMap按访问元素的先后顺序把访问过的元素放到双向链表的末尾。这其实就形成了一个LRU队列(Least Recently Used队列)。这正是官方文档中提到的:缓存数据是不可靠的,当内存不够时,会按LRU算法来淘汰内存块。

需要注意的是,LinkedHashMap是非并发结构,所以在进行其元素的读写操作时,必须加锁。

MemoryEntry的数据结构

MemoryEntry的成员变量有三个:块数据的大小,内存模式(堆内还是堆外),块数据的类标识。MemoryEntry的代码实现如下:

代码语言:javascript复制
// 代码位置:org.apache.spark.storage.memory

private sealed trait MemoryEntry[T] {
  // 块数据大小
  def size: Long
  // 内存模式:ON_HEAP(堆内),OFF_HEAP(堆外)
  def memoryMode: MemoryMode
  // 数据的类标识
  def classTag: ClassTag[T]
}

每个MemoryEntry对象的大小由size来确定。并且可以被保存在ON_HEAP(堆内)或者OFF_HEAP(堆外)

淘汰内存数据

当执行任务或缓存数据空闲内存不足时,可能会释放一部分存储内存,如果对应的RDD的存储级别设置了useDisk,则会把内存中的数据持久化到磁盘上。可以参考:MemoryStore#evictBlocksToFreeSpace:

代码语言:javascript复制
private[spark] def evictBlocksToFreeSpace(
                    blockId: Option[BlockId],
                    space: Long,
                    memoryMode: MemoryMode
): Long = {...}

其中的blockId是数据块的id,每个id都对应一个内存块。需要淘汰内存块时,只需要从LinkedHashMap的头部选择一个进行删除即可。这就是上面我们提到的LRU内存数据淘汰机制。

DiskStore

DiskStore是BlockStore的另一个实现类,负责管理磁盘数据。简单的说,DiskStore就是通过DiskBlockManager来实现Block和相应磁盘文件的映射关系,从而将Block存储到磁盘的文件中。

下面是整体DiskStore的类实现:

代码语言:javascript复制
private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging { // SecurityManager用于提供对数据加密的支持

  // 读取磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值。由spark.storage.memoryMapThreshold配置,默认为2M
  private val minMemoryMapBytes = conf.get(config.STORAGE_MEMORY_MAP_THRESHOLD)
  // 使用内存映射读取文件的最大阈值,由配置项spark.storage.memoryMapLimitForTests指定。它是个测试参数,默认值为不限制。
  private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
  // 维护块ID与其对应大小之间的映射关系的ConcurrentHashMap。
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
  ...
}

我们可以看到DiskStore的属性有以下几项:

  • conf:即SparkConf
  • diskManager:即磁盘Block管理器DiskBlockManager
  • minMemoryMapBytes:读到磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值

此外,DiskStore提供了下面的方法进行操作:

  • getSize:获取给定的BlockId所对应Block的大小。
  • contains:判断本地磁盘存储路径下是否包含给定BlockId所对应的Block文件。
  • remove:删除给定BlockId所对应的Block文件。
  • putBytes:用于将BlockId所对应的Block写入磁盘,Block的内容已经封装为ChunkedByteBuffer。
  • getBytes:读取给定BlockId所对应的Block,并封装为ChunkedByteBuffer返回。

借用吴磊老师的一句话:DiskStore中数据的存取本质上就是字节序列与磁盘文件之间的转换,它通过putBytes方法把字节序列存入磁盘文件,再通过getBytes方法将文件内容转换为数据块。

关于BlockStore的实现还有一种叫做TachyonStore,是基于Tachyon内存分布式文件系统级别的持久化,我们在这里就不做介绍了。感兴趣的读者可以网上搜索一些资料来看。

0 人点赞