Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

2022-04-14 07:57:36 浏览数 (1)

Pyspark学习笔记专栏系列文章目录

Pyspark学习笔记(一)—序言及目录

Pyspark学习笔记(二)— spark-submit命令

Pyspark学习笔记(三)— SparkContext 与 SparkSession

Pyspark学习笔记(四)弹性分布式数据集 RDD(上)

Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

Pyspark学习笔记(五)RDD操作(二)_RDD行动操作

文章目录

  • Pyspark学习笔记专栏系列文章目录
  • Pyspark学习笔记(五)RDD操作(二)_RDD行动操作
  • 前言
  • 主要参考链接:
  • 一、PySpark RDD 行动操作简介
  • 二.常见的转换操作表 & 使用例子
    • 0.初始的示例rdd,
    • 1.count()
    • 2.collect()
    • 3.take()
    • 4.takeOrdered(num, key=None)
    • 5.takeSample(withReplacement, num, seed=None)
    • 6.top(num, key=None)
    • 7.first()
    • 8.reduce()
    • 9.foreach()
    • 10.countByValue()
    • 11.fold(zeroValue, func)
    • 12.aggregate(zeroValue, seqOp, combOp)

前言

提示:本篇博客讲的是RDD的操作中的行动操作,即 RDD Action

主要参考链接:

1.PySpark RDD Actions with examples

2.Apache spark python api

一、PySpark RDD 行动操作简介

    PySpark RDD行动操作(Actions) 是将值返回给驱动程序的 PySpark 操作.

行动操作会触发之前的转换操作进行执行。

即只有当程序遇到行动操作的时候,前面的RDD谱系中的一系列的转换操作才会运算,并将由行动操作得到最后的结果。

二.常见的转换操作表 & 使用例子

0.初始的示例rdd,

我们这里仍然以上一篇博文中的rdd_test作为示例,这样能更好的与之前讲的内容联系起来

代码语言:javascript复制
[ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]

1.count()

该操作不接受参数,返回一个long类型值,代表rdd的元素个数

pyspark.RDD.count

正好测试一下 rdd_test 经过 map 和 flatMap 之后的不同之处

代码语言:javascript复制
# the example of count
rdd_map_test = rdd_test.map(lambda x: x)
print("count_test1n", rdd_map_test.count())
# out
1
代码语言:javascript复制
# the example of count
rdd_flatmap_test = rdd_test.flatMap(lambda x: x)
print("count_test2n", rdd_flatmap_test.count())
# out
5

分析如下:

map并不去掉嵌套,所以相当于列表中的元素是一个 (5,4) 二维的tuple;

而flatMap会去掉一层嵌套,则相当于5个(4,)一维的tuple

2.collect(<num>)

返回一个由RDD中所有元素组成的列表(没有限制输出数量,所以要注意RDD的大小) ;该行动操作就不用举例了,上一篇博文的转换操作的作用其实都是最后通过collect这个行动操作才显示出来的。

pyspark.RDD.collect

3.take(<num>)

返回RDD的前n个元素(无特定顺序)

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.take

代码语言:javascript复制
# the example of take
print("take_testn", rdd_flatmap_test.take(3))
代码语言:javascript复制
[((10,1,2,3), (20,2,2,2), (20,1,2,3))]

4.takeOrdered(num, key=None)

从一个按照升序排列的RDD,或者按照key中提供的方法升序排列的RDD, 返回前n个元素

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.takeOrdered

代码语言:javascript复制
# the example of takeOrdered
print("takeOrdered_test_1n",flat_rdd_test.takeOrdered(3))
print("takeOrdered_test_1n",flat_rdd_test.takeOrdered(3, key=lambda x:x[3]))
# out
[(10,1,2,3), (10,1,2,4), (10,1,2,4)] # 默认以子tuple元素的大小排序
[(20,2,2,2), (10,1,2,3), (20,1,2,3)] # 这时候就是以 子tuple元素的第[3]个位置的数字为顺序

5.takeSample(withReplacement, num, seed=None)

返回此 RDD 的固定大小的采样子集

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.takeSample

代码语言:javascript复制
print("takeOrdered_test_1n",flat_rdd_test.takeSample(False, 1, 1))
[(10,1,2,4)]

print("takeOrdered_test_1n",flat_rdd_test.takeSample(False, 3, 1))
[(10,1,2,4), (20,1,2,3), (10,1,2,4)]

print("takeOrdered_test_1n",flat_rdd_test.takeSample(False, 10, 1))
[(10,1,2,4), (20,1,2,3), (10,1,2,4), (20,2,2,2), (10,1,2,3)]

6.top(num, key=None)

返回RDD的前n个元素(按照降序输出, 排序方式由元素类型决定)

(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中)

pyspark.RDD.top

代码语言:javascript复制
print("top_testn",flat_rdd_test.top(3))
[(20,2,2,2), (20,1,2,3), (10,1,2,4)]

