写在前面
本文基于Spark 3.2.0 Scala的RDD API,内容来源主要由官方文档整理,文中所整理算子为常用收录,并不完全。在Spark RDD官方文档中按照转换算子(Transformation )和行动算子(Action)进行分类,在RDD.scala文档中按照RDD的内部构造进行分类。RDD算子分类方式并不是绝对的,有些算子可能具有多种分类的特征,本文综合两种分类方式便于阅读理解。文中所描述的基本概念来自于官方文档的谷歌翻译和ChatGPT3.5优化,少量来自本人直接翻译。
另外,在《Hadoop权威指南》中,译者将action
译为动作
,以下内容对动作
,行动
不做区分。
一、转换算子 Transformation [1]-[2]
1.1 Value类型[3]
(1) map(func)
返回通过函数传递的每个元素而一一映射形成的新分布式数据集。
(2) filter(func)
返回一个新的数据集,该数据集是通过选择 func 返回 true 的源元素而形成的。
(3) flatMap(func)
与map类似,但每个输入项可以映射到0个或多个输出项(因此func会返回一个flatten后的map而不是单个项)。
(4) mapPartitions(func)
通过对 RDD 的每个分区应用一个函数来返回一个新的 RDD。
(5) mapPartitionsWithIndex(func)
与mapPartitions类似,但为func提供了一个表示分区索引的整数值。
(6) sample(withReplacement, fraction, seed)
其有3个参数,使用给定的随机数生成器种子,在有或没有替换的情况下对数据的一小部分进行采样。
(7) groupBy
返回按一定规则分组后的 RDD。 每个组由一个键和映射到该键的一系列元素组成。 不能保证每个组中元素的顺序,甚至在每次计算结果 RDD 时都可能不同。
(8) glom
返回通过将每个分区内的所有元素合并到数组中而创建的 RDD。
(9) distinct([numPartitions]))
返回一个新的 RDD,其中包含该 RDD 中的去重元素。
(10) coalesce(numPartitions)
返回一个减少分区的新 RDD。
(11) repartition(numPartitions)
返回一个准确数量分区的新 RDD,可以是增加或者减少分区。
(12) sortBy
返回按给定键函数排序的 RDD
1.2 双Value类型
(1) intersection(otherDataset)
返回一个新的 RDD,其中包含源数据集中元素与参数的交集。
(2) union(otherDataset)
返回这个 RDD 和另一个 RDD 的联合。 任何相同的元素都会出现多次(使用 .distinct() 来消除它们)。
1.3 K-V类型[4]
(1) partitionBy(partitioner: Partitioner)
返回使用指定分区器分区的 RDD 的副本。
(2) reduceByKey(partitioner: Partitioner, func: (V, V) => V)
使用关联和交换归约函数合并每个键的值。 还会在将结果发送到 reducer 之前在每个映射器上执行本地合并渔,类似于 MapReduce 中的“combiner”。
(3) groupByKey(partitioner: Partitioner)
将 RDD 中每个键的值组合成一个单独的序列,并可以通过传递一个 Partitioner 控制生成的键值对 RDD 的分区方式。每个分组内元素的顺序不能保证,并且每次对生成的 RDD 进行评估时可能会有所不同。
(4) aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
当对一个包含(K, V)对的数据集调用时,返回一个包含(K, U)对的数据集,其中每个键的值使用给定的组合函数和中性的"零"值进行聚合。允许聚合值的类型与输入值的类型不同,同时避免不必要的内存分配。与groupByKey类似,可以通过可选的第二个参数来配置reduce任务的数量。
(5) foldByKey
使用一个关联函数和一个中性的 “零值”,将每个键的值合并在一起。中性的 “零值” 可以被添加到结果中任意次数,且不改变结果(例如,列表连接中的 Nil,加法中的 0,或乘法中的 1)。
(6) combineByKey
combineByKeyWithClassTag的简化版本,使用现有的分区器/并行级别对结果RDD进行哈希分区。此方法用于向后兼容,不会向shuffle提供组合器的classtag信息。
(7) join(otherDataset, [numPartitions])
返回一个包含this
和other
中具有匹配键的所有元素对的RDD。每个元素对将作为(k, (v1, v2))元组返回,其中(k, v1)在this
中,(k, v2)在other
中。使用给定的分区器对输出RDD进行分区。
(8) sortByKey([ascending], [numPartitions])
当对一个包含(K, V)对的数据集调用时,其中K实现了Ordered接口,返回一个按键按升序或降序排序的(K, V)对的数据集。可以通过布尔型参数ascending来指定排序顺序,如果设置为true,则按升序排序,如果设置为false,则按降序排序。还可以通过可选参数numPartitions指定输出RDD的分区数。
(9) mapValues
对键值对RDD中的每个值应用映射函数,而不改变键;同时保留原始RDD的分区方式。
(10) cogroup(otherDataset, [numPartitions])
当在类型为(K, V)和(K, W)的数据集上调用时,返回一个包含(K, (Iterable, Iterable))元组的数据集。该操作也被称为groupWith。
二、行动算子Actions
(1) reduce(func)
使用函数func(接受两个参数并返回一个参数)对数据集的元素进行聚合。该函数应该是可交换和可结合的,以便可以并行正确计算。
(2) collect()
将数据集的所有元素作为数组返回到驱动程序。通常在筛选或其他返回数据子集的操作后使用,当数据集足够小适合在驱动程序上进行处理时。
(3) count()
返回数据集中元素的数量。
(4) first()
first()
函数用于返回数据集的第一个元素,类似于take(1)
操作。它返回数据集中的第一个元素作为单个元素的结果。如果数据集为空,则会抛出异常。first()
常用于需要获取数据集中的第一个元素的情况,而不需要获取整个数据集的内容。
(5) take(n)
返回数据集中的前 n 个元素,以dataset的形式返回。
(6) takeOrdered(n, [ordering])
使用指定的排序方式,返回 RDD 中的前 n 个元素。排序方式可以是元素的自然顺序或自定义的比较器。
(7) saveAsTextFile(path)
将数据集的元素作为文本文件(或一组文本文件)写入到指定目录中,可以是本地文件系统、HDFS或其他支持Hadoop文件系统的文件系统。Spark将对每个元素调用toString方法,将其转换为文件中的一行文本。
(8) countByKey()
仅适用于类型为(K,V)的RDD。返回一个包含每个键的计数的(K,Int)对的哈希映射。
(9) foreach(func)
对数据集中的每个元素运行函数func
。通常用于具有副作用的操作,比如更新累加器或与外部存储系统进行交互。
注意:在foreach()
之外修改除累加器之外的变量可能导致未定义的行为。详情请参阅了解闭包。
三、转换算子和行动算子的区别[5]
Spark为RDD提供了两大类操作:转换(transformation)和动作(action),可以通过以下几个方面来区分它们:
- 操作结果类型: 转换算子返回一个新的RDD、DataFrame或DataSet等数据集,而动作触发一个非RDD的结果,如单个值、集合,要么返回给用户要么写入外部存储。
- 惰性执行: 动作的效果立竿见影,转换算子是惰性执行的,即在调用转换算子时并不立即执行计算,而是记录下转换操作的逻辑。只有在遇到行动算子时,Spark才会触发对转换操作的实际计算。
- 作用范围: 转换算子通常对整个数据集进行操作,而行动算子是对数据集进行汇总或返回最终结果的操作。
- 计算开销: 转换算子通常是一种转换逻辑的描述,不会立即触发实际计算,因此计算开销相对较低。而行动算子需要触发实际计算并生成结果,因此可能需要较大的计算开销。
要想判断一个操作是转换还是动作,我们可以观察其返回类型:如果返回的类型是RDD,那么它是一个转换,否则就是一个动作。转换算子是惰性执行的,而行动算子是立即执行的。通过理解这些区别,可以更好地使用和组合转换算子和行动算子来构建Spark应用程序。
四、惰性(Lazy Evaluation)和立即(Eager Evaluation)如何体现
在Spark中,惰性(Lazy Evaluation)和立即(Eager Evaluation)是指计算操作的时机和方式。
惰性计算意味着在Spark中,转换算子并不会立即执行实际的计算操作。当应用程序调用转换算子时,Spark只会记录下转换操作的逻辑,而不会立即执行计算。这样做的好处是可以进行优化和延迟计算。因此,对于转换算子,不会立即生成结果,而是构建一个转换操作的执行计划(Execution Plan)。
相反,立即计算意味着在Spark中,行动算子会立即触发实际的计算操作并生成结果。当应用程序调用行动算子时,Spark会按照转换操作的执行计划执行计算,并将结果返回给应用程序。
通过惰性计算,Spark可以对转换操作进行优化、重排和延迟执行。例如,Spark可以根据数据依赖性进行操作合并、过滤无用操作、推测执行等。这样可以提高执行效率和节省计算资源。而立即计算则确保了在需要结果时可以立即获取。
总结起来,惰性计算是指在调用转换算子时,Spark仅记录下转换操作的逻辑而不执行实际计算,而立即计算是指在调用行动算子时,Spark立即触发实际计算并生成结果。惰性计算使Spark可以优化和延迟执行计算,而立即计算确保了在需要时可以立即获取结果。
4.1 示例
代码语言:javascript复制import org.apache.spark.sql.SparkSession
object LazyEvaluationExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("LazyEvaluationExample")
.master("local")
.getOrCreate()
// 惰性计算
val numbers = spark.range(1, 1000000000) // 生成一个大范围的数字序列,但不会立即执行计算
val filteredNumbers = numbers.filter(_ % 2 == 0) // 应用过滤操作,仍然不会执行计算
// 立即计算
val count = filteredNumbers.count() // 调用行动算子count(),立即触发实际的计算操作并生成结果
println(s"Filtered count: $count")
spark.stop()
}
}
在上面的示例中,numbers
是一个范围RDD,表示从1到10亿的数字序列。在创建numbers
时,并没有立即执行计算操作,而是记录下了生成数字序列的逻辑。然后,通过filter
转换算子筛选出偶数,仍然没有执行实际的计算。
最后,通过调用count
行动算子,触发了实际的计算操作,并将结果打印出来。这里的计算操作包括生成数字序列和筛选出偶数,以及计算偶数的个数。
通过这个示例,可以看到惰性计算的特点是在转换操作时不立即执行计算,而是在行动算子触发时才执行实际的计算操作。
在执行 val filteredNumbers = numbers.filter(_ % 2 == 0)
这行代码时,filteredNumbers
只是一个表示转换操作的逻辑执行计划,并没有实际的计算发生。因此,直接打印 filteredNumbers
并不会输出筛选后的结果。
如果想要查看筛选后的结果,需要触发实际的计算操作,可以使用行动算子来实现,比如调用 filteredNumbers.collect()
或者 filteredNumbers.show()
。这样会触发计算操作,并将结果打印出来。
示例代码中的 filteredNumbers.count()
就是一个行动算子,它会计算 filteredNumbers
中元素的数量,并返回结果。如果你想直接打印筛选后的结果,可以使用类似的行动算子来实现。
五、foreach和foreachPartition的区别?
参考文献
[1] RDD.scala官方实例:https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala
[2] Spark 3.2.0官方文档:https://spark.apache.org/docs/3.2.0/rdd-programming-guide.html
[3] https://spark.apache.org/docs/3.2.0/api/scala/org/apache/spark/rdd/RDD.html
[4] https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L525
[5] Tom White. Hadoop权威指南. 第4版. 清华大学出版社, 2017.pages 557.