Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

2021-08-18 15:56:09 浏览数 (1)

文章目录

  • 前言
  • 一、PySpark RDD 持久化
    • ①` cache()`
    • ②` persist() `
    • ③ `unpersist() `
  • 二、持久性存储级别
    • `MEMORY_ONLY `
    • `MEMORY_AND_DISK`
    • `DISK_ONLY`
    • `MEMORY_ONLY_2`
    • `MEMORY_AND_DISK_2`
    • `DISK_ONLY_2`
  • 三、共享变量
    • 1.广播变量(只读共享变量)
      • i 广播变量 ( broadcast variable)
      • ii 创建广播变量
    • 2.累加器变量(可更新的共享变量)

前言

本篇主要讲述了如何在执行pyspark任务时候缓存或者共享变量,以达到节约资源、计算量、时间等目的

一、PySpark RDD 持久化

参考文献:https://sparkbyexamples.com/pyspark-rdd#rdd-persistence

    我们在上一篇博客提到,RDD 的转化操作是惰性的,要等到后面执行行动操作的时候,才会真正执行计算;     那么如果我们的流程图中有多个分支,比如某一个转换操作 X 的中间结果,被后续的多个并列的流程图(a,b,c)运用,那么就会出现这么一个情况:     在执行后续的(a,b,c)不同流程的时候,遇到行动操作时,会重新从头计算整个图,即该转换操作X,会被重复调度执行:(X->a), (X->b), (X->c); 如此一来就会浪费时间和计算资源,则RDD的持久化就显得十分有用了。     PySpark 通过使用 cache()persist() 提供了一种优化机制,来存储 RDD 的中间计算,以便它们可以在后续操作中重用。当持久化或缓存一个 RDD 时,每个工作节点将它的分区数据存储在内存或磁盘中,并在该 RDD 的其他操作中重用它们。Spark 在节点上的持久数据是容错的,这意味着如果任何分区丢失,它将使用创建它的原始转换自动重新计算

cache()

    默认将 RDD 计算保存到存储级别MEMORY_ONLY ,这意味着它将数据作为未序列化对象存储在 JVM 堆中 (对于Spark DataFrame 或 Dataset 缓存将其保存到存储级别 ` MEMORY_AND_DISK’)

代码语言:javascript复制
cachedRdd = rdd.cache()

persist()

有两种函数签名 第一个签名不接受任何参数,默认情况下将其保存到MEMORY_AND_DISK存储级别, 例:

代码语言:javascript复制
dfPersist = df.persist()

第二个签名StorageLevel作为参数将其存储到不同的存储级别; 例:

代码语言:javascript复制
dfPersist = df.persist(StorageLevel.MEMORY_ONLY)

该参数可选的有:MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER,DISK_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK_2

unpersist()

PySpark 会自动监视每个persist()cache()调用,并检查每个节点上的使用情况,并在未使用或使用最近最少使用 (LRU) 算法时删除持久数据。 也使用unpersist() 方法手动删除。unpersist() 将 RDD 标记为非持久的,并从内存和磁盘中删除它的所有块:

代码语言:javascript复制
rddPersist2 = rddPersist.unpersist()

关于 cache() 和 persist()的一些细微区别:链接

二、持久性存储级别

参考文献: ①https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence ② https://sparkbyexamples.com/spark/spark-persistence-storage-levels/

代码如下(示例):

代码语言:javascript复制
import org.apache.spark.storage.StorageLevel
rdd2 = rdd.persist(StorageLevel.MEMORY_ONLY_SER) 
df2 = df.persist(StorageLevel.MEMORY_ONLY_SER)

MEMORY_ONLY

这是 RDD cache() 方法的默认行为, 并将 RDD 或 DataFrame 作为反序列化对象存储到 JVM 内存中。当没有足够的可用内存时,它不会保存某些分区的 DataFrame,这些将在需要时重新计算。这需要更多的存储空间,但运行速度更快,因为从内存中读取需要很少的 CPU 周期。

MEMORY_AND_DISK

在此存储级别,RDD 将作为反序列化对象存储在 JVM 内存中。当所需的存储空间大于可用内存时,它会将一些多余的分区存储到磁盘中,并在需要时从磁盘读取数据。由于涉及 I/O,因此速度较慢。

DISK_ONLY

在此存储级别,RDD 仅存储在磁盘上,并且由于涉及 I/O,CPU 计算时间较长。

MEMORY_ONLY_2

与MEMORY_ONLY 存储级别相同, 但将每个分区复制到两个集群节点。

MEMORY_AND_DISK_2

与MEMORY_AND_DISK 存储级别相同, 但将每个分区复制到两个集群节点。

DISK_ONLY_2

与DISK_ONLY 存储级别相同, 但将每个分区复制到两个集群节点。

下面是存储级别的表格表示,通过空间、CPU 和性能的影响选择最适合的一个。 ------------------------------------------------------------------------------------------------------------------------------------ 存储级别           | 占用空间  | CPU 耗时 | 在内存中  | 在硬盘上 | 序列化  |重新计算一些分区 ------------------------------------------------------------------------------------------------------------------------------------ MEMORY_ONLY        High       Low       Y          N         N         Y MEMORY_AND_DISK   High      Medium    some     some       some      N DISK_ONLY            Low       High       N          Y         Y         N ------------------------------------------------------------------------------------------------------------------------------------ 或者参考官方文档的指导: https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose

三、共享变量

    当 PySpark 使用map()reduce()操作执行转换时,它使用任务附带的变量在远程节点上执行转换,并且这些变量不会发送回 PySpark 驱动程序,因此无法在任务之间重用和共享变量。PySpark 共享变量使用以下两种技术解决了这个问题。 ·广播变量(只读共享变量) ·累加器变量(可更新的共享变量)

1.广播变量(只读共享变量)

i 广播变量 ( broadcast variable)

广播变量是只读共享变量,它们被缓存并在集群中的所有节点上可用,以便任务访问或使用。PySpark 不是将这些数据与每个任务一起发送,而是使用高效的广播算法将广播变量分发给机器,以降低通信成本。 PySpark RDD Broadcast 的最佳用例之一是与查找数据一起使用。

ii 创建广播变量

使用SparkContext 类的方法broadcast(v)创建的。 代码如下(示例):

代码语言:javascript复制
broadcastVar = sc.broadcast([0, 1, 2, 3])
broadcastVar.value

注意,广播变量 不会在调用 sc.broadcast(variable) 时 就发送给执行器,而是在首次使用它时发送给执行器

参考文献:https://sparkbyexamples.com/pyspark/pyspark-broadcast-variables/

2.累加器变量(可更新的共享变量)

累加器是另一种类型的共享变量,仅通过关联和交换操作“添加” ,用于执行计数器(类似于 Map-reduce 计数器)或求和操作。 这里不做详细介绍了,可参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

0 人点赞