持久化
有时候需要访问同一组值,不做持久化,会重复生成,计算机代价和开销很大。持久化作用:
- 通过缓存机制避免重复计算的开销
- 通过使用persist()方法对一个RDD标记为持久化,
仅仅是标记
- 只有等到第一个行动操作才会发生真生的持久化操作,触发真正的计算操作,才会把计算结果进行持久化
- 持久化后的RDD将会被保留在计算机节点的内存中,被后面的行动操作重复使用。
persist()方法
该方法的作用是将一个RDD
标记为持久化,并不是真正的持久化操作,行动操作才是真正的持久化,主要的参数是:
-
memory_only
将反序列化的对象存在JVM
中,如果内存不足将会按照先进先出的原则,替换内容。只存入内存中。 -
RDD.cache()
等价于RDD.persist(memory_only)
,表示缓存在内存中 -
Memory_and_disk
先将结果存入内存中,如果内存不够,再存入磁盘中
unpersist()
手动将持久化的RDD
对象从缓存中进行清除。
demo
代码语言:javascript复制list = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(list) # 生成RDD
rdd.cache() # 标记为持久化
print(rdd.count()) # 第一个行动化操作。触发从头到尾的计算,将结果存入缓存中
print(','.join(rdd.collect())) # 使用上面缓存的结果,不必再次从头到尾的进行计算,使用缓存的RDD
分区
优点
- 增加并行度:
RDD
分区被保存在不同的节点上,在多个节点上同时进行计算
- 减小通信开销。分区前后对比
- 不进行分区
userData
和events
两个表中的所有数据,都要对中间表joined
表进行操作。 - 分区之后,只需要将
events
中的所有数据和userData
中的部分数据进行操作
- 不进行分区
分区原则
原则是尽量使得:分区个数 = 集群中CPU核心数目
。spark
的部署模式
local
模式(本地模式):默认为本地机器的CPU
数目Standalone
模式:集群中所有的CPU
数目和2之间比较取较大值yarn
模式:集群中所有的CPU
数目和2之间比较取较大值mesos
模式:Apache
,默认是8
分区个数
创建RDD
时候指定分区个数
list = [1,2,3,4]
rdd = sc.parallelize(list,4) # 设置4个分区
修改分区数目用repartition
方法
data = sc.parallelize([1,2,3,4], 4) # 指定4个分区
len(data.glom().collect()) # 显示分区数目
rdd = data.repartition(2) # 重新设置分区数目为2
自定义分区
spark
自带的分区方式
- 哈希分区 hash partitioner
- 区域分区 range partitioner
- 自定义分区
# demo.py
from pyspark import SparkConf, SparkContext
def myPartitioner(key):
print("mypartitioner is running")
print("the key is %d" %key)
return key.
def main():
conf = SparkConf().setMaster("local").setAppName("myapp")
sc = SparkContext(conf=conf) # 生成对象,就是指挥官
data = sc.parallelize(range(10), 5) # 分成5个分区
data.map(lambda x: (x,1)) # 生成键值对,下图1
.partitionBy(10, myPartitioner) # 函数只接受键值对作为参数,将上面的data变成键值对形式传进来
.map(lambda x:x[0]) # 取出键值对的第一个元素,下图2
.saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner") # 写入目录地址,生成10个文件
if __name__ == "__main__":
main()
首先进入文件所在的目录,运行方式有两种:
python3 demo.py
/usr/local/spark/bin/spark-submit demo.py