Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

2022-04-14 07:56:37 浏览数 (1)

Pyspark学习笔记专栏系列文章目录

Pyspark学习笔记(一)—序言及目录

Pyspark学习笔记(二)— spark-submit命令

Pyspark学习笔记(三)— SparkContext 与 SparkSession

Pyspark学习笔记(四)弹性分布式数据集 RDD(上)

Pyspark学习笔记(四)弹性分布式数据集 RDD(下)

Pyspark学习笔记(五)RDD操作(一)_RDD转换操作

文章目录

  • Pyspark学习笔记专栏系列文章目录
  • Pyspark学习笔记(五)RDD操作(一)_RDD转换操作
  • 前言
  • 主要参考链接:
  • 一、PySpark RDD 转换操作简介
    • 1.窄操作
    • 2.宽操作
  • 二.常见的转换操作表 & 使用例子
    • 0.创建一个示例rdd, 后续的例子基本以此例展开
    • 1.map()
    • 2.flatMap()
    • 3.filter()
    • 4.union()
    • 5.distinct(numPartitions=None)
    • 6.groupBy()
    • 7.sortBy(,ascending=True, numPartitions=None)
    • 8.repartition( )
    • 9.coalesce( )
    • 10.cache( )
    • 11.`persist( )

前言

提示:本篇博客讲的是RDD的操作中的转换操作,即 RDD Transformations

主要参考链接:

1.PySpark RDD Transformations with examples

2.Apache spark python api

一、PySpark RDD 转换操作简介

    PySpark RDD 转换操作(Transformation) 是惰性求值,用于将一个 RDD 转换/更新为另一个。

由于RDD本质上是不可变的,转换操作总是创建一个或多个新的RDD而不更新现有的RDD,因此,一系列RDD转换创建了一个RDD谱系。

1.窄操作

    这些计算数据存在于单个分区上,这意味着分区之间不会有任何数据移动。

常见的执行窄操作的一般有:map()mapPartition()flatMap()filter()union()

2.宽操作

    这些计算数据存在于许多分区上,这意味着分区之间将有数据移动以执行更广泛的转换。由于这些对数据进行混洗,因此它们也称为混洗转换,所以与窄操作相比,是更加昂贵的操作。

常见的执行宽操作的一些方法是:groupBy(), groupByKey(), join(), repartition()

二.常见的转换操作表 & 使用例子

0.创建一个示例rdd, 后续的例子基本以此例展开

代码语言:javascript复制
data_list = [ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]
# 注意该列表中包含有两层tuple嵌套,相当于列表中的元素是一个 (5,4) 二维的tuple

rdd_test = spark.sparkContext.parallelize(data_list)
print("rdd_test:n", rdd_test.collect())

则输出为:

代码语言:javascript复制
[ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]

1.map(<func>)

是所有转换操作中最基本的。

它应用一个具名函数或者匿名函数,对数据集内的所有元素执行同一操作

pyspark.RDD.map

代码语言:javascript复制
# the example of map
rdd_map_test = rdd_test.map(lambda x: (x[0], x[3]))
print("rdd_map_testn", rdd_map_test.collect())

相当于只从第一层 tuple 中取出了第0和第3个 子tuple, 输出为:

代码语言:javascript复制
[((10,1,2,3), (20,2,2,2))]

2.flatMap(<func>)

与map的操作类似,但会进一步拍平数据,表示会去掉一层嵌套.

pyspark.RDD.flatmap

代码语言:javascript复制
# the example of flatMap
flat_rdd_test = rdd_test.flatMap(lambda x: x)
print("flat_rdd_testn", flat_rdd_test)

会发现比原始数据少了一层tuple的嵌套,输出为:

代码语言:javascript复制
[(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]

3.filter(<func>)

一般是依据括号中的一个布尔型表达式,来筛选出满足为真的元素

pyspark.RDD.filter

代码语言:javascript复制
# the example of filter
key1_rdd = flat_rdd_test.filter(lambda x: x[0]==10)
key2_rdd = flat_rdd_test.filter(lambda x: x[0]==20)
print("filter_1n",key1_rdd.collect())
print("filter_2n",key2_rdd.collect())

输出为:

代码语言:javascript复制
[(10,1,2,3), (10,1,2,4), (10,1,2,4)]
[(20,2,2,2), (20,1,2,3)]

4.union(<rdds>)

类似于sql中的union函数,就是将两个RDD执行合并操作;

pyspark.RDD.union

但是pyspark中的union操作似乎不会自动去重,如果需要去重就使用后面讲的distinct

代码语言:javascript复制
# the example of union
flat_rdd_test_new = key1_rdd.union(key2_rdd)
print("flat_rdd_test_newn",flat_rdd_test_new.collect())

刚刚被拆开的两部分又合起来了, 输出为

代码语言:javascript复制
[(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]

5.distinct(numPartitions=None)

去除RDD中的重复值;带有参数numPartitions,默认值为None,可以对去重后的数据重新分区;

pyspark.RDD.distinct

代码语言:javascript复制
# the example of distinct
distinct_key1_rdd = key1_rdd.distinct()
print("distinctn",distinct.collect())

原来的 Key1_rdd 后两个元素是重复出现的,使用distinct之后就会消掉一个:

代码语言:javascript复制
[(10,1,2,3), (10,1,2,4)]

6.groupBy(<func>)

对元素进行分组,可以是具名函数,也可以是匿名,用来确定对所有元素进行分组的,或者指定用于对元素进行求值以确定其分组方式的表达式.

pyspark.RDD.groupBy

代码语言:javascript复制
# the example of groupBy
# 我们可以先定义一个具名函数
def return_group_key(x):
    seq = x[1:]
    if sum(seq) > 6:
        return "big"
    else
        return "small"

# 下面这两种写法结果都是一样的
groupby_rdd_1 = flat_rdd_test.groupBy(lambda x: return_group_key(x))
groupby_rdd_1 = flat_rdd_test.groupBy(lambda x: "big" if sum(x[1:])>6 else "small")
print("groupby_1n", groupby_rdd_1.collect())

直接输出的话,可能输出的是一个寄存器地址:

代码语言:javascript复制
[('small', <pyspark.resultiterable.ResultIterable object at 0x7f004b4ef850>), ('big', <pyspark.resultiterable.ResultIterable object at 0x7f004ac053d0>)]

这时候我们只需要加一个 mapValues 操作即可,即将后面寄存器地址上的值用列表显示出来

代码语言:javascript复制
print("groupby_1_明文n", groupby_rdd_1.mapValues(list).collect())

明文输出为:

代码语言:javascript复制
[('small', [(10,1,2,3), (20,2,2,2), (20,1,2,3)]), ('big', [(10,1,2,4), (10,1,2,4)])]

下面再感受一下,这个groupBy() 中的是确定分组的【键】,这个意思是什么

代码语言:javascript复制
groupby_rdd_2 = flat_rdd_test.groupBy(lambda x: x[0]==10)
print("groupby_2_明文n", groupby_rdd_2.mapValues(list).collect())

这时候就是以匿名函数返回的布尔值作为分组的 key【键】了

代码语言:javascript复制
[('True', [(10,1,2,3), [(10,1,2,4), (10,1,2,4)), ('False', (20,2,2,2), (20,1,2,3)]])]
代码语言:javascript复制
groupby_rdd_3 = flat_rdd_test.groupBy(lambda x: x[0])
print("groupby_3_明文n", groupby_rdd_3.mapValues(list).collect())

这时候就是以匿名函数返回的 x0的具体值 作为分组的 key【键】了

代码语言:javascript复制
[(10, [(10,1,2,3), [(10,1,2,4), (10,1,2,4)), (20, (20,2,2,2), (20,1,2,3)]])]

最后再回味一下 这个 最关键的是要产生一个key,作为分组的条件,(要么就重新产生,要么就拿现有的值)

7.sortBy(<keyfunc>,ascending=True, numPartitions=None)

将RDD按照参数选出的指定数据集的键进行排序

pyspark.RDD.sortBy

代码语言:javascript复制
# the example of sortBy
sort_by_ascending_rdd = flat_rdd_test.sortBy(keyfunc=lambda x:x[3])
sort_by_descending_rdd = flat_rdd_test.sortBy(keyfunc=lambda x:x[3], ascending=False)

输出为:

代码语言:javascript复制
[(20,2,2,2), (10,1,2,3), (20,1,2,3), (10,1,2,4), (10,1,2,4)]
[(10,1,2,4), (10,1,2,4), (10,1,2,3), (20,1,2,3), (20,2,2,2)]

8.repartition( )

重新分区,之前的博客的【并行化】 一节已经描述过

9.coalesce( )

重新分区,之前的博客的【并行化】一节已经描述过:

10.cache( )

缓存,之前博文RDD【持久化】一节已经描述过;

11.persist( )

持久化,之前博文RDD【持久化】一节已经描述过

至此,Pyspark基本的转换操作【Transformation】就介绍完了。

0 人点赞