环境/背景
Spark 2.3.0
Scala 2.11
Java 1.8
在进行RDD操作的时候,我们需要在接下来多个行动中重用同一个RDD,这个时候我们就可以将RDD缓存起来,可以很大程度的节省计算和程序运行时间。
接下来可以通过查看Spark的源码对比RDD.cache()与RDD.persist()的差别。
cache 与 persist 对比
首先从JavaRDD类中点进去看JavaRDD.cache()方法与JavaRDD.persist()方法:
JavaRDD.scala
代码语言:javascript复制 /**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet..
*/
def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel))
在JavaRDD中说得挺明白:
- cache()只是缓存到默认的缓存级别:只使用内存
- persist()可以自定义缓存级别
我们再点进去看看RDD.scala的描述:
RDD.scala
代码语言:javascript复制 /**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
如果我们直接调用cache()或者没有入参的persist(),效果是一样的,都是使用默认的storage level 如果需要自定义缓存级别,可以通过传入StorageLevel类里面的设置好的对象使用。
缓存级别
我们再点进去StorageLevel.scala
里面看看是怎么定义的:
StorageLevel.scala
其中StorageLevel的构造函数如下:
代码语言:javascript复制/**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
* ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
* to replicate the RDD partitions on multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
@DeveloperApi
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
可以看到StorageLevel类的主构造器包含了5个参数:
- useDisk:使用硬盘(外存)
- useMemory:使用内存
- useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
- deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
- replication:备份数(在多个节点上备份)
举个栗子:
代码语言:javascript复制val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
另外还注意到有一种特殊的缓存级别
代码语言:javascript复制val OFF_HEAP = new StorageLevel(false, false, true, false)
使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。
代码语言:javascript复制if (useOffHeap) {
require(!useDisk, "Off-heap storage level does not support using disk")
require(!useMemory, "Off-heap storage level does not support using heap memory")
require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}