文章目录
- 前言
- 一、PySpark RDD 持久化
- ①` cache()`
- ②` persist() `
- ③ `unpersist() `
- 二、持久性存储级别
- `MEMORY_ONLY `
- `MEMORY_AND_DISK`
- `DISK_ONLY`
- `MEMORY_ONLY_2`
- `MEMORY_AND_DISK_2`
- `DISK_ONLY_2`
- 三、共享变量
- 1.广播变量(只读共享变量)
- i 广播变量 ( broadcast variable)
- ii 创建广播变量
- 2.累加器变量(可更新的共享变量)
前言
本篇主要讲述了如何在执行pyspark任务时候缓存或者共享变量,以达到节约资源、计算量、时间等目的
一、PySpark RDD 持久化
参考文献:https://sparkbyexamples.com/pyspark-rdd#rdd-persistence
我们在上一篇博客提到,RDD 的转化操作是惰性的,要等到后面执行行动操作的时候,才会真正执行计算;
那么如果我们的流程图中有多个分支,比如某一个转换操作 X 的中间结果,被后续的多个并列的流程图(a,b,c)运用,那么就会出现这么一个情况:
在执行后续的(a,b,c)不同流程的时候,遇到行动操作时,会重新从头计算整个图,即该转换操作X,会被重复调度执行:(X->a), (X->b), (X->c); 如此一来就会浪费时间和计算资源,则RDD的持久化就显得十分有用了。
PySpark 通过使用 cache()
和persist()
提供了一种优化机制,来存储 RDD 的中间计算,以便它们可以在后续操作中重用。当持久化或缓存一个 RDD 时,每个工作节点将它的分区数据存储在内存或磁盘中,并在该 RDD 的其他操作中重用它们。Spark 在节点上的持久数据是容错的,这意味着如果任何分区丢失,它将使用创建它的原始转换自动重新计算
①cache()
默认将 RDD 计算保存到存储级别MEMORY_ONLY
,这意味着它将数据作为未序列化对象存储在 JVM 堆中
(对于Spark DataFrame 或 Dataset 缓存将其保存到存储级别 ` MEMORY_AND_DISK’)
cachedRdd = rdd.cache()
②persist()
有两种函数签名 第一个签名不接受任何参数,默认情况下将其保存到MEMORY_AND_DISK存储级别, 例:
代码语言:javascript复制dfPersist = df.persist()
第二个签名StorageLevel作为参数将其存储到不同的存储级别; 例:
代码语言:javascript复制dfPersist = df.persist(StorageLevel.MEMORY_ONLY)
该参数可选的有:MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER,DISK_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK_2
③ unpersist()
PySpark 会自动监视每个persist()
和cache()
调用,并检查每个节点上的使用情况,并在未使用或使用最近最少使用 (LRU) 算法时删除持久数据。
也使用unpersist() 方法手动删除。unpersist()
将 RDD 标记为非持久的,并从内存和磁盘中删除它的所有块:
rddPersist2 = rddPersist.unpersist()
关于 cache() 和 persist()的一些细微区别:链接
二、持久性存储级别
参考文献: ①https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence ② https://sparkbyexamples.com/spark/spark-persistence-storage-levels/
代码如下(示例):
代码语言:javascript复制import org.apache.spark.storage.StorageLevel
rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
df2 = df.persist(StorageLevel.MEMORY_ONLY_SER)
MEMORY_ONLY
这是 RDD cache() 方法的默认行为, 并将 RDD 或 DataFrame 作为反序列化对象存储到 JVM 内存中。当没有足够的可用内存时,它不会保存某些分区的 DataFrame,这些将在需要时重新计算。这需要更多的存储空间,但运行速度更快,因为从内存中读取需要很少的 CPU 周期。
MEMORY_AND_DISK
在此存储级别,RDD 将作为反序列化对象存储在 JVM 内存中。当所需的存储空间大于可用内存时,它会将一些多余的分区存储到磁盘中,并在需要时从磁盘读取数据。由于涉及 I/O,因此速度较慢。
DISK_ONLY
在此存储级别,RDD 仅存储在磁盘上,并且由于涉及 I/O,CPU 计算时间较长。
MEMORY_ONLY_2
与MEMORY_ONLY 存储级别相同, 但将每个分区复制到两个集群节点。
MEMORY_AND_DISK_2
与MEMORY_AND_DISK 存储级别相同, 但将每个分区复制到两个集群节点。
DISK_ONLY_2
与DISK_ONLY 存储级别相同, 但将每个分区复制到两个集群节点。
下面是存储级别的表格表示,通过空间、CPU 和性能的影响选择最适合的一个。 ------------------------------------------------------------------------------------------------------------------------------------ 存储级别 | 占用空间 | CPU 耗时 | 在内存中 | 在硬盘上 | 序列化 |重新计算一些分区 ------------------------------------------------------------------------------------------------------------------------------------ MEMORY_ONLY High Low Y N N Y MEMORY_AND_DISK High Medium some some some N DISK_ONLY Low High N Y Y N ------------------------------------------------------------------------------------------------------------------------------------ 或者参考官方文档的指导: https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
三、共享变量
当 PySpark 使用map()
或reduce()
操作执行转换时,它使用任务附带的变量在远程节点上执行转换,并且这些变量不会发送回 PySpark 驱动程序,因此无法在任务之间重用和共享变量。PySpark 共享变量使用以下两种技术解决了这个问题。
·广播变量(只读共享变量)
·累加器变量(可更新的共享变量)
1.广播变量(只读共享变量)
i 广播变量 ( broadcast variable)
广播变量是只读共享变量,它们被缓存并在集群中的所有节点上可用,以便任务访问或使用。PySpark 不是将这些数据与每个任务一起发送,而是使用高效的广播算法将广播变量分发给机器,以降低通信成本。 PySpark RDD Broadcast 的最佳用例之一是与查找数据一起使用。
ii 创建广播变量
使用SparkContext 类的方法broadcast(v)
创建的。
代码如下(示例):
broadcastVar = sc.broadcast([0, 1, 2, 3])
broadcastVar.value
注意,广播变量 不会在调用 sc.broadcast(variable)
时 就发送给执行器,而是在首次使用它时发送给执行器
参考文献:https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/
2.累加器变量(可更新的共享变量)
累加器是另一种类型的共享变量,仅通过关联和交换操作“添加” ,用于执行计数器(类似于 Map-reduce 计数器)或求和操作。 这里不做详细介绍了,可参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators