Pyspark学习笔记专栏系列文章目录
Pyspark学习笔记(一)—序言及目录
Pyspark学习笔记(二)— spark-submit命令
Pyspark学习笔记(三)— SparkContext 与 SparkSession
Pyspark学习笔记(四)弹性分布式数据集 RDD(上)
Pyspark学习笔记(四)弹性分布式数据集 RDD(下)
Pyspark学习笔记(五)RDD操作(一)_RDD转换操作
Pyspark学习笔记(五)RDD操作(二)_RDD行动操作
Pyspark学习笔记(五)RDD操作(三)_键值对RDD转换操作
文章目录
- Pyspark学习笔记专栏系列文章目录
- Pyspark学习笔记(五)RDD操作(三)_键值对RDD转换操作
- 主要参考链接:
- 一、PySpark RDD 行动操作简介
- 二.常见的转换操作表 & 使用例子
-
- 0.初始的示例rdd,
- 1.
keys()
- 2.
values()
- 3.
keyBy()
- 4.
mapValues()
- 5.
flatMapValues()
- 6.`sortByKey(ascending=True, numPartitions=None, keyfunc=
前言 提示:本篇博客讲的是键值对RDD的转换操作,即 PariRDD Transformations
主要参考链接:
1.Apache spark python api
2.Spark Pair-RDD Actions with examples
一、PySpark RDD 行动操作简介
键值对RDD,也就是PariRDD, 它的记录由键和值组成。
键(Key):可以是整型(INT)或者字符串(STRING)对象,也可以是元组这种复杂的对象。
值(Value):可以是标量,也可以是列表(List),元组(Tuple),字典(Dictionary)或者集合(Set)这些数据结构
首先要明确的是键值对RDD也是RDD,所以之前讲过的RDD的转换和行动操作,肯定也适用于键值对RDD;
但是键值对RDD由于其组织形式的特殊性,也有其自己专属的一些转换操作。
下面将介绍一些常用的键值对转换操作(注意是转换操作,所以是会返回新的RDD)
二.常见的转换操作表 & 使用例子
0.初始的示例rdd,
我们这里以第七次全国人口普查人口性别构成中的部分数据作为示例
代码语言:javascript复制[ ('Beijing', [51.14, 48.86, 104.65]), ('Shanghai',[51.77, 48.23, 107.33]), ('Guangdong', [53.07, 46.93, 113.08]), ('Jiangsu', [50.78, 49.22, 103.15])]
该RDD就是键值对RDD,每个元素是一个键值对,键(key)为省份名,值(Value)为一个list
1.keys()
该函数返回键值对RDD中,所有键(key)组成的RDD
pyspark.RDD.keys
代码语言:javascript复制# the example of keys
print("rdd_test_keysn", rdd_test.keys().collect())
# out
['Beijing', 'Shanghai', 'Guangdong', 'Jiangsu']
2.values()
该函数返回键值对RDD中,所有值(values)组成的RDD
pyspark.RDD.values
代码语言:javascript复制# the example of values
print("rdd_test_valuesn", rdd_test.values().collect())
# out
[ [51.14, 48.86, 104.65], [51.77, 48.23, 107.33], [53.07, 46.93, 113.08], [50.78, 49.22, 103.15] ]
3.keyBy(<func>)
该操作返回一个新的键值对RDD,
该RDD的键(key)是使用函数提取出的结果作为新的键,
该RDD的值(value)是原始pair-RDD的值作为值。
pyspark.RDD.keyBy
代码语言:javascript复制# the example of keyBy
print("rdd_test_keyByn", rdd_test.keyBy(lambda x: x[1][2]).collect())
[ (104.65, ('Beijing', [51.14, 48.86, 104.65])), (107.33, ('Shanghai',[51.77, 48.23, 107.33])), (113.08, ('Guangdong', [53.07, 46.93, 113.08])), (103.15, ('Jiangsu', [50.78, 49.22, 103.15])) ]
4.mapValues(<func>)
对原始键值对RDD的每个元素中的值(value),应用函数,作为新键值对RDD的值,而键(key)着保持原始的不变
pyspark.RDD.mapValues
代码语言:javascript复制# the example of mapValues
print("rdd_test_mapValuesn",rdd_test_mapvalues.mapValues(lambda x: int(sum(x))))
# out
[('Beijing', 204), ('Shanghai', 207), ('Guangdong', 213), ('Jiangsu', 203)]
5.flatMapValues(<func>)
对原始键值对RDD的每个元素中的值(value),应用函数,作为新键值对RDD的值,并且将数据“拍平”,而键(key)着保持原始的不变
所谓“拍平”和之前介绍的普通RDD的mapValues(<func>)
是一样的,就是去掉一层嵌套。
pyspark.RDD.flatMapValues
这里将mapValues(<func>)
和flatMapValues(<func>)
一起作用在一个数据上,以显示二者的区别。
mapvalue_rdd = rdd_test.mapValues(lambda x: [string(int(q)/100.0) '%' for q in x])
flatmapvalue_rdd = rdd_test.mapValues(lambda x: [string(int(q)/100.0) '%' for q in x])
print("rdd_test_mapvalues_2n",mapvalue_rdd.collect())
print("rdd_test_flatmapvaluen",flatmapvalue_rdd.collect())
#out
rdd_test_mapvalues_2
[('Beijing', ['0.51%', '0.48%', '1.04%']), ('Shanghai',['0.51%', '0.48%', '1.07%']), ('Guangdong', ['0.53%', '0.46%', '1.13%']), ('Jiangsu', ['0.5%', '0.49%', '1.03%'])]
rdd_test_flatmapvalue
[('Beijing', '0.51%'), ('Beijing', '0.48%'), ('Beijing', '1.04%'), ('Shanghai', '0.51%'), ('Shanghai', '0.48%'), ('Shanghai', '1.07%'), ('Guangdong', '0.53%'), ('Guangdong', '0.46%'), ('Guangdong', '1.13%'), ('Jiangsu', '0.5%'), ('Jiangsu', '0.49%'), ('Jiangsu','1.03%')]
6.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
返回一个新键值对RDD,该RDD根据键(key)将原始Pari-RDD进行排序,默认是升序,可以指定新RDD的分区数,以及使用匿名函数指定排序规则
(可能导致重新分区或数据混洗)
pyspark.RDD.sortByKey
代码语言:javascript复制print("sortByKey_testn",RDD.sortByKey(True, 1, keyfunc = lambda k : k.lower()).collect() )
#out
[ ('Beijing', [51.14, 48.86, 104.65]), ('Guangdong', [53.07, 46.93, 113.08]), ('Jiangsu', [50.78, 49.22, 103.15]), ('Shanghai',[51.77, 48.23, 107.33])]
7.groupByKey(numPartitions=None, partitionFunc=<hash_func>)
该操作将键值对RDD按照各个键(key)对值(value)进行分组,把同组的值整合成一个序列。
参数numPartitions指定创建多少个分区,分区使用partitionFunc提供的哈希函数创建;
通常情况下我们一般令numPartitions=None,也就是不填任何参数,会直接使用系统默认的分区数:spark.default.parallelism
pyspark.RDD.groupByKey
注意,如果你点击上面的链接查看官方文档,会发现它也提醒:
If you are grouping in order to perform an aggregation (such as a sum or average) over each key,
using reduceByKey or aggregateByKey will provide much better performance.
就是说如果对数据分组并不只是为了分组,还顺带要做聚合操作(比如sum或者average),那么更推荐使用reduceByKey或者aggregateByKey,
会有更好的性能表现。
代码语言:javascript复制print("rdd_test_groupByKeyn",flatmapvalue_rdd.groupByKey().collect())
#会发现返回的是一个resultiterable对象,这个现象在我们之前讨论普通RDD的`groupBy(<func>)`的时候也出现过
#再使用一个mapValues操作即可显示出具体的数据
print("rdd_test_groupByKey_2n",flatmapvalue_rdd.groupByKey().mapValues(list).collect())
[('Beijing', ['0.51%', '0.48%', '1.04%']), ('Shanghai',['0.51%', '0.48%', '1.07%']), ('Guangdong', ['0.53%', '0.46%', '1.13%']), ('Jiangsu', ['0.5%', '0.49%', '1.03%'])]
8.reduceByKey(<func>, numPartitions=None, partitionFunc=<hash_func>)
与普通RDD的reduce
操作类似,但是普通RDD的reduce
是行动操作,键值对RDD的reduceByKey
是转换操作!
使用指定的满足交换律/结合律的函数来合并键对应的值(value),而对键(key)不执行操作,numPartitions=None和partitionFunc的用法和groupByKey()
时一致;
numPartitions的值是要执行归约任务数量,同时还会影响其他行动操作所产生文件的数量;
而处一般可以指定接收两个输入的 匿名函数<lambda x, y: …>。
pyspark.RDD.reduceByKey
使用一个新的原始数据rdd_test_2来做示范
代码语言:javascript复制rdd_test_2 = spark.sparkContext.parallelize([ ('A', [1, 2, 3]), ('B',[4, 5, 6]), ('A', [10, 20, 30]), ('B',[40, 50, 60]) ], 1)
代码语言:javascript复制#注意,因为 reduceByKey 是转换操作,所以 想要看结果需要使用行动操作 collect 进行输出
#而普通的 reduce 自己就是行动操作
print("rdd_test_reduceByKeyn",rdd_test_2.reduceByKey(lambda x, y: x y).collect())
#out
[ ('A', [1, 2, 3, 10, 20, 30]), ('B',[4, 5, 6, 40, 50, 60]) ]
可以看出,reduceByKey 其实就相当于 针对每个 key 来进行 reduce 操作
9.foldByKey(zeroValue, <func>, numPartitions=None, partitionFunc=<hash_func>)
该操作与之前讲过的普通RDD的fold
操作类似,但是普通RDD的fold
是行动操作,而foldByKey
是转换操作!
pyspark.RDD.foldByKey
代码语言:javascript复制print("rdd_test_foldByKeyn",rdd_test_2.foldByKey([100,], lambda x, y: x y).collect())
[ ('A', [100, 1, 2, 3, 10, 20, 30]), ('B',[100, 4, 5, 6, 40, 50, 60]) ]
rdd_test_3 = spark.sparkContext.parallelize([ ('A', [1, 2, 3]), ('B',[4, 5, 6]), ('A', [10, 20, 30]), ('B',[40, 50, 60]) ], 2)
print("rdd_test_foldByKeyn",rdd_test_3.foldByKey([100,], lambda x, y: x y).collect())
[ ('A', [100, 10, 20, 30, 100, 1, 2, 3]), ('B',[100, 40, 50, 60, 100, 4, 5, 6]) ]
此处也是用了不同分区的同样的数据来做测试,在我们讲普通RDD的 fold 操作时说过,zeroValue出现的数目应该是 (partition_num 1) ,参考Pyspark学习笔记(五)RDD操作(二)_RDD行动操作 中的11.fold
但是对于 foldByKey 而言,观察发现其 zeroValue出现的数目 就是 partition_num,
相当于只是在每个partition上多一个zeroValue,最后做不同partition聚合的时候没有用到zeroValue,这一点一定要注意区分
10.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<hash_func>)
该操作与之前讲过的普通RDD的aggregate
操作类似,但是普通RDD的aggregate
是行动操作,而aggregateByKey
是转换操作!
pyspark.RDD.aggregateByKey
该操作也与之前讲的普通RDD的 aggregate 操作类似,只不过是针对每个不同的Key做aggregate;再此就不再举例了。
至此,Pair RDD 转换操作的常用方法都基本介绍了