Pyspark学习笔记专栏系列文章目录
Pyspark学习笔记(一)—序言及目录
Pyspark学习笔记(二)— spark-submit命令
Pyspark学习笔记(三)— SparkContext 与 SparkSession
Pyspark学习笔记(四)弹性分布式数据集 RDD(上)
Pyspark学习笔记(四)弹性分布式数据集 RDD(下)
Pyspark学习笔记(五)RDD操作(一)_RDD转换操作
Pyspark学习笔记(五)RDD操作(二)_RDD行动操作
文章目录
- Pyspark学习笔记专栏系列文章目录
- Pyspark学习笔记(五)RDD操作(二)_RDD行动操作
- 前言
- 主要参考链接:
- 一、PySpark RDD 行动操作简介
- 二.常见的转换操作表 & 使用例子
-
- 0.初始的示例rdd,
- 1.
count()
- 2.
collect()
- 3.
take()
- 4.
takeOrdered(num, key=None)
- 5.
takeSample(withReplacement, num, seed=None)
- 6.
top(num, key=None)
- 7.
first()
- 8.
reduce()
- 9.
foreach()
- 10.
countByValue()
- 11.
fold(zeroValue, func)
- 12.
aggregate(zeroValue, seqOp, combOp)
前言
提示:本篇博客讲的是RDD的操作中的行动操作,即 RDD Action
主要参考链接:
1.PySpark RDD Actions with examples
2.Apache spark python api
一、PySpark RDD 行动操作简介
PySpark RDD行动操作(Actions) 是将值返回给驱动程序的 PySpark 操作.
行动操作会触发之前的转换操作进行执行。
即只有当程序遇到行动操作的时候,前面的RDD谱系中的一系列的转换操作才会运算,并将由行动操作得到最后的结果。
二.常见的转换操作表 & 使用例子
0.初始的示例rdd,
我们这里仍然以上一篇博文中的rdd_test作为示例,这样能更好的与之前讲的内容联系起来
代码语言:javascript复制[ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]
1.count()
该操作不接受参数,返回一个long类型值,代表rdd的元素个数
pyspark.RDD.count
正好测试一下 rdd_test 经过 map 和 flatMap 之后的不同之处
代码语言:javascript复制# the example of count
rdd_map_test = rdd_test.map(lambda x: x)
print("count_test1n", rdd_map_test.count())
# out
1
代码语言:javascript复制# the example of count
rdd_flatmap_test = rdd_test.flatMap(lambda x: x)
print("count_test2n", rdd_flatmap_test.count())
# out
5
分析如下:
map并不去掉嵌套,所以相当于列表中的元素是一个 (5,4) 二维的tuple;
而flatMap会去掉一层嵌套,则相当于5个(4,)一维的tuple
2.collect(<num>)
返回一个由RDD中所有元素组成的列表(没有限制输出数量,所以要注意RDD的大小) ;该行动操作就不用举例了,上一篇博文的转换操作的作用其实都是最后通过collect
这个行动操作才显示出来的。
pyspark.RDD.collect
3.take(<num>)
返回RDD的前n个元素(无特定顺序)
(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)
pyspark.RDD.take
代码语言:javascript复制# the example of take
print("take_testn", rdd_flatmap_test.take(3))
代码语言:javascript复制[((10,1,2,3), (20,2,2,2), (20,1,2,3))]
4.takeOrdered(num, key=None)
从一个按照升序排列的RDD,或者按照key中提供的方法升序排列的RDD, 返回前n个元素
(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)
pyspark.RDD.takeOrdered
代码语言:javascript复制# the example of takeOrdered
print("takeOrdered_test_1n",flat_rdd_test.takeOrdered(3))
print("takeOrdered_test_1n",flat_rdd_test.takeOrdered(3, key=lambda x:x[3]))
# out
[(10,1,2,3), (10,1,2,4), (10,1,2,4)] # 默认以子tuple元素的大小排序
[(20,2,2,2), (10,1,2,3), (20,1,2,3)] # 这时候就是以 子tuple元素的第[3]个位置的数字为顺序
5.takeSample(withReplacement, num, seed=None)
返回此 RDD 的固定大小的采样子集
(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)
pyspark.RDD.takeSample
代码语言:javascript复制print("takeOrdered_test_1n",flat_rdd_test.takeSample(False, 1, 1))
[(10,1,2,4)]
print("takeOrdered_test_1n",flat_rdd_test.takeSample(False, 3, 1))
[(10,1,2,4), (20,1,2,3), (10,1,2,4)]
print("takeOrdered_test_1n",flat_rdd_test.takeSample(False, 10, 1))
[(10,1,2,4), (20,1,2,3), (10,1,2,4), (20,2,2,2), (10,1,2,3)]
6.top(num, key=None)
返回RDD的前n个元素(按照降序输出, 排序方式由元素类型决定)
(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)
pyspark.RDD.top
代码语言:javascript复制print("top_testn",flat_rdd_test.top(3))
[(20,2,2,2), (20,1,2,3), (10,1,2,4)]
7.first()
返回RDD的第一个元素,也是不考虑元素顺序
pyspark.RDD.first
代码语言:javascript复制print("first_testn",flat_rdd_test.first(3))
[(10,1,2,3)]
8.reduce(<func>)
使用指定的满足交换律/结合律的运算符来归约RDD中的所有元素;
处一般可以指定接收两个输入的 匿名函数<lambda x, y: …>;
pyspark.RDD.reduce
代码语言:javascript复制print("reduce_testn",flat_rdd_test.reduce(lambda x, y: x y))
[(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)]
解释一下过程:
代码语言:javascript复制step_0: [(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]
step_1: (10,1,2,3) => x; (10,1,2,4) => y
x y => (10,1,2,3) (10,1,2,4) => (10,1,2,3,10,1,2,4)
step_2: (10,1,2,3,10,1,2,4) => x; (10,1,2,4) => y
x y => (10,1,2,3,10,1,2,4) (10,1,2,4) => (10,1,2,3,10,1,2,4,10,1,2,4)
step_3: (10,1,2,3,10,1,2,4,10,1,2,4) => x; (20,2,2,2) => y
x y => (10,1,2,3,10,1,2,4,10,1,2,4) (20,2,2,2) => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2)
step_4: (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) => x; (20,1,2,3) => y
x y => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) (20,1,2,3) =>
(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)
9.foreach(<func>)
把具名或者匿名函数 ,应用到RDD的所有元素上.
和map类似,但是由于foreach是行动操作,所以可以执行一些输出类的函数,比如print操作
pyspark.RDD.foreach
10.countByValue()
将此 RDD 中每个唯一值的计数作为 (unique_value, count) 对的字典返回.
pyspark.RDD.countByValue
代码语言:javascript复制print("top_testn",flat_rdd_test.countByValue().items() )
[((10,1,2,3),1), ((20,1,2,3),1), ((20,2,2,2),1), ((10,1,2,4),2)]
11.fold(zeroValue, func)
使用给定的func和 初始值zeroV把RDD中的每个分区的元素聚合,然后把每个分区聚合结果再聚合;
聚合的过程其实和reduce类似,但是不满足交换律
这里有个细节要注意,fold是对每个分区(each partition)都会应用 zeroValue 进行聚合,而不是只使用一次
代码语言:javascript复制'''
① 在每个节点应用fold:初始值zeroValue 分区内RDD元素
② 获得各个partition的聚合值之后,对这些值再进行一次聚合,同样也应用zeroValue;
③ 则结果应为:zeroValue * (partition_num 1) RDD元素聚合值
'''
示例如下:
代码语言:javascript复制rdd_2 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 1)
rdd_3 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 4)
print('fold_test_2', rdd_2.fold('zeroV$_', lambda x,y: x y))
print('fold_test_3', rdd_3.fold('zeroV$_', lambda x,y: x y))
rdd2的分区是1,则初始值只会出现2次:
代码语言:javascript复制'ZeroV$_ZeroV$_A_a#B_b#C_c#D_d#'
rdd3的分区是4,则初始值会出现5次:
代码语言:javascript复制'ZeroV$_ZeroV$_A_a#zeroV$_B_b#zeroV$_C_c#zeroV$_D_d#'
再对flat_rdd进行一次实验:
代码语言:javascript复制print("fold_test", flat_rdd_test.repartition(1).fold(('Hello','World'), lambda x,y: x y))
('Hello','World','Hello','World',10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)
12.aggregate(zeroValue, seqOp, combOp)
使用给定的函数和初始值,对每个分区的聚合进行聚合
(这里同样是对每个分区,初始值的使用规则和fold是一样的,对每个分区都采用)
seqOp方法是先对每个分区操作,然后combOp对每个分区的聚合结果进行最终聚合
代码语言:javascript复制rdd_agg_test = spark.sparkContext.parallelize([1,2,3,10,20,30,7,8,9],3)
seqOp = (lambda x, y: (x[0] y, x[1] 1))
combOp = (lambda x, y: (x[0] y[0], x[1] y[1]))
result_rdd = rdd_agg_test.aggregate((100,1000),seqOp,combOp)
(490, 4009)
代码语言:javascript复制seqOp :
partition_1: 100 1 2 3, 1000 1 1 1 => 106, 1003
partition_2: 100 10 20 30, 1000 1 1 1 => 160, 1003
partition_3: 100 7 8 9, 1000 1 1 1 => 124, 1003
combOp :
100 106 160 124, 1000 1003 1003 1003 => (490, 4009)
至此,行动操作的常用方法都基本介绍了