Spark RDD篇

2019-08-20 16:06:29 浏览数 (1)

RDD是一个抽象,会记录一些信息,他并不是一个真正的集合,但可以像集合一样操作,降低了开发难度。

RDD的算子分为2类,一种是Transformation(lazy不会立即执行,即便有错误也不会发现),一类是Action(触发任务执行)

创建RDD的方式有3种。

1、通过外部的存储系统创建RDD(如hadoop hdfs,HBase,MongoDB)

2、将Driver的Scala集合通过并行化的方式变成RDD(测试时使用,生产环境不适用)

3、调用一个已经存在的RDD的Transformation,会生成一个新的RDD.

1之前已经有过介绍,见提交第一个Spark统计文件单词数程序,配合hadoop hdfs

2

Spark context Web UI available at http://192.168.5.182:4040

Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181112100219-0000).

Spark session available as 'spark'.

Welcome to

代码语言:txt复制
   ____              __
代码语言:txt复制
  / __/__  ___ _____/ /__
代码语言:txt复制
 _ / _ / _ `/ __/  '_/
代码语言:txt复制
/___/ .__/\_,_/_/ /_/\_   version 2.2.0
代码语言:txt复制
   /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala> val arr = Array(1,2,3,4,5,6,7,8,9,10)

arr: ArrayInt = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> arr.map(_ * 10)

res0: ArrayInt = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

scala> val rdd = sc.parallelize(arr) //将集合转成RDD

rdd: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD0 at parallelize at <console>:26

scala> val rdds = rdd.map(_ * 10) //将每个元素乘以10形成一个新的RDD

rdds: org.apache.spark.rdd.RDDInt = MapPartitionsRDD1 at map at <console>:28

scala> rdds.collect //查看这个新的RDD,由于RDD并不是一个真正的集合,必须要经过一次从各个Worker收集才能查看数据

res3: ArrayInt = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

scala> val rdd3 = rdd.filter(_ % 2 == 0) //过滤出偶数的集合生成一个新的RDD

rdd3: org.apache.spark.rdd.RDDInt = MapPartitionsRDD2 at filter at <console>:28

scala> rdd3.collect

res4: ArrayInt = Array(2, 4, 6, 8, 10)

这个时候我们来看管理界面

我们点进去这个Spark shell

我们可以看到他进行了2次收集

一路点击进去我们可以看到任务是在哪些机器上执行的详细情况

RDD的算子

scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x,true) //将List集合每个元素乘以2后按照升序排序

rdd2: org.apache.spark.rdd.RDDInt = MapPartitionsRDD9 at sortBy at <console>:24

scala> rdd2.collect

res5: ArrayInt = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

我们可以看到他进行了排序和收集操作。

scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x "",true) //按照字符串规则来排序,不会改变集合的元素类型,这里依然是Int型集合

rdd2: org.apache.spark.rdd.RDDInt = MapPartitionsRDD16 at sortBy at <console>:24

scala> rdd2.collect

res6: ArrayInt = Array(10, 12, 14, 16, 18, 2, 20, 4, 6, 8)

我们可以看到排序后是先比较第一位,再比较第二位来进行排序,即字符串规则排序的

scala> val arr = Array("a b c","d e f","h i j")

arr: ArrayString = Array(a b c, d e f, h i j)

scala> arr.map(_.split(" "))

res10: Array[ArrayString] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))

scala> arr.map(_.split(" ")).flatten //扁平化处理

res11: ArrayString = Array(a, b, c, d, e, f, h, i, j)

以上是集合操作

scala> val rdd4 = sc.parallelize(Array("a b c","d e f","h i j"))

rdd4: org.apache.spark.rdd.RDDString = ParallelCollectionRDD17 at parallelize at <console>:24

scala> rdd4.map(_.split(" ")).collect

res12: Array[ArrayString] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))

由于RDD没有flatten方法,只能使用flatMap方法进行扁平化处理

scala> rdd4.flatMap(_.split(" ")).collect

res13: ArrayString = Array(a, b, c, d, e, f, h, i, j)


scala> val rdd5 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))

rdd5: org.apache.spark.rdd.RDD[ListString] = ParallelCollectionRDD21 at parallelize at <console>:24

scala> rdd5.flatMap(_.flatMap(_.split(" "))).collect //这两个flatMap不是一回事,一个是RDD的,他会把任务分发到各个计算服务器上进行计算;一个是List的,他只会在被分发到的计算服务器上进行计算

res14: ArrayString = Array(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)


scala> val rdd6 = sc.parallelize(List(5,6,4,7))

rdd6: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD23 at parallelize at <console>:24

scala> val rdd7 = sc.parallelize(List(1,2,3,4))

rdd7: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD24 at parallelize at <console>:24

scala> rdd6.union(rdd7).collect

res15: ArrayInt = Array(5, 6, 4, 7, 1, 2, 3, 4) //并集

scala> rdd6.intersection(rdd7).collect

res16: ArrayInt = Array(4) //交集


scala> val rdd8 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) //创建一个对偶元组的List的RDD

rdd8: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD32 at parallelize at <console>:24

scala> val rdd9 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))

rdd9: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD33 at parallelize at <console>:24

scala> val rdd10 = rdd8.join(rdd9) //类似于SQL的inner join,只对对偶元组的Key为依据生效

rdd10: org.apache.spark.rdd.RDD(String, (Int, Int)) = MapPartitionsRDD36 at join at <console>:28

scala> rdd10.saveAsTextFile("hdfs://192.168.5.182:8020/testjoin") //将结果保存在hadoop hdfs里面

[Stage 17:> [Stage 19:> [Stage 19:==============================================

root@host2 bin# ./hdfs dfs -ls /testjoin

Found 17 items

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00000

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00001

-rw-r--r-- 3 root supergroup 24 2018-11-12 14:54 /testjoin/part-00002

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00003

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00004

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00005

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00006

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00007

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00008

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00009

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00010

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00011

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00012

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00013

-rw-r--r-- 3 root supergroup 14 2018-11-12 14:54 /testjoin/part-00014

-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00015

root@host2 bin# ./hdfs dfs -cat /testjoin/part-00002

(tom,(1,2))

(tom,(1,8))

root@host2 bin# ./hdfs dfs -cat /testjoin/part-00014

(jerry,(2,9))

根据结果,只有tom和jerry被依据条件保留了下来

scala> val rdd11 = rdd8.leftOuterJoin(rdd9) //left join

rdd11: org.apache.spark.rdd.RDD[(String, (Int, OptionInt))] = MapPartitionsRDD40 at leftOuterJoin at <console>:28

scala> rdd11.saveAsTextFile("hdfs://192.168.5.182:8020/leftjointest")

root@host2 bin# ./hdfs dfs -ls /leftjointest

Found 17 items

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00000

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00001

-rw-r--r-- 3 root supergroup 36 2018-11-12 15:15 /leftjointest/part-00002

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00003

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00004

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00005

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00006

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00007

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00008

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00009

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00010

-rw-r--r-- 3 root supergroup 17 2018-11-12 15:15 /leftjointest/part-00011

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00012

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00013

-rw-r--r-- 3 root supergroup 20 2018-11-12 15:15 /leftjointest/part-00014

-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00015

root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00002

(tom,(1,Some(8)))

(tom,(1,Some(2)))

root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00011

(kitty,(3,None))

root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00014

(jerry,(2,Some(9)))

rdd8的元素都被保留下来,rdd9中有相同的元素会被选出来。

scala> rdd11.collect

res18: Array[(String, (Int, OptionInt))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9))))

在Drive中直接查看结果,跟保存在hadoop hdfs中相同。


scala> val rdd12 = rdd8.union(rdd9)

rdd12: org.apache.spark.rdd.RDD(String, Int) = UnionRDD42 at union at <console>:28

scala> rdd12.collect

res20: Array(String, Int) = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))

scala> rdd12.groupByKey.collect //分组

res21: Array[(String, IterableInt)] = Array((tom,CompactBuffer(2, 1, 8)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))


现在我们不用reduceByKey来计算hadoop hdfs中/usr/file/a.txt中的WordCount,而使用groupByKey

scala> val wordAndOne = sc.textFile("hdfs://192.168.5.182:8020/usr/file/a.txt")

wordAndOne: org.apache.spark.rdd.RDDString = hdfs://192.168.5.182:8020/usr/file/a.txt MapPartitionsRDD45 at textFile at <console>:24

scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey.collect

res23: Array[(String, IterableInt)] = Array((him,CompactBuffer(1)), (park,CompactBuffer(1)), (fool,CompactBuffer(1)), (dinsh,CompactBuffer(1)), (fish,CompactBuffer(1)), (dog,CompactBuffer(1)), (apple,CompactBuffer(1)), (cry,CompactBuffer(1)), (my,CompactBuffer(1)), (ice,CompactBuffer(1)), (cark,CompactBuffer(1)), (balana,CompactBuffer(1)), (fuck,CompactBuffer(1)))

scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey().mapValues(_.sum).collect //mapValues对对偶元组的值进行操作,_.sum对每个值进行求和,这样得出的结果跟之前一样。

res24: Array(String, Int) = Array((him,1), (park,1), (fool,1), (dinsh,1), (fish,1), (dog,1), (apple,1), (cry,1), (my,1), (ice,1), (cark,1), (balana,1), (fuck,1))

虽然结果一样,但是在数据量大的时候,使用reduceByKey,因为reduceByKey会先在各个计算服务器上先计算,而groupByKey会把所有数据放入一台计算服务器中,再进行计算,这样消耗会非常大


scala> val rdd1 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))

rdd1: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD0 at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))

rdd2: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD1 at parallelize at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2) //对对偶元组所在的集合的RDD进行操作,以Key为依据进行分组,获得一个新的对偶元组数组,对偶元组中,保留Key,而Value为每一个RDD中的Value集合组成的元组。

rdd3: org.apache.spark.rdd.RDD[(String, (IterableInt, IterableInt))] = MapPartitionsRDD3 at cogroup at <console>:28

scala> rdd3.collect

[Stage 0:> [Stage 0:> res0: Array[(String, (IterableInt, IterableInt))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2))))


scala> val rdd1 = sc.parallelize(List("tom","jerry"))

rdd1: org.apache.spark.rdd.RDDString = ParallelCollectionRDD4 at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("tom","kitty","shuke"))

rdd2: org.apache.spark.rdd.RDDString = ParallelCollectionRDD5 at parallelize at <console>:24

scala> val rdd3 = rdd1.cartesian(rdd2) //求笛卡尔积

rdd3: org.apache.spark.rdd.RDD(String, String) = CartesianRDD6 at cartesian at <console>:28

scala> rdd3.collect

res1: Array(String, String) = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))


Action

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3) //并行化创建时指定3个分区

rdd1: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD7 at parallelize at <console>:24

scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave")

root@host2 bin# ./hdfs dfs -ls /

Found 4 items

drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest

drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin

drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

root@host2 bin# ./hdfs dfs -ls /testsave

Found 4 items

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:07 /testsave/_SUCCESS

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:07 /testsave/part-00000

-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00001

-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00002

root@host2 bin# ./hdfs dfs -cat /testsave/part-00000

1

root@host2 bin# ./hdfs dfs -cat /testsave/part-00001

2

3

root@host2 bin# ./hdfs dfs -cat /testsave/part-00002

4

5

在Hadoop hdfs里,我们可以看到,他有3个part保存结果

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5)) //不指定分区

rdd1: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD9 at parallelize at <console>:24

scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave1")

root@host2 bin# ./hdfs dfs -ls /

Found 5 items

drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest

drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin

drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave

drwxr-xr-x - root supergroup 0 2018-11-15 11:09 /testsave1

drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr

root@host2 bin# ./hdfs dfs -ls /testsave1

Found 17 items

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00000

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00001

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00002

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00003

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00004

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00005

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00006

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00007

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00008

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00009

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00010

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00011

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00012

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00013

-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00014

-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00015

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00003

1

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00006

2

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00009

3

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00012

4

root@host2 bin# ./hdfs dfs -cat /testsave1/part-00015

5

不指定分区,我们可以看到有16个分区,这跟我们启动Spark-Shell时使用的核数有关系

root@host2 bin# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 1g --total-executor-cores 16

这里我使用的16核,1G内存来启动本次计算,值得注意的是这里并不是分区越大越好,分区较大,也只有16个线程同时工作,其他线程等待,而切换线程会浪费时间。


scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")

rdd: org.apache.spark.rdd.RDDString = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD12 at textFile at <console>:24

root@host2 bin# ./hdfs dfs -ls /usr/file/wcount

Found 3 items

-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS

-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000

-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001

scala> rdd.partitions.length //查看RDD的分区数

res4: Int = 3

这里我们可以看到hadoop hdfs里/usr/file/wcount下面有3个文件,RDD的分区数则为3,如果我们上传一个新的文件进入该文件夹

root@host2 bin# ./hdfs dfs -put /home/soft/schema.xml /usr/file/wcount

root@host2 bin# ./hdfs dfs -ls /usr/file/wcount

Found 4 items

-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS

-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000

-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001

-rw-r--r-- 3 root supergroup 3320 2018-11-15 14:34 /usr/file/wcount/schema.xml

scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")

rdd: org.apache.spark.rdd.RDDString = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD14 at textFile at <console>:24

scala> rdd.partitions.length

res5: Int = 4

则该RDD的分区数变成了4.


scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),2)

rdd1: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD15 at parallelize at <console>:24

scala> rdd1.reduce(_ _)

res6: Int = 15

我们这里可以看到reduce没有返回一个RDD,而是直接返回了一个值,说明reduce()是一个Action算子

scala> rdd1.count

res7: Long = 5

集合包含的元素数量,也是一个Action算子

scala> rdd1.top(2)

res8: ArrayInt = Array(5, 4)

将元素进行排序,按照降序取最大的n个

scala> rdd1.take(2)

res9: ArrayInt = Array(1, 2)

取前n个元素,不排序

scala> rdd1.first

res10: Int = 1

取第一个元素

scala> rdd1.takeOrdered(3)

res11: ArrayInt = Array(1, 2, 3)

排序,按照升序,取前n个元素


scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

rdd: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD0 at parallelize at <console>:24

scala> val func = (index: Int,it: IteratorInt) => {

代码语言:txt复制
  |       it.map(e => s"part: $index, ele: $e")
代码语言:txt复制
  |     }

func: (Int, IteratorInt) => IteratorString = <function2>

定义一个专门获取集合数据e所在分区index的函数

scala> val rdd2 = rdd.mapPartitionsWithIndex(func) //一次性获取一个分区的集合数据,并且知道这个集合的数据在哪个分区

rdd2: org.apache.spark.rdd.RDDString = MapPartitionsRDD1 at mapPartitionsWithIndex at <console>:28

scala> rdd2.collect

res0: ArrayString = Array(part: 0, ele: 1, part: 0, ele: 2, part: 0, ele: 3, part: 0, ele: 4, part: 1, ele: 5, part: 1, ele: 6, part: 1, ele: 7, part: 1, ele: 8, part: 1, ele: 9)

1,2,3,4在0分区;5,6,7,8,9在1分区。

scala> rdd.aggregate(0)(_ _,_ _) //第一个_ _表示在每个分区内各自相加(这里是2个分区),第二个_ _表示再总求和(先分散,再聚合)

res6: Int = 45

scala> rdd.aggregate(0)(math.max(_,_),_ _) //math.max(_,_)表示取各个分区的最大值,_ _表示各个最大值相加

res7: Int = 13

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) //3个分区

rdd: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD6 at parallelize at <console>:24

scala> rdd.aggregate(0)(_ _,_ _) //3个分区分别相加,再汇总

res8: Int = 45

scala> rdd.aggregate(0)(math.max(_,_),_ _) //3个分区的最大值相加,这里为3 6 9

res9: Int = 18

scala> rdd.aggregate(5)(math.max(_,_),_ _)

res10: Int = 25

这里5作为一个值被加到各个分区做比较,第一个分区1,2,3都比5小,所以第一个分区最大值为5,第二个分区最大值为6,第三个分区最大值为9,5 6 9=20,同时5又作为一个单独分区被统加,所以这里是5 6 9 5=25


scala> val rdd = sc.parallelize(List("a","b","c","d","e","f"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD2 at parallelize at <console>:24

scala> def func2(index: Int, iter: IteratorString): IteratorString = {

代码语言:txt复制
  |    iter.map(x => "[partID:"   index   ",val:"   x   "]")
代码语言:txt复制
  | }

func2: (index: Int, iter: IteratorString)IteratorString

定义一个专门获取集合数据x所在分区index的函数

scala> val rdd1 = rdd.mapPartitionsWithIndex(func2) //一次性获取一个分区的集合数据,并且知道这个集合的数据在哪个分区

rdd1: org.apache.spark.rdd.RDDString = MapPartitionsRDD4 at mapPartitionsWithIndex at <console>:28

scala> rdd1.collect

res3: ArrayString = Array(partID:0,val:a, partID:0,val:b, partID:0,val:c, partID:1,val:d, partID:1,val:e, partID:1,val:f)

a,b,c在0分区;d,e,f在1分区

scala> rdd.aggregate("")(_ _,_ _)

res18: String = defabc

scala> rdd.aggregate("")(_ _,_ _)

res19: String = abcdef

这里出现了两个不同的结果,其原因就在于rdd有两个分区,而每个分区在worker里面的executor是并行计算的,他们返回到rdd的结果速度不一定,谁先返回,谁在前面。

scala> rdd.aggregate("|")(_ _,_ _)

res20: String = ||abc|def

scala> rdd.aggregate("|")(_ _,_ _)

res21: String = ||def|abc

这里也是出现了两个结果,原因同上,|被分配到每一个分区作为第一个字符被连接,同时|作为一个单独的分区被连接字符串。


scala> val rdd = sc.parallelize(List("12","23","345","4567"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD8 at parallelize at <console>:24

scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x y)

res24: String = 24

scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x y)

res25: String = 42

(x,y) => math.max(x.length,y.length).toString每一个分区取最大的字符串长度转成字符串,(x,y) => x y所有分区结果字符串的拼接。第一个分区"12","23"的最大字符串长度为2,第二个分区"345","4567"的最大字符串长度为4.所以有两个结果,谁先返回谁在前面,返回的结果为"24"或者"42".


scala> val rdd = sc.parallelize(List("12","23","345",""),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD9 at parallelize at <console>:24

scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x y)

res28: String = 01

scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x y)

res29: String = 10

这个比较难以理解,第一个分区""跟"12"比较得到长度为"0"的字符串,然后"0"的字符串跟"23"比较,得到长度为"1"的字符串;第二个分区,""跟"345"比较得到"0"的字符串,"0"的字符串跟""比较得到"0"的字符串,所以返回的是"01"或者是"10",我们可以用下面这个rdd来验证。

scala> val rdd = sc.parallelize(List("12","23","345","67"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD10 at parallelize at <console>:24

scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x y)

res30: String = 11

这里唯一的不同就在于"0"的字符串跟"67"比较得到"1"的字符串


scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)

pairRDD: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD11 at parallelize at <console>:24

scala> pairRDD.aggregateByKey(0)(_ _,_ _).collect //各个分区相加,再聚合相加

res33: Array(String, Int) = Array((dog,12), (cat,19), (mouse,6))

scala> pairRDD.aggregateByKey(100)(_ _,_ _).collect

res34: Array(String, Int) = Array((dog,112), (cat,219), (mouse,206))

初始值100,会在每个分区的都加一次,dog在第一个分区中没有,第二个分区中加得112;cat在第一个分区和第二个分区都有,所以100会加两次,得到219,mouse同理。

当然我们只是为了获取对偶元组key的value值的和,可以使用reduceByKey,这里不需要分区,结果跟初始值为0的aggregateByKey相同

scala> pairRDD.reduceByKey(_ _).collect

res31: Array(String, Int) = Array((dog,12), (cat,19), (mouse,6))

scala> pairRDD.aggregateByKey(100)(_ _,_ _).saveAsTextFile("hdfs://192.168.5.182:8020/aggbk")

root@host2 bin# ./hdfs dfs -ls /aggbk

Found 3 items

-rw-r--r-- 3 root supergroup 0 2018-11-16 17:22 /aggbk/_SUCCESS

-rw-r--r-- 3 root supergroup 20 2018-11-16 17:22 /aggbk/part-00000

-rw-r--r-- 3 root supergroup 12 2018-11-16 17:22 /aggbk/part-00001

root@host2 bin# ./hdfs dfs -cat /aggbk/part-00000

(dog,112)

(cat,219)

root@host2 bin# ./hdfs dfs -cat /aggbk/part-00001

(mouse,206)

root@host2 bin#

将初始值100的结果保存进hadoop hdfs中,因为我们创建RDD的时候是2个分区,所以这里只有2个part文件,查看结果跟之前collect相同。


scala> val rdd = sc.parallelize(List(("a",1),("b",2)))

rdd: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD18 at parallelize at <console>:24

scala> rdd.collectAsMap //将结果收集转换成Map

res36: scala.collection.MapString,Int = Map(b -> 2, a -> 1)

scala> rdd.mapValues(_ * 100).collectAsMap //将value乘以100,收集成Map

res37: scala.collection.MapString,Int = Map(b -> 200, a -> 100)


RDD的执行过程,先把List(1,2,3,4,5)分3个区,生成task,推送到3个Worker的Executor中,在Executor中经过计算,得到结果,再收集回Driver中,以数组的形式返回,返回的结果,有快有慢,但是他依然会按照分区编号来进行组装成一个Array,所以他的顺序并不会变化。

scala> val rdd = sc.parallelize(List(1,2,3,4,5),3)

rdd: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD2 at parallelize at <console>:24

scala> rdd.map(_ * 10).collect

res19: ArrayInt = Array(10, 20, 30, 40, 50)

scala> rdd.map(_ * 10).collect

res20: ArrayInt = Array(10, 20, 30, 40, 50)

scala> rdd.map(_ * 10).collect

res21: ArrayInt = Array(10, 20, 30, 40, 50)

这里无论执行多少次,顺序都不会变。

如果要将结果保存到数据库中,当数据量过大时,应该通过Executor直接写入数据库,而不是通过Driver收集再存入数据库。


scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1)))

rdd: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD6 at parallelize at <console>:24

scala> rdd.countByKey() //跟对偶元组的Value无关,只看Key的出现次数

res22: scala.collection.MapString,Long = Map(a -> 1, b -> 2, c -> 2)


scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1),("d",4),("d",2),("e",1)))

rdd: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD10 at parallelize at <console>:24

scala> val rdd1 = rdd.filterByRange("b","d") //以对偶数组的Key为过滤条件,只取"b"到"d"的范围的元组

rdd1: org.apache.spark.rdd.RDD(String, Int) = MapPartitionsRDD11 at filterByRange at <console>:26

scala> rdd1.collect

res24: Array(String, Int) = Array((b,2), (b,2), (c,2), (c,1), (d,4), (d,2))


scala> val a = sc.parallelize(List(("a","1 2"),("b","3 4")))

a: org.apache.spark.rdd.RDD(String, String) = ParallelCollectionRDD12 at parallelize at <console>:24

scala> a.flatMapValues(_.split(" ")).collect //对对偶元组的Value进行扁平化处理

res25: Array(String, String) = Array((a,1), (a,2), (b,3), (b,4))


scala> val rdd = sc.parallelize(List("dog","wolf","cat","bear"),2)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD14 at parallelize at <console>:24

scala> val rdd1 = rdd.map(x => (x.length,x)) //将rdd的元素转成带对偶元组的集合,形成一个新的RDD的rdd1

rdd1: org.apache.spark.rdd.RDD(Int, String) = MapPartitionsRDD15 at map at <console>:26

scala> rdd1.collect

res26: Array(Int, String) = Array((3,dog), (4,wolf), (3,cat), (4,bear))

现在我们要将rdd1以相同的Key,将Value拼接起来,有以下三种方法

scala> rdd1.aggregateByKey("")(_ _,_ _).collect

res27: Array(Int, String) = Array((4,bearwolf), (3,dogcat))

scala> rdd1.aggregateByKey("")(_ _,_ _).collect

res28: Array(Int, String) = Array((4,wolfbear), (3,catdog))

scala> rdd1.reduceByKey(_ _).collect

res40: Array(Int, String) = Array((4,bearwolf), (3,dogcat))

scala> rdd1.foldByKey("")(_ _).collect

res41: Array(Int, String) = Array((4,bearwolf), (3,dogcat))

其实这3种方法都可以实现分散聚合,是因为他们都调用了同一个底层方法combineByKeyWithClassTag


scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

rdd: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD31 at parallelize at <console>:24

scala> rdd.foreach(e => println(e * 100))

这个foreach我们看到没有任何返回,其原因就在于这是在executor上执行的,并没有返回Driver.我们来看Spark的控制台

这里有一个Job Id为42的foreach,一直点进去可以看到

我们点击Tasks(2)的stdout可以看到当index为0时

当index为1时,可以看到

说明他们只是在executor中执行了rdd.foreach(e => println(e * 100))这条语句。

scala> rdd.foreachPartition(it => it.foreach(x => println(x * 10000))) //一次性拿出一个分区的数据放入迭代器,由迭代器来打印

我们可以看到这里也没有返回值,在Spark控制台中,可以看到

说明他也是在Executor中执行了该语句,并没有返回到Driver.

当我们要将Executor中的数据写入到数据库时,使用foreachPartition一次性拿出一个分区的数据,与数据库建立一次连接,就可以全部写进去,而使用foreach则需要每拿出一条数据就要与数据库建立一次连接,这样非常低效,而且消耗非常巨大。


scala> val pairRDD = sc.parallelize(List(("hello",2),("jerry",3),("hello",4),("jerry",1)),2)

pairRDD: org.apache.spark.rdd.RDD(String, Int) = ParallelCollectionRDD1 at parallelize at <console>:24

scala> val rdd = pairRDD.combineByKey(x => x,(m: Int,n: Int) => m n,(a: Int,b: Int) => a b)

rdd: org.apache.spark.rdd.RDD(String, Int) = ShuffledRDD2 at combineByKey at <console>:26

combineByKey是一个底层的算子,必须要声明参数的类型,不能使用类似_ _的写法;ShuffledRDD是把有相同的Key的对偶元组放到同一个Executor中,再进行运算。

scala> rdd.collect

res1: Array(String, Int) = Array((hello,6), (jerry,4))

我们来看一个把各种动物按照单双来进行分组的例子

scala> val rdd = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

rdd: org.apache.spark.rdd.RDDString = ParallelCollectionRDD0 at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(List(1,1,2,2,2,1,2,2,2),3)

rdd1: org.apache.spark.rdd.RDDInt = ParallelCollectionRDD1 at parallelize at <console>:24

scala> val rdd2 = rdd1.zip(rdd) //将两个RDD的集合合并成一个对偶元组的集合

rdd2: org.apache.spark.rdd.RDD(Int, String) = ZippedPartitionsRDD22 at zip at <console>:28

scala> rdd2.collect

res0: Array(Int, String) = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))

scala> import scala.collection.mutable.ListBuffer

import scala.collection.mutable.ListBuffer

scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBufferString,n: String) => m = n,(a: ListBufferString,b: ListBufferString) => a = b)

rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBufferString)] = ShuffledRDD4 at combineByKey at <console>:31

第一个函数x => ListBuffer(x)是将分好组的各种Key(这里Key为数字)的第一个Value(Value为动物)放进一个单独的ListBuffer中,比如第一个分区中只有ListBuffer(dog)和ListBuffer(gnu),没有cat,因为cat不是1的第一个Value,其他分区以此类推;第二个函数(m: ListBufferString,n: String) => m = n将没有放进ListBuffer中的其他Value放进有相同Key的ListBuffer中,比如第一个分区中有ListBuffer(dog,cat),ListBuffer(gnu),此时只是在各个分区分别操作;第三个函数(a: ListBufferString,b: ListBufferString) => a = b进行所有分区整体聚合,将所有相同Key的ListBuffer合并,此时是一个Shuffled操作,会将有相同Key的ListBuffer放入到同一个机器中,计算完再合并。

scala> rdd3.collect

res2: Array[(Int, scala.collection.mutable.ListBufferString)] = Array((1,ListBuffer(dog, cat, turkey)), (2,ListBuffer(salmon, rabbit, gnu, wolf, bear, bee)))

整体概念图如下

将结果保存到hadoop hdfs中

scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine")

root@host2 bin# ./hdfs dfs -ls /combine

Found 4 items

-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/_SUCCESS

-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/part-00000

-rw-r--r-- 3 root supergroup 33 2018-11-23 17:14 /combine/part-00001

-rw-r--r-- 3 root supergroup 53 2018-11-23 17:14 /combine/part-00002

root@host2 bin# ./hdfs dfs -cat /combine/part-00001

(1,ListBuffer(turkey, dog, cat))

root@host2 bin# ./hdfs dfs -cat /combine/part-00002

(2,ListBuffer(gnu, wolf, bear, bee, salmon, rabbit))

虽然有3个分区,但是Shuffled以后,只有2个Key(1和2),所以只有两个文件有数据,但是有3个part文件。

我们可以重新定义rdd3的分区数

scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner

scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBufferString,n: String) => m = n,(a: ListBufferString,b: ListBufferString) => a = b,new HashPartitioner(2),true,null)

rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBufferString)] = ShuffledRDD6 at combineByKey at <console>:32

重新保存到hadoop hdfs中

scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine1")

root@host2 bin# ./hdfs dfs -ls /combine1

Found 3 items

-rw-r--r-- 3 root supergroup 0 2018-11-23 17:27 /combine1/_SUCCESS

-rw-r--r-- 3 root supergroup 53 2018-11-23 17:27 /combine1/part-00000

-rw-r--r-- 3 root supergroup 33 2018-11-23 17:27 /combine1/part-00001

此时可以看到新保存的结果只有2个part文件,并且都有数据。

0 人点赞