Pyspark学习笔记(五)RDD操作(四)_RDD连接/集合操作
文章目录
- Pyspark学习笔记(五)RDD操作(四)_RDD连接/集合操作
- 1.join-连接
- 1.1. innerjoin-内连接
- 1.2. leftOuterJoin-左连接
- 1.3. rightOuterJoin-右连接
- 1.4. fullOuterJoin-全连接
- 1.5 cogroup
- 1.6 cartesian
- 2.Union-集合操作
- 2.1 union
- 2.2 intersection
- 2.3 subtract
- 2.4 subtractByKey
# 前言 本篇博客讲的是RDD的连接/集合操作
1.join-连接
对应于SQL中常见的JOIN操作 菜鸟教程网关于SQL连接总结性资料 Pyspark中的连接函数要求定义键,因为连接的过程是基于共同的字段(键)来组合两个RDD中的记录,因此需要操作键值对RDD
代码语言:javascript复制rdd_1 = sc.parallelize([('USA', (1,2,3)), ('CHINA', (4,5,6)), ('RUSSIA', (7,8,9))])
rdd_2 = sc.parallelize([('UK', (3,3,3)), ('FRANCE', (7,7,7)), ('USA', (9,9,9))])
1.1. innerjoin-内连接
join(other, numPartitions)
官方文档:pyspark.RDD.join
内连接通常就被简称为连接,或者说平时说的连接其实指的是内连接。
rdd_join_test = rdd_1.join(rdd_2)
print(rdd_join_test.collect())
#output
[('USA', ((1,2,3), (9,9,9)))]
1.2. leftOuterJoin-左连接
leftOuterJoin(other, numPartitions)
官方文档:pyspark.RDD.leftOuterJoin
以“左侧”的RDD的key为基准,join上“右侧”的RDD的value, 如果在右侧RDD中找不到对应的key, 则返回 none;
rdd_leftOuterJoin_test = rdd_1.leftOuterJoin(rdd_2)
print(rdd_leftOuterJoin_test.collect())
#output
[('USA', ((1,2,3), (9,9,9))), ('CHINA', ((4,5,6), None)), ('RUSSIA', ((7,8,9), None))]
1.3. rightOuterJoin-右连接
rightOuterJoin(other, numPartitions)
官方文档:pyspark.RDD.rightOuterJoin
以“右侧”的RDD的key为基准,join上“左侧”的RDD的value, 如果在左侧RDD中找不到对应的key, 则返回 none;
rdd_rightOuterJoin_test = rdd_1.rightOuterJoin(rdd_2)
print(rdd_rightOuterJoin_test.collect())
#output
[('USA', ((1,2,3), (9,9,9))), ('UK', (None, (3,3,3))), ('FRANCE', (None, (7,7,7))]
1.4. fullOuterJoin-全连接
fullOuterJoin(other, numPartitions)
官方文档:pyspark.RDD.fullOuterJoin
两个RDD中各自包含的key为基准,能找到共同的Key,则返回两个RDD的值,找不到就各自返回各自的值,并以none****填充缺失的值
rdd_fullOuterJoin_test = rdd_1.fullOuterJoin(rdd_2)
print(rdd_fullOuterJoin_test.collect())
#output
[('USA', ((1,2,3), (9,9,9))), ('CHINA', ((4,5,6), None)), ('RUSSIA', ((7,8,9), None)), ('UK', (None, (3,3,3))), ('FRANCE', (None, (7,7,7))]
1.5 cogroup
cogroup(other, numPartitions)
官方文档:pyspark.RDD.cogroup
实现过程和全连接其实差不多,就是数据的表现形式有点区别
生成的并不是一个新的键值对RDD,而是一个可迭代的对象
rdd_cogroup_test = rdd_1.cogroup(rdd_2)
print(rdd_cogroup_test.collect())
#会发现直接输出的话,输出的是可迭代对象的地址
[('USA', (<pyspark.resultiterable.ResultIterable at xxxxxxxxx>, <pyspark.resultiterable.ResultIterable at xxxxxxxxx>)), ..., ]
#因为该函数输出的格式就是: RDD[Tuple[K, Tuple[ResultIterable[V], ResultIterable[U]]]]
想要看明文的结果的话,可以如下
代码语言:javascript复制print((k, tuple(map(list, v))) for k, v in list(rdd_cogroup_test.collect()))
[('USA', ((1,2,3), (9,9,9))), ('CHINA', ([(4,5,6)], [])), ('RUSSIA', ([(7,8,9)], [])), ('UK', ([], [(3,3,3)])), ('FRANCE', ([], [(7,7,7)])]
1.6 cartesian
cartesian(other)
官方文档:pyspark.RDD.cartesian
这个就是笛卡尔积,也被称为交叉连接,它会根据两个RDD的所有条目来进行所有可能的组合。
要注意这个操作可能会产生大量的数据,一般还是不要轻易使用。
2.Union-集合操作
2.1 union
union(other)
官方文档:pyspark.RDD.union
转化操作union()把一个RDD追加到另一个RDD后面,两个RDD的结构并不一定要相同(即不一定列数要相同),并且union并不会过滤重复的条目。
2.2 intersection
intersection(other)
官方文档:pyspark.RDD.intersection
返回两个RDD中共有的元素,要注意,和 join 其实并不一样,join操作只是要求 key一样,而intersection 并不要求有key,是要求两边的条目必须是一模一样,即每个字段(列)上的数据都要求能保持一致,即【完全一样】的两行条目,才能返回。
2.3 subtract
subtract(other, numPartitions)
官方文档:pyspark.RDD.subtract
这个名字就说明是在做“减法”,即第一个RDD中的元素 减去 第二个RDD中的元素,返回第一个RDD中有,但第二个RDD中没有的元素。
2.4 subtractByKey
subtractByKey(other, numPartitions)
官方文档:pyspark.RDD.subtractByKey
该操作和上面的subtract类似,只不过这里是以Key作为参照了。