Spark笔记7-RDD持久化和分区

2021-03-02 15:29:37 浏览数 (1)

持久化

有时候需要访问同一组值,不做持久化,会重复生成,计算机代价和开销很大。持久化作用:

  • 通过缓存机制避免重复计算的开销
  • 通过使用persist()方法对一个RDD标记为持久化,仅仅是标记
  • 只有等到第一个行动操作才会发生真生的持久化操作,触发真正的计算操作,才会把计算结果进行持久化
  • 持久化后的RDD将会被保留在计算机节点的内存中,被后面的行动操作重复使用。

persist()方法

该方法的作用是将一个RDD标记为持久化,并不是真正的持久化操作,行动操作才是真正的持久化,主要的参数是:

  • memory_only 将反序列化的对象存在JVM中,如果内存不足将会按照先进先出的原则,替换内容。只存入内存中。
  • RDD.cache() 等价于RDD.persist(memory_only),表示缓存在内存中
  • Memory_and_disk 先将结果存入内存中,如果内存不够,再存入磁盘中
unpersist()

手动将持久化的RDD对象从缓存中进行清除。

demo
代码语言:javascript复制
list = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(list)   # 生成RDD
rdd.cache()   # 标记为持久化
print(rdd.count())   # 第一个行动化操作。触发从头到尾的计算,将结果存入缓存中
print(','.join(rdd.collect()))  # 使用上面缓存的结果,不必再次从头到尾的进行计算,使用缓存的RDD

分区

优点
  1. 增加并行度:RDD分区被保存在不同的节点上,在多个节点上同时进行计算
  1. 减小通信开销。分区前后对比
    • 不进行分区userDataevents两个表中的所有数据,都要对中间表joined表进行操作。
    • 分区之后,只需要将events中的所有数据和userData中的部分数据进行操作
分区原则

原则是尽量使得:分区个数 = 集群中CPU核心数目spark的部署模式

  • local模式(本地模式):默认为本地机器的CPU数目
  • Standalone 模式:集群中所有的CPU数目和2之间比较取较大值
  • yarn模式:集群中所有的CPU数目和2之间比较取较大值
  • mesos模式:Apache,默认是8
分区个数

创建RDD时候指定分区个数

代码语言:javascript复制
list = [1,2,3,4]
rdd = sc.parallelize(list,4)  # 设置4个分区

修改分区数目用repartition方法

代码语言:javascript复制
data = sc.parallelize([1,2,3,4], 4) # 指定4个分区
len(data.glom().collect())  # 显示分区数目
rdd = data.repartition(2)  # 	重新设置分区数目为2

自定义分区

spark自带的分区方式

  • 哈希分区 hash partitioner
  • 区域分区 range partitioner
  • 自定义分区
代码语言:javascript复制
# demo.py

from pyspark import SparkConf, SparkContext

def myPartitioner(key):
  print("mypartitioner is running")
  print("the key is %d" %key)
  return key.

def main():
  conf = SparkConf().setMaster("local").setAppName("myapp")
  sc = SparkContext(conf=conf)  # 生成对象,就是指挥官
  data = sc.parallelize(range(10), 5)  # 分成5个分区
  data.map(lambda x: (x,1))  # 生成键值对,下图1
  .partitionBy(10, myPartitioner)   # 函数只接受键值对作为参数,将上面的data变成键值对形式传进来
  .map(lambda x:x[0])   # 	取出键值对的第一个元素,下图2
  .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")   # 写入目录地址,生成10个文件


if __name__ == "__main__":
  main()

首先进入文件所在的目录,运行方式有两种:

  • python3 demo.py
  • /usr/local/spark/bin/spark-submit demo.py

0 人点赞