Spark Core Basics 2: The Essence of RDDs and the RDD Programming API

2021-05-14 17:33:06

1. The essence of RDD operations

An RDD is essentially a function, and a transformation of an RDD is nothing more than nesting one function inside another. There are two kinds of RDDs:

  1. Input RDDs: typically KafkaRDD and JDBCRDD
  2. Transformed RDDs: e.g. MapPartitionsRDD

The RDD processing flow:

Take the following code as an example:

sc.textFile("abc.log").map().saveAsTextFile("")

1. textFile builds a NewHadoopRDD

2. Running map builds a MapPartitionsRDD

3. saveAsTextFile triggers the actual execution of the pipeline

So an RDD is nothing more than a wrapper around a function; once a function has finished processing the data, we get an RDD data set (a virtual one — this is explained below).

NewHadoopRDD is the data source. Each partition (executed in parallel across the cluster) is responsible for fetching data, pulling records one at a time via iterator.next. Suppose that at some moment a record A is fetched; A is immediately processed by the function inside map to produce B (the transformation is done), and B is then written to HDFS (record by record). Every other record goes through the same steps.

  1. In theory, the number of records a MapPartitionsRDD actually holds in memory at any moment equals its number of partitions — a very small number.
  2. NewHadoopRDD holds slightly more, because it is the data source and reads files; assuming the read buffer is 1 MB, at most partitionNum * 1 MB of data sits in memory.
  3. saveAsTextFile is similar: writing files to HDFS needs a buffer, so at most buffer * partitionNum of data is in memory (the data can be gathered to the Driver for writing, or each Executor can write to HDFS directly).

So the whole process is actually a streaming one (usually record by record, or batch by batch): each record is processed in turn by the functions wrapped inside the RDDs.

(PS: with mapPartitions, the function receives the whole partition as an iterator; if you materialize the entire partition inside the function, the whole partition is loaded into memory at once, which is why mapPartitions is more likely than map to cause an out-of-memory error.)
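To make the "one record flows through the nested functions" idea concrete, here is a minimal plain-Scala sketch (an illustrative assumption of mine, not Spark source code) that mimics the textFile -> map -> saveAsTextFile pipeline with nested iterators:

// Plain Scala illustration (not Spark code): nested iterators mimic the streaming pipeline
val source: Iterator[String] = Iterator("a", "b", "c")              // stands in for NewHadoopRDD reading record by record
val mapped: Iterator[String] = source.map(line => line.toUpperCase) // stands in for MapPartitionsRDD: just another function wrapped around the source
mapped.foreach(record => println(s"write $record"))                 // stands in for saveAsTextFile writing record by record
// Until foreach (the "Action") pulls data, the map above does nothing,
// and at any moment only the record currently being processed is in memory.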

------------------------------------------------------------

Reference for Section 1: https://blog.csdn.net/zc19921215/article/details/82858585

2. Commonly used RDD programming APIs

What is the difference between a Transformation and an Action?

A Transformation does not execute immediately; it only records the operation and produces a new RDD. An Action triggers the execution of all preceding Transformations and, instead of producing an RDD, returns a concrete result.

All transformations of an RDD are lazily evaluated; that is, they do not compute their results right away. Instead, they just remember the transformations applied to the base data set (e.g. a file). The transformations only run when an action requires a result to be returned to the Driver. This design lets Spark run more efficiently.
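A minimal spark-shell sketch of this laziness (assuming an existing SparkContext sc, as in the examples below; the println is only for illustration):

val rdd = sc.parallelize(1 to 3)
val mapped = rdd.map { x => println(s"processing $x"); x * 10 } // Transformation: nothing is printed yet
mapped.collect()                                                // Action: only now is the computation triggered
// Note: the println runs on the Executors, so in cluster mode the output
// appears in the Executor logs rather than in this console.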

For the commonly used Transformation and Action APIs, see the official documentation: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

Examples of common Transformation and Action APIs:

Transformations: aggregateByKey, reduceByKey, filter, map, flatMap, mapPartitions, mapPartitionsWithIndex

Actions: collect, aggregate, saveAsTextFile, foreach, foreachPartition

2.1 Common Transformation APIs (transformations; lazily evaluated)

# Create an RDD by parallelizing a Scala collection

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

# Check the number of partitions of this RDD

scala> rdd1.partitions.length

# Sorting, example 1

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8)).map(_ * 2).sortBy(x => x, true)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at sortBy at <console>:24

# Sorting, example 2

scala> val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x => x + "", true)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at sortBy at <console>:24

Note that the return type is still RDD[Int]: sortBy only takes a sort key, and sorting changes the order of the data, not its type (here x + "" merely makes the elements compare as strings). collect submits the preceding chain of transformations to the cluster for execution and returns the result to the Driver as an Array.

scala> rdd2.collect
res1: Array[Int] = Array(10, 12, 14, 16, 18, 2, 20, 4, 6, 8)

The result is in lexicographic (dictionary) order.

# Distinguish operating on the Lists inside the RDD from operating on a Scala List

scala> val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
rdd5: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val rdd6 = rdd5.flatMap(_.flatMap(_.split(" "))).collect
rdd6: Array[String] = Array(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)

The first flatMap is a method of rdd5 (the RDD); what it takes out are the individual Lists (such as List("a b c", "a b b") and List("e f g", "a f g")), so it operates on the Lists inside the RDD. The second flatMap takes elements such as "a b c" and "a b b" out of a Scala List. Therefore the first flatMap distributes the work to different machines in the cluster, while the second flatMap runs on one machine in the cluster against one particular List.

# union computes the union of two RDDs; note that the element types must match

scala> val rdd6 = sc.parallelize(List(5,6,4,7))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[20] at union at <console>:27

Called in infix (DSL) style:

scala> val rdd8 = rdd6 union rdd7
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[21] at union at <console>:27

scala> rdd8.collect
res2: Array[Int] = Array(5, 6, 4, 7, 1, 2, 3, 4)

# intersection computes the intersection

scala> val rdd9 = rdd6.intersection(rdd7)
rdd9: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at intersection at <console>:27

scala> rdd9.collect
res3: Array[Int] = Array(4)

# join / leftOuterJoin / rightOuterJoin (joins)

scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:24

scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[32] at join at <console>:27

scala> val rdd3 = rdd1.join(rdd2).collect
rdd3: Array[(String, (Int, Int))] = Array((tom,(1,2)), (tom,(1,8)), (jerry,(2,9)))

scala> val rdd3 = rdd1.leftOuterJoin(rdd2).collect
rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9))))

scala> val rdd3 = rdd1.rightOuterJoin(rdd2).collect
rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (shuke,(None,7)), (jerry,(Some(2),9)))

# cogroup (a full grouping across both RDDs)

scala> val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:27

scala> val rdd3 = rdd1.cogroup(rdd2).collect
rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))

Values gathered into the same CompactBuffer come from the same RDD; if one RDD contains several pairs with the same key, their values are put into the same CompactBuffer.

# cartesian — Cartesian product [the result is presented as two-element tuples]

scala> val rdd1 = sc.parallelize(List("tom", "jerry"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd3 = rdd1.cartesian(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[8] at cartesian at <console>:27

scala> val rdd3 = rdd1.cartesian(rdd2).collect
rdd3: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))

# groupByKey (group by key; rdd1 and rdd2 here are still the two (String, Int) RDDs from the join example above)

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[51] at union at <console>:27

scala> (rdd1 union rdd2).collect
res7: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))

scala> rdd3.groupByKey.collect
res9: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 2, 8)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))

# WordCount using groupByKey

scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).collect
res13: Array[(String, Int)] = Array((hello,1), (hello,1), (hi,1), (java,1), (scala,1), (dianxin,1), (hi,1), (dianxin,1), (hello,1), (spark,1), (spark,1))

scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).groupByKey.collect
res14: Array[(String, Iterable[Int])] = Array((scala,CompactBuffer(1)), (hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1)), (spark,CompactBuffer(1, 1)), (hi,CompactBuffer(1, 1)), (dianxin,CompactBuffer(1, 1)))

scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).groupByKey.mapValues(_.sum).collect
res15: Array[(String, Int)] = Array((scala,1), (hello,3), (java,1), (spark,2), (hi,2), (dianxin,2))

When the data volume is large, which is more efficient, groupByKey or reduceByKey?

groupByKey groups first and aggregates afterwards. The grouping step causes a lot of shuffle: values with the same key have to be brought to the same machine, and since the data is spread across many machines it must travel over the network — and what gets shuffled is mostly a huge pile of 1s that are only added up afterwards. So groupByKey is inefficient.

reduceByKey, by contrast, aggregates locally inside each partition first and shuffles afterwards, so less data goes over the network and it is more efficient.
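For comparison, here is the WordCount above rewritten with reduceByKey (a sketch assuming the same HDFS path as in the previous example):

sc.textFile("hdfs://hdp-01:9000/wordcount")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)   // partial sums within each partition first, then a shuffle for the global sums
  .collect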

2.2 Common Action APIs

# A Transformation with an explicit number of partitions. Three partitions means that when an Action is later triggered, three Tasks are created; each Task processes its share of the List and writes it to an HDFS file, so in the end there are three result files — much like one Reduce in MapReduce producing one result file. If the number of partitions is not specified, parallelize creates as many partitions as the total number of cores in the cluster (really the total number of threads); when reading files, as a rule there are as many partitions as there are input splits.

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
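A sketch of the "3 partitions -> 3 result files" point (the output path below is a hypothetical example):

rdd1.saveAsTextFile("hdfs://hdp-01:9000/out")
// After this Action runs, the output directory should contain three part files,
// part-00000, part-00001 and part-00002 (plus a _SUCCESS marker), one per partition.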

#collect

scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5)   

#reduce

scala> rdd1.reduce(_ + _)
res1: Int = 15

#count

scala> rdd1.count
res2: Long = 5

# top(num: Int): sorts the data in descending order and takes the top N; the elements are returned to the Driver as an Array

scala> rdd1.top(3)
res3: Array[Int] = Array(5, 4, 3)

# take(num: Int): takes the first n elements

scala> rdd1.take(2)
res4: Array[Int] = Array(1, 2)

# first: returns the first element itself (an Int here), not an RDD

scala> rdd1.first
res5: Int = 1

# takeOrdered: sorts the data in ascending order and takes n elements

scala> rdd1.takeOrdered(3)
res6: Array[Int] = Array(1, 2, 3)

2.3 Advanced RDD APIs

# mapPartitionsWithIndex [processes the data of a partition and also exposes the partition index, so you can tell which partition — and hence which Task — the data belongs to]

"一次取出一个分区"(分区中并没有存储数据,而是记录要读取哪些数据,真正在Worker的Executor中生成的Task会读取多条数据,并且可以将分区的编号取出,我们可以认为就是分区对应的数据)

scala> val rdd1 = sc.parallelize(List.apply(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val mapWithIndexFunction = (index: Int, iterator: Iterator[Int]) => { 
     | iterator.map(e => s"part: $index, ele: $e")
     | }
mapWithIndexFunction: (Int, Iterator[Int]) => Iterator[String] = <function2>

scala> val rdd2 = rdd1.mapPartitionsWithIndex(mapWithIndexFunction)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:27

scala> val rdd2 = rdd1.mapPartitionsWithIndex(mapWithIndexFunction).collect
rdd2: Array[String] = Array(part: 0, ele: 1, part: 0, ele: 2, part: 0, ele: 3, part: 0, ele: 4, part: 1, ele: 5, part: 1, ele: 6, part: 1, ele: 7, part: 1, ele: 8, part: 1, ele: 9)

Records 1 to 4 belong to partition 0 and records 5 to 9 to partition 1. With 9 records in total, one partition gets 4 records and the other 5, so that the Tasks created for the two partitions are as balanced as possible.

# aggregate [local (per-partition) aggregation first, then global aggregation] is an Action; it works on the elements themselves rather than on (K, V) pairs grouped by key

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val sum = rdd1.aggregate(0)(_ + _, _ + _) // aggregate inside each partition first, then globally
sum: Int = 45   

There are two partitions: partition 0 holds 1,2,3,4 and partition 1 holds 5,6,7,8,9. The first _ + _ sums within each partition: 1+2+3+4=10 and 5+6+7+8+9=35; the second _ + _ performs the global aggregation: 10+35=45, which is the returned result.

Sum the maximum value of each partition, with an initial value of 0:

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val maxSum = rdd1.aggregate(0)(math.max(_, _), _ + _)
maxSum: Int = 13

There are two partitions: partition 0 holds 1,2,3,4 and partition 1 holds 5,6,7,8,9. The maximum of the first partition is 4 and of the second is 9, so the global aggregation yields 4+9=13.

Sum the maximum value of each partition, with an initial value of 5:

scala> val maxSum = rdd1.aggregate(5)(math.max(_, _), _ + _)
maxSum: Int = 19

There are two partitions: partition 0 holds 1,2,3,4 and partition 1 holds 5,6,7,8,9. The maximum of the first partition is 5 (the initial value) and of the second is 9; the global aggregation adds the initial value once more, giving (5+9)+5=19.

In aggregate, the order in which the per-partition results are combined globally is not deterministic:

scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd3 = rdd2.aggregate("")(_ + _, _ + _)
rdd3: String = defabc

scala> val rdd3 = rdd2.aggregate("")(_ + _, _ + _)
rdd3: String = abcdef

scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res6: String = 24

scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res7: String = 42

scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
res0: String = 01

scala> rdd4.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
res1: String = 10

"".length的值为0,与"12".length=2 相比,0更小,执行0.toString,返回"0"。注意,此时"0".length的值为1,1再与"23".length即2比较,返回1。同理分区2字符串长度最小值为0,聚合后的结果则为10或01。

#collectAsMap

scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd1 = rdd.mapValues(_ * 100).collect
rdd1: Array[(String, Int)] = Array((a,100), (b,200))


scala> val rdd1 = rdd.mapValues(_ * 100).collectAsMap
rdd1: scala.collection.Map[String,Int] = Map(b -> 200, a -> 100)

Note: in a real production environment, do not collect the data to the Driver and then write it to a database or file system from there — that is a performance bottleneck.

# countByKey counts how many times each key appears, regardless of the values

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd1.countByKey
res4: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)

#countByValue

scala> rdd1.countByValue
res5: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)

# filterByRange [the range is a closed interval, so it includes the bounds "b" and "d" shown in the example]

scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val rdd2 = rdd1.filterByRange("b", "d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at filterByRange at <console>:25

scala> rdd2.collect
res7: Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,1))

#flatMapValues 

scala> val a = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> a.flatMapValues(_.split(" "))
res9: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[17] at flatMapValues at <console>:26

scala> res9.collect
res10: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

# foldByKey [a method of PairRDDFunctions; foldByKey, like reduceByKey, aggregateByKey and combineByKey, is implemented on top of combineByKeyWithClassTag, and all of them are Transformations]

scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at map at <console>:25

scala> rdd2.collect
res13: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))

Assemble the values of the tuples that share the same key:

scala> val rdd3 = rdd2.aggregateByKey("")(_ + _, _ + _)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[20] at aggregateByKey at <console>:25

scala> rdd3.collect
res14: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))

scala> val rdd4 = rdd2.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[21] at reduceByKey at <console>:25

scala> rdd4.collect
res15: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))

scala> val rdd5 = rdd3.foldByKey("")(_ + _)
rdd5: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at foldByKey at <console>:25

scala> rdd5.collect
res16: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))

#foreach

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> rdd.foreach(x => print(x * 100))

scala> 

Nothing shows up in the console. foreach is an Action, and the printing actually happens inside the Executors on the Workers; the Driver (this console) does not pull any data back from the Executors, so no result is visible here. The printed output can be found in the Spark web UI (the Executors' stdout logs).

#foreachPartition

scala> rdd.foreachPartition(it => it.foreach(x => println(x * 100)))

scala> 

foreach processes (prints) one record at a time, while foreachPartition works one partition at a time. Typical use case: when writing computed results to HDFS/Hive/MySQL and so on, foreachPartition is more efficient. With foreach, every record written to MySQL needs its own JDBC connection — writing 10 million records would mean creating 10 million JDBC connections, an enormous waste of resources. With foreachPartition, one JDBC connection per partition is enough to write the whole partition, which is far cheaper and more efficient.
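A sketch of the per-partition JDBC pattern described above (the JDBC URL, credentials and table name are placeholders and would need to match a real environment):

import java.sql.DriverManager

rdd.foreachPartition { it =>
  // one connection per partition instead of one per record
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO t_result(value) VALUES (?)")
  try {
    it.foreach { x =>
      stmt.setInt(1, x)
      stmt.addBatch()
    }
    stmt.executeBatch()      // write the whole partition in one batch
  } finally {
    stmt.close()
    conn.close()
  }
}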

# combineByKey [a fairly low-level method, so the types must be specified explicitly when using it]

scala> val rdd = sc.parallelize(List.apply(("hello", 2), ("hi", 3), ("hello", 4), ("hi", 6), ("hello", 8), ("hi", 1)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd2 = rdd.combineByKey(x => x, (m: Int, n: Int) => m + n, (a: Int, b: Int) => a + b)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at combineByKey at <console>:25

scala> rdd2.collect
res1: Array[(String, Int)] = Array((hello,14), (hi,10))

Group the values of rdd6 (built below) by key and collect them into a ListBuffer:

scala> val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:25

scala> val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:25

scala> val rdd6 = rdd5.zip(rdd4)
rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[6] at zip at <console>:28

scala> rdd6.collect
res6: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> rdd6.combineByKey(x => ListBuffer(x), (a: ListBuffer[String], b: String) => a += b, (m: ListBuffer[String], n: ListBuffer[String]) => m ++= n)
res4: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[7] at combineByKey at <console>:29

scala> res4.collect
res5: Array[(Int, scala.collection.mutable.ListBuffer[String])] = Array((1,ListBuffer(dog, cat, turkey)), (2,ListBuffer(salmon, rabbit, gnu, wolf, bear, bee)))

The essence: in combineByKey(createCombiner, mergeValue, mergeCombiners), createCombiner initializes the accumulator the first time a key is seen within a partition, mergeValue merges each subsequent value for that key within the partition, and mergeCombiners merges the per-partition results across partitions; reduceByKey, aggregateByKey and foldByKey above are all built on this same logic.
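A sketch of how the three functions cooperate — computing a per-key average (my own illustrative data, not taken from the examples above):

val scores = sc.parallelize(List(("a", 90), ("b", 80), ("a", 70), ("b", 60)))
val avg = scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: first value of a key in a partition -> (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold further values within the partition
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge the per-partition accumulators
).mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect   // expected: (a,80.0) and (b,70.0); order may vary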
