Pyspark学习笔记专栏系列文章目录
Pyspark学习笔记(一)—序言及目录
Pyspark学习笔记(二)— spark-submit命令
Pyspark学习笔记(三)— SparkContext 与 SparkSession
Pyspark学习笔记(四)弹性分布式数据集 RDD(上)
Pyspark学习笔记(四)弹性分布式数据集 RDD(下)
Pyspark学习笔记(五)RDD操作(一)_RDD转换操作
文章目录
- Pyspark学习笔记专栏系列文章目录
- Pyspark学习笔记(五)RDD操作(一)_RDD转换操作
- 前言
- 主要参考链接:
- 一、PySpark RDD 转换操作简介
-
- 1.窄操作
- 2.宽操作
- 二.常见的转换操作表 & 使用例子
-
- 0.创建一个示例rdd, 后续的例子基本以此例展开
- 1.
map()
- 2.
flatMap()
- 3.
filter()
- 4.
union()
- 5.
distinct(numPartitions=None)
- 6.
groupBy()
- 7.
sortBy(,ascending=True, numPartitions=None)
- 8.
repartition( )
- 9.
coalesce( )
- 10.
cache( )
- 11.`persist( )
前言
提示:本篇博客讲的是RDD的操作中的转换操作,即 RDD Transformations
主要参考链接:
1.PySpark RDD Transformations with examples
2.Apache spark python api
一、PySpark RDD 转换操作简介
PySpark RDD 转换操作(Transformation) 是惰性求值,用于将一个 RDD 转换/更新为另一个。
由于RDD本质上是不可变的,转换操作总是创建一个或多个新的RDD而不更新现有的RDD,因此,一系列RDD转换创建了一个RDD谱系。
1.窄操作
这些计算数据存在于单个分区上,这意味着分区之间不会有任何数据移动。
常见的执行窄操作的一般有:map()
,mapPartition()
,flatMap()
,filter()
,union()
2.宽操作
这些计算数据存在于许多分区上,这意味着分区之间将有数据移动以执行更广泛的转换。由于这些对数据进行混洗,因此它们也称为混洗转换,所以与窄操作相比,是更加昂贵的操作。
常见的执行宽操作的一些方法是:groupBy()
, groupByKey()
, join()
, repartition()
等
二.常见的转换操作表 & 使用例子
0.创建一个示例rdd, 后续的例子基本以此例展开
代码语言:javascript复制data_list = [ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]
# 注意该列表中包含有两层tuple嵌套,相当于列表中的元素是一个 (5,4) 二维的tuple
rdd_test = spark.sparkContext.parallelize(data_list)
print("rdd_test:n", rdd_test.collect())
则输出为:
代码语言:javascript复制[ ((10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)) ]
1.map(<func>)
是所有转换操作中最基本的。
它应用一个具名函数或者匿名函数,对数据集内的所有元素执行同一操作。
pyspark.RDD.map
代码语言:javascript复制# the example of map
rdd_map_test = rdd_test.map(lambda x: (x[0], x[3]))
print("rdd_map_testn", rdd_map_test.collect())
相当于只从第一层 tuple 中取出了第0和第3个 子tuple, 输出为:
代码语言:javascript复制[((10,1,2,3), (20,2,2,2))]
2.flatMap(<func>)
与map的操作类似,但会进一步拍平数据,表示会去掉一层嵌套.
pyspark.RDD.flatmap
代码语言:javascript复制# the example of flatMap
flat_rdd_test = rdd_test.flatMap(lambda x: x)
print("flat_rdd_testn", flat_rdd_test)
会发现比原始数据少了一层tuple的嵌套,输出为:
代码语言:javascript复制[(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]
3.filter(<func>)
一般是依据括号中的一个布尔型表达式,来筛选出满足为真的元素
pyspark.RDD.filter
代码语言:javascript复制# the example of filter
key1_rdd = flat_rdd_test.filter(lambda x: x[0]==10)
key2_rdd = flat_rdd_test.filter(lambda x: x[0]==20)
print("filter_1n",key1_rdd.collect())
print("filter_2n",key2_rdd.collect())
输出为:
代码语言:javascript复制[(10,1,2,3), (10,1,2,4), (10,1,2,4)]
[(20,2,2,2), (20,1,2,3)]
4.union(<rdds>)
类似于sql中的union函数,就是将两个RDD执行合并操作;
pyspark.RDD.union
但是pyspark中的union操作似乎不会自动去重,如果需要去重就使用后面讲的distinct
代码语言:javascript复制# the example of union
flat_rdd_test_new = key1_rdd.union(key2_rdd)
print("flat_rdd_test_newn",flat_rdd_test_new.collect())
刚刚被拆开的两部分又合起来了, 输出为
代码语言:javascript复制[(10,1,2,3), (10,1,2,4), (10,1,2,4), (20,2,2,2), (20,1,2,3)]
5.distinct(numPartitions=None)
去除RDD中的重复值;带有参数numPartitions,默认值为None,可以对去重后的数据重新分区;
pyspark.RDD.distinct
代码语言:javascript复制# the example of distinct
distinct_key1_rdd = key1_rdd.distinct()
print("distinctn",distinct.collect())
原来的 Key1_rdd 后两个元素是重复出现的,使用distinct之后就会消掉一个:
代码语言:javascript复制[(10,1,2,3), (10,1,2,4)]
6.groupBy(<func>)
对元素进行分组,可以是具名函数,也可以是匿名,用来确定对所有元素进行分组的键,或者指定用于对元素进行求值以确定其分组方式的表达式.
pyspark.RDD.groupBy
代码语言:javascript复制# the example of groupBy
# 我们可以先定义一个具名函数
def return_group_key(x):
seq = x[1:]
if sum(seq) > 6:
return "big"
else
return "small"
# 下面这两种写法结果都是一样的
groupby_rdd_1 = flat_rdd_test.groupBy(lambda x: return_group_key(x))
groupby_rdd_1 = flat_rdd_test.groupBy(lambda x: "big" if sum(x[1:])>6 else "small")
print("groupby_1n", groupby_rdd_1.collect())
直接输出的话,可能输出的是一个寄存器地址:
代码语言:javascript复制[('small', <pyspark.resultiterable.ResultIterable object at 0x7f004b4ef850>), ('big', <pyspark.resultiterable.ResultIterable object at 0x7f004ac053d0>)]
这时候我们只需要加一个 mapValues
操作即可,即将后面寄存器地址上的值用列表显示出来
print("groupby_1_明文n", groupby_rdd_1.mapValues(list).collect())
明文输出为:
代码语言:javascript复制[('small', [(10,1,2,3), (20,2,2,2), (20,1,2,3)]), ('big', [(10,1,2,4), (10,1,2,4)])]
下面再感受一下,这个groupBy() 中的是确定分组的【键】,这个意思是什么
代码语言:javascript复制groupby_rdd_2 = flat_rdd_test.groupBy(lambda x: x[0]==10)
print("groupby_2_明文n", groupby_rdd_2.mapValues(list).collect())
这时候就是以匿名函数返回的布尔值作为分组的 key【键】了
代码语言:javascript复制[('True', [(10,1,2,3), [(10,1,2,4), (10,1,2,4)), ('False', (20,2,2,2), (20,1,2,3)]])]
代码语言:javascript复制groupby_rdd_3 = flat_rdd_test.groupBy(lambda x: x[0])
print("groupby_3_明文n", groupby_rdd_3.mapValues(list).collect())
这时候就是以匿名函数返回的 x0的具体值 作为分组的 key【键】了
代码语言:javascript复制[(10, [(10,1,2,3), [(10,1,2,4), (10,1,2,4)), (20, (20,2,2,2), (20,1,2,3)]])]
最后再回味一下 这个 最关键的是要产生一个key,作为分组的条件,(要么就重新产生,要么就拿现有的值)
7.sortBy(<keyfunc>,ascending=True, numPartitions=None)
将RDD按照参数选出的指定数据集的键进行排序
pyspark.RDD.sortBy
代码语言:javascript复制# the example of sortBy
sort_by_ascending_rdd = flat_rdd_test.sortBy(keyfunc=lambda x:x[3])
sort_by_descending_rdd = flat_rdd_test.sortBy(keyfunc=lambda x:x[3], ascending=False)
输出为:
代码语言:javascript复制[(20,2,2,2), (10,1,2,3), (20,1,2,3), (10,1,2,4), (10,1,2,4)]
[(10,1,2,4), (10,1,2,4), (10,1,2,3), (20,1,2,3), (20,2,2,2)]
8.repartition( )
重新分区,之前的博客的【并行化】 一节已经描述过
9.coalesce( )
重新分区,之前的博客的【并行化】一节已经描述过:
10.cache( )
缓存,之前博文RDD【持久化】一节已经描述过;
11.persist( )
持久化,之前博文RDD【持久化】一节已经描述过
至此,Pyspark基本的转换操作【Transformation】就介绍完了。