提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、PySpark RDD 转换操作
- 1.窄操作
- 2.宽操作
- 3.常见的转换操作表
- 二、pyspark 行动操作
- 三、键值对RDD的操作
前言
提示:本篇博客讲的是RDD的各种操作,包括转换操作、行动操作、键值对操作
一、PySpark RDD 转换操作
PySpark RDD 转换操作(Transformation) 是惰性求值,用于将一个 RDD 转换/更新为另一个。由于RDD本质上是不可变的,转换操作总是创建一个或多个新的RDD而不更新现有的RDD,因此,一系列RDD转换创建了一个RDD谱系(依赖图)。
1.窄操作
这些计算数据存在于单个分区上,这意味着分区之间不会有任何数据移动。
常见的执行窄操作的一般有:map()
,mapPartition()
,flatMap()
,filter()
,union()
2.宽操作
这些计算数据存在于许多分区上,这意味着分区之间将有数据移动以执行更广泛的转换。由于这些对数据进行混洗,因此它们也称为混洗转换,所以与窄操作相比,是更加昂贵的操作。
常见的执行宽操作的一些方法是:groupBy()
, groupByKey()
, join()
, repartition()
等
3.常见的转换操作表
转换操作 | 描述 |
---|---|
map(<func>) | 是所有转换操作中最基本的。它应用一个具名函数或者匿名函数,对数据集内的所有元素执行同一操作。https://sparkbyexamples.com/pyspark/pyspark-map-transformation/ |
flatMap(<func>) | 与map的操作类似,但会进一步拍平数据,表示会去掉一层嵌套.https://sparkbyexamples.com/pyspark/pyspark-flatmap-transformation/ |
mapPartition(<func>) | 类似于map,但在每个分区上执行转换函数,mapPartitions() 的输出返回与输入 RDD 相同的行数,这比map函数提供更好的性能; |
filter(<func>) | 一般是依据括号中的一个布尔型表达式,来筛选出满足为真的元素 |
union( ) | 类似于sql中的union函数,就是将两个RDD执行合并操作;但是pyspark中的union操作似乎不会自动去重,如果需要去重就使用下面的distinct |
distinct( ) | 去除RDD中的重复值;带有参数numPartitions,默认值为None,可以对去重后的数据重新分区 |
groupBy(<func>) | 对元素进行分组。可以是具名函数,也可以是匿名,用来确定对所有元素进行分组的键,或者指定用于对元素进行求值以确定其分组方式的表达式.https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/ |
sortBy(<keyfunc>,ascending=True) | 将RDD按照参数选出的指定数据集的键进行排序.使用groupBy 和 sortBy的示例:#求余数,并按余数,对原数据进行聚合分组#然后按照升序对各个组内的数据,进行排序 rdd = sc.parallelize([1, 1, 2, 3, 5, 8])result = rdd.groupBy(lambda x: x % 2).collect()sorted([(x, sorted(y)) for (x, y) in result])[(0, [2, 8]), (1, [1, 1, 3, 5])] |
repartition( ) | 重新分区,之前的博客的【并行化】 一节已经描述过 |
coalesce( ) | 重新分区,之前的博客的【并行化】一节已经描述过: |
cache( ) | 缓存,之前博文RDD【持久化】一节已经描述过; |
persist( ) | 持久化,之前博文RDD【持久化】一节已经描述过 |
二、pyspark 行动操作
PySpark RDD行动操作(Actions) 是将值返回给驱动程序的 PySpark 操作.行动操作会触发之前的转换操作进行执行。常见的一些行动操作。
行动操作 | 描述 |
---|---|
count() | 该操作不接受参数,返回一个long类型值,代表rdd的元素个数 |
collect() | 返回一个由RDD中所有元素组成的列表(没有限制输出数量,所以要注意RDD的大小) |
take(n) | 返回RDD的前n个元素(无特定顺序)(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中) |
takeOrdered(n, key) | 从一个按照升序排列的RDD,或者按照key中提供的方法升序排列的RDD, 返回前n个元素(仅当预期结果数组较小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中) https://spark.apache.org/docs/2.2.1/api/python/pyspark.html#pyspark.RDD |
takeSample(withReplacement, num, seed=None) | 返回此 RDD 的固定大小的采样子集 |
top(n) | 返回RDD的前n个元素(按照降序输出, 排序方式由元素类型决定) |
first() | 返回RDD的第一个元素,也是不考虑元素顺序 |
reduce(<func>) | 使用指定的满足交换律/结合律的运算符来归约RDD中的所有元素.指定接收两个输入的 匿名函数(lambda x, y: …)#示例,求和操作Numbers=sc.parallelize([1,2,3,4,])Numbers.reduce(lambda x, y: x y)#返回10 |
fold(zeroV, <func>) | 使用给定的func和zeroV把RDD中的每个分区的元素集合,然后把每个分区聚合结果再聚合;和reduce类似,但是不满足交换律需特别注意的是,zeroV要在计算的开头和结尾都加上:Numbers=sc.parallelize([1,2,3,4,])Numbers.fold(10, lambda x, y: x y)#运算过程为 10 1 2 3 4 10 |
foreach(<func>) | 把具名或者匿名函数,应用到RDD的所有元素上.和map类似,但是由于foreach是行动操作,所以可以执行一些输出类的函数,比如print |
countByValue() | 将此 RDD 中每个唯一值的计数作为 (value, count) 对的字典返回.sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())[(1, 2), (2, 3)] |
aggregate(zeroValue, seqOp, combOp) | 使用给定的函数和初始值,对每个分区的聚合进行聚合,然后对聚合的结果进行聚合seqOp 能够返回与当前RDD不同的类型,比如说返回U,RDD本是T,所以会再用一个combine函数,将两种不同的类型U和T聚合起来 >>> seqOp = (lambda x, y: (x[0] y, x[1] 1)) >>> combOp = (lambda x, y: (x[0] y[0], x[1] y[1])) >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)(10, 4) >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)(0,0)#这篇博文的示例较为详细https://blog.csdn.net/Li_peipei/article/details/84447234 |
三、键值对RDD的操作
键值对RDD,就是PairRDD,元素的形式是(key,value),键值对RDD是会被经常用到的一类RDD,它的一些操作函数大致可以分为四类: ·字典函数 ·函数式转化操作 ·分组操作、聚合操作、排序操作 ·连接操作
字典函数 | 描述 |
---|---|
keys() | 返回所有键组成的RDD (这是转化操作) |
values() | 返回所有值组成的RDD (这是转化操作) |
keyBy(<func>) | 返回的是一个 PairRDD, 该RDD每个元素的 键,是由生成的;而值是原始RDD每个元素#例子rdd=sc.paralleize([1,2,3])New_rdd=rdd.keyBy(lambda x: x*2 1)# New_rdd 的结果为 [ (3,1), (5,2), (7,3) ] |
函数式转化操作 | 描述 |
---|---|
mapValues() | 和之前介绍的map函数类似,只不过这里是针对 (键,值) 对的值做处理,而键不变 |
flatMapValues() | 和之前介绍的flatmap函数类似,只不过这里是针对 (键,值) 对的值做处理,而键不变 |
分组聚合排序操作 | 描述 |
---|---|
groupByKey() | 按照各个键,对(key,value) pair进行分组, 并把同组的值整合成一个序列这是转化操作 |
reduceByKey(<func>) | 按照各个键,对(key,value) pair进行聚合操作,对同一key对应的value,使用聚合计算这是转化操作, 而reduce是行动操作 |
foldByKey(zerovalue, <func>) | 与之前提及的fold类似,这里也是 根据(key,value) pair不同键进行操作这是转化操作,而fold是行动操作 |
sortByKey(assscending=True) | 把键值对RDD根据键进行排序,默认是升序这是转化操作 |
连接操作 | 描述 |
---|---|
连接操作对应SQL编程中常见的JOIN操作,在SQL中一般使用 on 来确定condition,在这里,因为是针对PairRDD的操作,所以就是根据 键 来确定condition | |
join(<otherRDD>) | 执行的是内连接操作 |
leftOuterJoin(<ohterRDD>) | 返回左RDD中包含的所有元素或记录。如果左RDD中的键在右RDD中存在,那么右RDD中匹配的记录会和左RDD记录一起返回。 |
rightOuterJoin(<otherRDD>) | 返回右RDD中包含的所有元素或记录。如果右RDD中的键在左RDD中存在,那么左RDD中匹配的记录会和右RDD记录一起返回。 |
fullOuterJoin(<otherRDD>) | 无论是否有匹配的键,都会返回两个RDD中的所有元素。左数据或者右数据中没有匹配的元素都用None(空)来表示。 |
cartesian(<otherRDD>) | 笛卡尔积,也被成为交叉链接。会根据两个RDD的记录生成所有可能的组合。 |
集合操作 | 描述 |
---|---|
union | 将一个RDD追加到RDD后面,组合成一个输出RDD.两个RDD不一定要有相同的结构,比如第一个RDD有3个字段,第二个RDD的字段不一定也要等于3. 且该操作不会自动去重。 |
intersection(<otherRDD>) | 返回两个RDD中的共有元素,即两个集合相交的部分.返回的元素或者记录必须在两个集合中是一模一样的,即对于键值对RDD来说,键和值都要一样才行。 |
subtract(<otherRDD>) | 返回第一个RDD中,所有没有出现在第二个RDD中的值(即相当于减掉了第二个RDD) |
subtractByKey(<otherRDD>) | 和subtract类似的操作,只不过 这里是以key为参考 |