2021年大数据Spark(十四):Spark Core的RDD操作

2021-10-09 17:18:35 浏览数 (1)


RDD的操作

有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率。在开发并行程序时,可以利用类似 Fork/Join 的框架将一个大的任务切分成细小的任务,每个小任务模块之间是相互独立的,可以并行执行,然后将所有小任务的结果汇总起来,得到最终的结果。

一个非常好的例子便是归并排序。对整个序列进行排序时,可以将序列切分成多个子序列进行排序,然后将排好序的子序列归并起来得到最终的结果。

对 Hadoop 有所了解的读者都知道 map、reduce 操作。对于大量的数据,我们可以通过 map 操作让不同的集群节点并行计算,之后通过 reduce 操作将结果整合起来得到最终输出。

​​​​​​​函数(算子)分类

对于 Spark 处理的大量数据而言,会将数据切分后放入RDD作为Spark 的基本数据结构,开发者可以在 RDD 上进行丰富的操作,之后 Spark 会根据操作调度集群资源进行计算。总结起来,RDD 的操作主要可以分为 Transformation 和 Action 两种

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

RDD中操作(函数、算子)分为两类:

 1)、Transformation转换操作:返回一个新的RDD

which create a new dataset from an existing one

所有Transformation函数都是Lazy,不会立即执行,需要Action函数触发

 2)、Action动作操作:返回值不是RDD(无返回值或返回其他的)

which return a value to the driver program after running a computation on the datase

所有Action函数立即执行(Eager),比如count、first、collect、take等

此外注意RDD中函数细节:

 第一点:RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数);

 第二点:RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行。之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行

也就是在运行action之前,前面的计划都列出来了,就可以根据集群的具体情况,优化分区的分布,和网络的传输关系。让性能最优。

如果没有懒操作,那么一步步的执行,就没办法从整体做规划,做优化了。

​​​​​​​Transformation函数

在Spark中Transformation操作表示将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。值得注意的是Transformation操作并不会触发真正的计算,只会建立RDD间的关系图。

如下图所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从 V1、V2、U1、U2、U3、U4 采样出数据 V1、U1 和 U4,形成新的RDD

常用Transformation转换函数:

转换

含义

map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDD

intersection(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks]))

对源RDD进行去重后返回一个新的RDD

groupByKey([numTasks])

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活

join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

cartesian(otherDataset)

笛卡尔积

pipe(command, [envVars])

对rdd进行管道操作

coalesce(numPartitions)

减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作

repartition(numPartitions)

重新给 RDD 分区

​​​​​​​Action函数

不同于Transformation操作,Action操作代表一次计算的结束,不再产生新的 RDD,将结果返回到Driver程序或者输出到外部。所以Transformation操作只是建立计算关系,而Action 操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob 方法向集群正式提交请求,所以每个Action操作对应一个Job。

常用Action执行函数:

动作

含义

reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

collect()

在驱动程序中,以数组的形式返回数据集的所有元素

count()

返回RDD的元素个数

first()

返回RDD的第一个元素(类似于take(1))

take(n)

返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

takeOrdered(n, [ordering])

返回自然顺序或者自定义顺序的前 n 个元素

saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path)

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

saveAsObjectFile(path)

将数据集的元素,以 Java 序列化的方式保存到指定的目录下

countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

foreachPartition(func)

在数据集的每一个分区上,运行函数func

0 人点赞