A1 总述:
去重
A2 思路:
map -> resuceByKey -> map
A3 源码:
3.1 有参:
代码语言:javascript复制 /**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
//numPartitions:分区数
3.2 无参:
代码语言:javascript复制/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
//partitions.length:分区数
3.3 解释
我们从源码中可以看到,distinct去重主要实现逻辑是
代码语言:javascript复制 map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
这个过程是,先通过map映射每个元素和null,然后通过key(此时是元素)统计{reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。},最后再同过map把去重后的元素挑出来。
A4 测试代码
代码语言:javascript复制import org.apache.spark.{SparkConf, SparkContext}
object TransformationsFun {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("transformation_operator")
val sc = new SparkContext(conf)
//这里的3是初设定的partition数
val rdd = sc.parallelize(List(1, 2, 3, 3, 3, 3, 8, 8, 4, 9), 3)
//因为distinct实现用reduceByKey故其可以重设定partition数,这里设定4
rdd.distinct(4).foreach(println)
//这里执行时,每次结果不同,分区在4以内,每个分区处理的元素也不定
sc.stop()
}
}
图解:
解释:这里仅供理解,在实际运行中,分区会随机使用以及每个分区处理的元素也随机,所以每次运行结果会不同。