7.first()

返回RDD的第一个元素,也是不考虑元素顺序

pyspark.RDD.first

代码语言:javascript复制
print("first_testn",flat_rdd_test.first(3))
[(10,1,2,3)]

8.reduce(<func>)

使用指定的满足交换律/结合律的运算符来归约RDD中的所有元素;

处一般可以指定接收两个输入的 匿名函数<lambda x, y: …>;

pyspark.RDD.reduce

代码语言:javascript复制
print("reduce_testn",flat_rdd_test.reduce(lambda x, y: x y))
[(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)]

解释一下过程:

代码语言:javascript复制
step_0: [(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]
step_1: (10,1,2,3) => x;   (10,1,2,4) => y
x   y => (10,1,2,3)   (10,1,2,4) => (10,1,2,3,10,1,2,4)

step_2: (10,1,2,3,10,1,2,4) => x;   (10,1,2,4) => y
x   y => (10,1,2,3,10,1,2,4)   (10,1,2,4) => (10,1,2,3,10,1,2,4,10,1,2,4)

step_3: (10,1,2,3,10,1,2,4,10,1,2,4) => x;  (20,2,2,2) => y
x   y => (10,1,2,3,10,1,2,4,10,1,2,4)   (20,2,2,2) => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2)

step_4: (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2) => x;  (20,1,2,3) => y
x   y => (10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2)   (20,1,2,3) =>
(10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)

9.foreach(<func>)

把具名或者匿名函数 ,应用到RDD的所有元素上.

和map类似,但是由于foreach是行动操作,所以可以执行一些输出类的函数,比如print操作

pyspark.RDD.foreach

10.countByValue()

将此 RDD 中每个唯一值的计数作为 (unique_value, count) 对的字典返回.

pyspark.RDD.countByValue

代码语言:javascript复制
print("top_testn",flat_rdd_test.countByValue().items() )
[((10,1,2,3),1), ((20,1,2,3),1), ((20,2,2,2),1), ((10,1,2,4),2)]

11.fold(zeroValue, func)

使用给定的func和 初始值zeroV把RDD中的每个分区的元素聚合,然后把每个分区聚合结果再聚合;

聚合的过程其实和reduce类似,但是不满足交换律

这里有个细节要注意,fold是对每个分区(each partition)都会应用 zeroValue 进行聚合,而不是只使用一次

代码语言:javascript复制
'''
① 在每个节点应用fold:初始值zeroValue   分区内RDD元素
② 获得各个partition的聚合值之后,对这些值再进行一次聚合,同样也应用zeroValue;
③ 则结果应为:zeroValue * (partition_num   1)   RDD元素聚合值
'''

示例如下:

代码语言:javascript复制
rdd_2 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 1)
rdd_3 = spark.sparkContext.parallelize(['A_a#', 'B_b#', 'C_c#', 'D_d#'], 4)
print('fold_test_2', rdd_2.fold('zeroV$_', lambda x,y: x y))
print('fold_test_3', rdd_3.fold('zeroV$_', lambda x,y: x y))

rdd2的分区是1,则初始值只会出现2次:

代码语言:javascript复制
'ZeroV$_ZeroV$_A_a#B_b#C_c#D_d#'

rdd3的分区是4,则初始值会出现5次:

代码语言:javascript复制
'ZeroV$_ZeroV$_A_a#zeroV$_B_b#zeroV$_C_c#zeroV$_D_d#'

再对flat_rdd进行一次实验:

代码语言:javascript复制
print("fold_test", flat_rdd_test.repartition(1).fold(('Hello','World'), lambda x,y: x y))

('Hello','World','Hello','World',10,1,2,3,10,1,2,4,10,1,2,4,20,2,2,2,20,1,2,3)

12.aggregate(zeroValue, seqOp, combOp)

使用给定的函数和初始值,对每个分区的聚合进行聚合

(这里同样是对每个分区,初始值的使用规则和fold是一样的,对每个分区都采用)

seqOp方法是先对每个分区操作,然后combOp对每个分区的聚合结果进行最终聚合

代码语言:javascript复制
rdd_agg_test = spark.sparkContext.parallelize([1,2,3,10,20,30,7,8,9],3)
seqOp = (lambda x, y: (x[0]   y, x[1]   1))
combOp = (lambda x, y: (x[0]   y[0], x[1]   y[1]))

result_rdd = rdd_agg_test.aggregate((100,1000),seqOp,combOp)

(490, 4009)
代码语言:javascript复制
seqOp :
partition_1: 100   1   2   3, 1000   1   1   1  => 106, 1003
partition_2: 100   10   20   30, 1000   1   1   1  => 160, 1003
partition_3: 100   7   8   9, 1000   1   1  1 => 124, 1003
combOp :
100 106 160 124, 1000 1003 1003 1003 => (490, 4009)

至此,行动操作的常用方法都基本介绍了

0 人点赞