1. The Essence of RDD Operations
An RDD is essentially a function, and an RDD transformation is nothing more than the nesting of functions. RDDs come in two kinds:
- Input RDDs: typically KafkaRDD, JDBCRDD
- Transformed RDDs: e.g. MapPartitionsRDD
The RDD processing flow, using the following code as an example:
sc.textFile("abc.log").map(...).saveAsTextFile("")
1. textFile builds a NewHadoopRDD.
2. Calling map builds a MapPartitionsRDD.
3. saveAsTextFile triggers the actual execution of the pipeline.
So an RDD is just a wrapper around a function; once that function has finished processing the data, we get an RDD "dataset" (which is virtual, as explained below).
NewHadoopRDD is the data source: each partition (executed in parallel across the cluster) is responsible for fetching its data, record by record, via iterator.next. Suppose that at some moment a record A is fetched; A is immediately processed by the function inside map to produce B (the transformation is done), and B is then written to HDFS (also record by record). Every other record goes through the same steps.
- In theory, the number of records a MapPartitionsRDD actually holds in memory at any moment equals its number of partitions, which is a very small number.
- NewHadoopRDD holds slightly more, because it is the data source and reads files: if the read buffer is 1 MB, then at most partitionNum * 1 MB of data is in memory.
- saveAsTextFile behaves the same way: writing files to HDFS needs a buffer, so the amount of data in memory is at most buffer * partitionNum (the data can be gathered to the Driver and written there, or each Executor can write to HDFS directly).
So the whole process is actually streaming (generally one record, or one small batch, at a time): each record is processed in turn by the functions wrapped by the successive RDDs.
(PS: with mapPartitions, an entire partition's worth of data can be pulled in at once, which is why mapPartitions is more likely to cause an out-of-memory error than map.)
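The difference can be sketched as follows (a rough illustration, not Spark's internal code; the OOM risk with mapPartitions comes mainly from materializing the whole iterator, e.g. with toList):
val nums = sc.parallelize(1 to 10, 2)
// map: the function sees one record at a time, and the data streams through the iterator.
val doubled1 = nums.map(_ * 2)
// mapPartitions: the function receives the partition's whole Iterator. It is still lazy,
// but code like the following pulls the entire partition into memory at once:
val doubled2 = nums.mapPartitions(iter => iter.toList.map(_ * 2).iterator)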
------------------------------------------------------------
Reference for section 1: https://blog.csdn.net/zc19921215/article/details/82858585
2. Common RDD Programming APIs
What is the difference between a Transformation and an Action?
A Transformation does not execute immediately; it only records the operation and produces a new RDD. An Action executes all of the preceding Transformations and, instead of producing another RDD, returns a concrete result.
All transformations on an RDD are lazily evaluated; they do not compute their results right away. Instead, they simply remember the transformations applied to the base dataset (for example, a file). The transformations are only actually run when an action requires a result to be returned to the Driver. This design lets Spark run more efficiently.
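This laziness is easy to observe in the shell (a small sketch, reusing the HDFS path that appears in the word-count examples later in this section):
scala> val lines = sc.textFile("hdfs://hdp-01:9000/wordcount") // no data is read yet
scala> val words = lines.flatMap(_.split(" ")) // still only lineage being recorded
scala> words.count // the Action triggers the actual computation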
For the commonly used Transformation and Action APIs, see the official documentation: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
Some commonly used Transformation and Action APIs:
API | Type |
---|---|
aggregateByKey | Transformation |
reduceByKey | Transformation |
filter | Transformation |
map | Transformation |
flatMap | Transformation |
mapPartitions | Transformation |
mapPartitionsWithIndex | Transformation |
collect | Action |
aggregate | Action |
saveAsTextFile | Action |
foreach | Action |
foreachPartition | Action |
2.1 Common Transformation APIs (transformations are lazily evaluated)
#Create an RDD by parallelizing a Scala collection
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
#Check the number of partitions of this RDD
scala> rdd1.partitions.length
#Sorting, example 1
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8)).map(_ * 2).sortBy(x => x, true)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at sortBy at <console>:24
#Sorting, example 2
scala> val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x => x + "", true)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at sortBy at <console>:24
Note that the result is still an RDD[Int]: sortBy only takes a sort key, and sorting changes the order of the data, not its type. collect submits the chain of transformations to the cluster for execution and brings the result back to the Driver as an Array.
scala> rdd2.collect
res1: Array[Int] = Array(10, 12, 14, 16, 18, 2, 20, 4, 6, 8)
The result is sorted in lexicographic order, because x + "" turns the sort key into a String.
#Distinguish between operating on the Lists inside the RDD and operating on a Scala List
scala> val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
rdd5: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> val rdd6 = rdd5.flatMap(_.flatMap(_.split(" "))).collect
rdd6: Array[String] = Array(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)
The first flatMap is a method on rdd5 and pulls out the individual Lists (such as List("a b c", "a b b") and List("e f g", "a f g")), so it operates on the Lists inside the RDD; the second flatMap pulls out the Scala List elements such as "a b c" and "a b b". Consequently the first flatMap distributes the work across different machines in the cluster, whereas the second flatMap runs on one machine in the cluster and processes a single List.
#union: compute the union; note that the element types must match
scala> val rdd6 = sc.parallelize(List(5,6,4,7))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[20] at union at <console>:27
Called in DSL (infix) style:
scala> val rdd8 = rdd6 union rdd7
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[21] at union at <console>:27
scala> rdd8.collect
res2: Array[Int] = Array(5, 6, 4, 7, 1, 2, 3, 4)
#intersection: compute the intersection
scala> val rdd9 = rdd6.intersection(rdd7)
rdd9: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at intersection at <console>:27
scala> rdd9.collect
res3: Array[Int] = Array(4)
#join, leftOuterJoin, rightOuterJoin (joins)
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[32] at join at <console>:27
scala> val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,2)), (tom,(1,8)), (jerry,(2,9)))
scala> val rdd3 = rdd1.leftOuterJoin(rdd2).collect
rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9))))
scala> val rdd3 = rdd1.rightOuterJoin(rdd2).collect
rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (shuke,(None,7)), (jerry,(Some(2),9)))
#cogroup (full join)
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:27
scala> val rdd3 = rdd1.cogroup(rdd2).collect
rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
The values gathered into one CompactBuffer all come from the same RDD; if a key appears several times within one RDD, all of its values are put into that RDD's CompactBuffer.
#cartesian: Cartesian product [the result is presented as pairs]
scala> val rdd1 = sc.parallelize(List("tom", "jerry"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd3 = rdd1.cartesian(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[8] at cartesian at <console>:27
scala> val rdd3 = rdd1.cartesian(rdd2).collect
rdd3: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
#groupByKey (group by key)
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[51] at union at <console>:27
scala> (rdd1 union rdd2).collect
res7: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
scala> rdd3.groupByKey.collect
res9: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 2, 8)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))
#Word count using groupByKey
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).collect
res13: Array[(String, Int)] = Array((hello,1), (hello,1), (hi,1), (java,1), (scala,1), (dianxin,1), (hi,1), (dianxin,1), (hello,1), (spark,1), (spark,1))
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).groupByKey.collect
res14: Array[(String, Iterable[Int])] = Array((scala,CompactBuffer(1)), (hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1)), (spark,CompactBuffer(1, 1)), (hi,CompactBuffer(1, 1)), (dianxin,CompactBuffer(1, 1)))
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).groupByKey.mapValues(_.sum).collect
res15: Array[(String, Int)] = Array((scala,1), (hello,3), (java,1), (spark,2), (hi,2), (dianxin,2))
With large data volumes, which is more efficient: groupByKey or reduceByKey?
groupByKey groups first and only aggregates afterwards. The grouping step requires a large shuffle: values with the same key have to end up on the same machine, and since the data is spread across many machines it must be moved there over the network; what gets transferred is mostly a huge number of 1s that still need to be summed, so groupByKey is inefficient.
reduceByKey, by contrast, aggregates locally within each partition before shuffling, so far less data crosses the network and it is more efficient, as the sketch below shows.
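For comparison, the same word count written with reduceByKey (same HDFS path as above); the partial sums are combined inside each partition before the shuffle:
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect
// Gives the same result as the groupByKey + mapValues(_.sum) version above, e.g. (hello,3), (spark,2), (hi,2), ...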
2.2 Common Action APIs
#A Transformation with the partition count specified. Three partitions means that when an Action is later triggered, three Tasks will be generated; the Tasks process the data in the List and write it to HDFS files, so there will be three result files, much like one Reduce in MapReduce producing one result file. If the partition count is not specified, the number of result files equals the total number of cores in the cluster (actually the total number of threads). In general, there are as many partitions as there are input splits.
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
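To actually see the three result files, rdd1 can be written out (a small sketch; the output path is a placeholder):
scala> rdd1.saveAsTextFile("hdfs://hdp-01:9000/out1")
// The output directory contains one file per partition/Task: part-00000, part-00001, part-00002 (plus a _SUCCESS marker).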
#collect
scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5)
#reduce
scala> rdd1.reduce(_ + _)
res1: Int = 15
#count
scala> rdd1.count
res2: Long = 5
#top(num: Int): sort the data in descending order and take the top N elements; the elements are returned to the Driver in an Array
scala> rdd1.top(3)
res3: Array[Int] = Array(5, 4, 3)
#take(num: Int): take the first n elements
scala> rdd1.take(2)
res4: Array[Int] = Array(1, 2)
#first: take the first element; here it returns an Int
scala> rdd1.first
res5: Int = 1
#takeOrdered: sort the data in ascending order and take n elements
scala> rdd1.takeOrdered(3)
res6: Array[Int] = Array(1, 2, 3)
2.3 Advanced RDD APIs
#mapPartitionsWithIndex [processes the data of each partition and also exposes the partition index, so you can tell which partition, and hence which Task, the data belongs to]
"Takes one partition at a time" (a partition does not actually store data; it only records which data is to be read. The Task that is actually created in the Executor on a Worker reads many records, and since the partition index is exposed as well, we can treat the iterator as the data belonging to that partition.)
scala> val rdd1 = sc.parallelize(List.apply(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val mapWithIndexFunction = (index: Int, iterator: Iterator[Int]) => {
| iterator.map(e => s"part: $index, ele: $e")
| }
mapWithIndexFunction: (Int, Iterator[Int]) => Iterator[String] = <function2>
scala> val rdd2 = rdd1.mapPartitionsWithIndex(mapWithIndexFunction)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:27
scala> val rdd2 = rdd1.mapPartitionsWithIndex(mapWithIndexFunction).collect
rdd2: Array[String] = Array(part: 0, ele: 1, part: 0, ele: 2, part: 0, ele: 3, part: 0, ele: 4, part: 1, ele: 5, part: 1, ele: 6, part: 1, ele: 7, part: 1, ele: 8, part: 1, ele: 9)
Elements 1 through 4 belong to partition 0, and elements 5 through 9 belong to partition 1. There are 9 elements in total, so one partition gets 4 elements and the other gets 5, the goal being to keep the Tasks generated for the two partitions as evenly loaded as possible.
#aggregate [aggregates locally within each partition first, then globally]. It is an Action and operates on individual elements, not on (K, V) pairs
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val sum = rdd1.aggregate(0)(_ + _, _ + _) // aggregate within each partition first, then globally
sum: Int = 45
There are two partitions: partition 0 holds 1,2,3,4 and partition 1 holds 5,6,7,8,9. The first _ + _ performs the per-partition sums, 1+2+3+4=10 and 5+6+7+8+9=35; the second _ + _ then aggregates globally, 10+35=45, which is the returned result.
Sum the maximum value of each partition, with an initial value of 0:
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val maxSum = rdd1.aggregate(0)(math.max(_, _), _ + _)
maxSum: Int = 13
There are two partitions: partition 0 holds 1,2,3,4 and partition 1 holds 5,6,7,8,9. The maximum of the first partition is 4 and the maximum of the second is 9, so the global aggregation gives 4+9=13.
Sum the maximum value of each partition, with an initial value of 5:
scala> val maxSum = rdd1.aggregate(5)(math.max(_, _), _ + _)
maxSum: Int = 19
There are two partitions: partition 0 holds 1,2,3,4 and partition 1 holds 5,6,7,8,9. The maximum of the first partition is 5 (the initial value) and the maximum of the second is 9; the global aggregation must then be combined with the initial value once more, so the result is 14+5=19.
The global aggregation step of aggregate is unordered:
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd3 = rdd2.aggregate("")(_ + _, _ + _)
rdd3: String = defabc
scala> val rdd3 = rdd2.aggregate("")(_ + _, _ + _)
rdd3: String = abcdef
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x y)
res6: String = 24
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x y)
res7: String = 42
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x y)
res0: String = 01
scala> rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x y)
res1: String = 10
"".length的值为0,与"12".length=2 相比,0更小,执行0.toString,返回"0"。注意,此时"0".length的值为1,1再与"23".length即2比较,返回1。同理分区2字符串长度最小值为0,聚合后的结果则为10或01。
#collectAsMap
代码语言:javascript复制scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd1 = rdd.mapValues(_ * 100).collect
rdd1: Array[(String, Int)] = Array((a,100), (b,200))
scala> val rdd1 = rdd.mapValues(_ * 100).collectAsMap
rdd1: scala.collection.Map[String,Int] = Map(b -> 200, a -> 100)
Note: in a real production environment, do not collect the data to the Driver and then write it to a database or file system; doing so creates a performance bottleneck.
#countByKey: counts how many times each key appears, independent of the values
scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1.countByKey
res4: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)
#countByValue
scala> rdd1.countByValue
res5: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)
#filterByRange [the range is a closed interval; in this example it includes both "b" and "d"]
scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> val rdd2 = rdd1.filterByRange("b", "d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at filterByRange at <console>:25
scala> rdd2.collect
res7: Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,1))
#flatMapValues
scala> val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> a.flatMapValues(_.split(" "))
res9: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[17] at flatMapValues at <console>:26
scala> res9.collect
res10: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
#foldByKey [a method on PairRDDFunctions; foldByKey, like reduceByKey, aggregateByKey and combineByKey, calls combineByKeyWithClassTag under the hood, and all of them are Transformations]
scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at map at <console>:25
scala> rdd2.collect
res13: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
Concatenate the values of all tuples that share the same key:
scala> val rdd3 = rdd2.aggregateByKey("")(_ + _, _ + _)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[20] at aggregateByKey at <console>:25
scala> rdd3.collect
res14: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))
scala> val rdd4 = rdd2.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[21] at reduceByKey at <console>:25
scala> rdd4.collect
res15: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))
scala> val rdd5 = rdd3.foldByKey("")(_ + _)
rdd5: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at foldByKey at <console>:25
scala> rdd5.collect
res16: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))
#foreach
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> rdd.foreach(x => print(x * 100))
scala>
No result is printed here. foreach is an Action, but the printing happens inside the Executors; the console (the Driver side) does not pull the data back from the Executors on the Workers, so nothing shows up. The output can be found in the Spark web UI (the Executors' stdout logs).
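To see the values on the Driver console instead, collect the (small) result first:
scala> rdd.collect.foreach(x => print(x * 100)) // now the printing happens on the Driver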
#foreachPartition
scala> rdd.foreachPartition(it => it.foreach(x => println(x * 100)))
scala>
foreach handles one record at a time, while foreachPartition handles one partition at a time. Typical use case: writing computed results to HDFS/Hive/MySQL and similar systems, where foreachPartition is far more efficient. With foreach, every record written to MySQL needs its own JDBC connection, so writing 10 million records would mean creating 10 million JDBC connections, which is enormously expensive. With foreachPartition, a single JDBC connection is enough to write an entire partition, which consumes far fewer resources and is much more efficient. A sketch of this pattern follows.
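A minimal sketch of the foreachPartition write pattern, assuming a pair RDD wordCounts of (word, count) tuples such as the word-count result earlier; the JDBC URL, credentials and table name are placeholders, and in practice the writes would be batched with proper error handling:
import java.sql.DriverManager
wordCounts.foreachPartition { it =>
  // One connection per partition, created inside the Executor (a Connection is not
  // serializable, so it cannot be created on the Driver and shipped with the closure).
  val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/test", "user", "passwd")
  val stmt = conn.prepareStatement("INSERT INTO word_count(word, cnt) VALUES (?, ?)")
  try {
    it.foreach { case (word, cnt) =>
      stmt.setString(1, word)
      stmt.setInt(2, cnt)
      stmt.executeUpdate() // in practice these writes would be batched
    }
  } finally {
    stmt.close()
    conn.close()
  }
}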
#combineByKey [a fairly low-level method; the types have to be specified explicitly when using it]. Its three arguments are the createCombiner function (turns a key's first value into the combined type), the mergeValue function (merges a value into the combiner within a partition) and the mergeCombiners function (merges combiners across partitions).
scala> val rdd = sc.parallelize(List.apply(("hello", 2), ("hi", 3), ("hello", 4), ("hi", 6), ("hello", 8), ("hi", 1)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val rdd2 = rdd.combineByKey(x => x, (m: Int, n: Int) => m + n, (a: Int, b: Int) => a + b)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at combineByKey at <console>:25
scala> rdd2.collect
res1: Array[(String, Int)] = Array((hello,14), (hi,10))
Group the elements of rdd6 that share the same key and collect them into a ListBuffer:
scala> val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:25
scala> val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:25
scala> val rdd6 = rdd5.zip(rdd4)
rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[6] at zip at <console>:28
scala> rdd6.collect
res6: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer
scala> rdd6.combineByKey(x => ListBuffer(x), (a: ListBuffer[String], b: String) => a += b, (m: ListBuffer[String], n: ListBuffer[String]) => m ++= n)
res4: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[7] at combineByKey at <console>:29
scala> res4.collect
res5: Array[(Int, scala.collection.mutable.ListBuffer[String])] = Array((1,ListBuffer(dog, cat, turkey)), (2,ListBuffer(salmon, rabbit, gnu, wolf, bear, bee)))
In essence: