An RDD is an abstraction: it records some information rather than holding a real collection, yet it can be operated on like a collection, which lowers the development effort.
RDD operators fall into two categories: Transformations (lazy, not executed immediately, so even errors go unnoticed at first) and Actions (which trigger job execution).
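A minimal sketch of that lazy behaviour, assuming a running spark-shell where sc is already defined: the faulty division below is only a Transformation, so nothing fails until an Action runs.
val nums = sc.parallelize(Array(1, 2, 3, 0))
val divided = nums.map(10 / _)   // Transformation: nothing executes yet, so the division by zero goes unnoticed
divided.collect()                // Action: the job actually runs and only now throws an ArithmeticException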
There are three ways to create an RDD:
1. From an external storage system (such as Hadoop HDFS, HBase, MongoDB).
2. By parallelizing a Scala collection in the Driver (useful for testing, not suitable for production).
3. By calling a Transformation on an existing RDD, which produces a new RDD.
Method 1 was covered earlier in "Submitting the first Spark word-count program", together with Hadoop HDFS.
Method 2:
Spark context Web UI available at http://192.168.5.182:4040
Spark context available as 'sc' (master = spark://host2:7077,host1:7077, app id = app-20181112100219-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val arr = Array(1,2,3,4,5,6,7,8,9,10)
arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> arr.map(_ * 10)
res0: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
scala> val rdd = sc.parallelize(arr) // turn the collection into an RDD
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val rdds = rdd.map(_ * 10) // multiply each element by 10 to produce a new RDD
rdds: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:28
scala> rdds.collect // view the new RDD; since an RDD is not a real collection, the data must first be collected from the Workers before it can be inspected
res3: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
scala> val rdd3 = rdd.filter(_ % 2 == 0) // filter out the even numbers into a new RDD
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28
scala> rdd3.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10)
Now let's look at the management UI.
Click into this Spark shell application.
We can see that it performed two collect jobs.
Clicking further in shows the details of which machines the tasks ran on.
RDD operators
scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x,true) // multiply each element of the List by 2, then sort in ascending order
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at sortBy at <console>:24
scala> rdd2.collect
res5: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
We can see that it performed a sort and then a collect.
scala> val rdd2 = sc.parallelize(List(5,10,6,7,4,3,8,2,9,1)).map(_ * 2).sortBy(x => x + "",true) // sort by string rules; this does not change the element type of the collection, which is still Int
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at sortBy at <console>:24
scala> rdd2.collect
res6: Array[Int] = Array(10, 12, 14, 16, 18, 2, 20, 4, 6, 8)
We can see that the ordering compares the first character first, then the second, i.e. string-rule sorting.
scala> val arr = Array("a b c","d e f","h i j")
arr: Array[String] = Array(a b c, d e f, h i j)
scala> arr.map(_.split(" "))
res10: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))
scala> arr.map(_.split(" ")).flatten // flatten the nested arrays
res11: Array[String] = Array(a, b, c, d, e, f, h, i, j)
The above are plain collection operations.
scala> val rdd4 = sc.parallelize(Array("a b c","d e f","h i j"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> rdd4.map(_.split(" ")).collect
res12: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))
Since an RDD has no flatten method, flatMap must be used to flatten it.
scala> rdd4.flatMap(_.split(" ")).collect
res13: Array[String] = Array(a, b, c, d, e, f, h, i, j)
scala> val rdd5 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))
rdd5: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd5.flatMap(_.flatMap(_.split(" "))).collect // these two flatMaps are not the same thing: the outer one belongs to the RDD and distributes the work to the compute servers, while the inner one belongs to List and runs only on the server the data was distributed to
res14: Array[String] = Array(a, b, c, a, b, b, e, f, g, a, f, g, h, i, j, a, a, b)
scala> val rdd6 = sc.parallelize(List(5,6,4,7))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val rdd7 = sc.parallelize(List(1,2,3,4))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> rdd6.union(rdd7).collect
res15: Array[Int] = Array(5, 6, 4, 7, 1, 2, 3, 4) // union
scala> rdd6.intersection(rdd7).collect
res16: Array[Int] = Array(4) // intersection
scala> val rdd8 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) // create an RDD from a List of pair tuples
rdd8: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24
scala> val rdd9 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))
rdd9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd10 = rdd8.join(rdd9) // similar to SQL's inner join, matching only on the pair tuples' Key
rdd10: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[36] at join at <console>:28
scala> rdd10.saveAsTextFile("hdfs://192.168.5.182:8020/testjoin") // save the result to Hadoop HDFS
root@host2 bin# ./hdfs dfs -ls /testjoin
Found 17 items
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00000
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00001
-rw-r--r-- 3 root supergroup 24 2018-11-12 14:54 /testjoin/part-00002
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00003
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00004
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00005
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00006
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00007
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00008
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00009
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00010
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00011
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00012
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00013
-rw-r--r-- 3 root supergroup 14 2018-11-12 14:54 /testjoin/part-00014
-rw-r--r-- 3 root supergroup 0 2018-11-12 14:54 /testjoin/part-00015
root@host2 bin# ./hdfs dfs -cat /testjoin/part-00002
(tom,(1,2))
(tom,(1,8))
root@host2 bin# ./hdfs dfs -cat /testjoin/part-00014
(jerry,(2,9))
From the result, only tom and jerry were kept by the join condition.
scala> val rdd11 = rdd8.leftOuterJoin(rdd9) //left join
rdd11: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[40] at leftOuterJoin at <console>:28
scala> rdd11.saveAsTextFile("hdfs://192.168.5.182:8020/leftjointest")
root@host2 bin# ./hdfs dfs -ls /leftjointest
Found 17 items
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00000
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00001
-rw-r--r-- 3 root supergroup 36 2018-11-12 15:15 /leftjointest/part-00002
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00003
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00004
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00005
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00006
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00007
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00008
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00009
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00010
-rw-r--r-- 3 root supergroup 17 2018-11-12 15:15 /leftjointest/part-00011
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00012
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00013
-rw-r--r-- 3 root supergroup 20 2018-11-12 15:15 /leftjointest/part-00014
-rw-r--r-- 3 root supergroup 0 2018-11-12 15:15 /leftjointest/part-00015
root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00002
(tom,(1,Some(8)))
(tom,(1,Some(2)))
root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00011
(kitty,(3,None))
root@host2 bin# ./hdfs dfs -cat /leftjointest/part-00014
(jerry,(2,Some(9)))
All elements of rdd8 are kept, and the matching elements from rdd9 are joined onto them.
scala> rdd11.collect
res18: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (kitty,(3,None)), (jerry,(2,Some(9))))
Viewing the result directly in the Driver gives the same data as what was saved to Hadoop HDFS.
scala> val rdd12 = rdd8.union(rdd9)
rdd12: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[42] at union at <console>:28
scala> rdd12.collect
res20: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
scala> rdd12.groupByKey.collect // group by Key
res21: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(2, 1, 8)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))
Now, instead of reduceByKey, let's use groupByKey to compute the WordCount of /usr/file/a.txt in Hadoop HDFS.
scala> val wordAndOne = sc.textFile("hdfs://192.168.5.182:8020/usr/file/a.txt")
wordAndOne: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/a.txt MapPartitionsRDD[45] at textFile at <console>:24
scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey.collect
res23: Array[(String, Iterable[Int])] = Array((him,CompactBuffer(1)), (park,CompactBuffer(1)), (fool,CompactBuffer(1)), (dinsh,CompactBuffer(1)), (fish,CompactBuffer(1)), (dog,CompactBuffer(1)), (apple,CompactBuffer(1)), (cry,CompactBuffer(1)), (my,CompactBuffer(1)), (ice,CompactBuffer(1)), (cark,CompactBuffer(1)), (balana,CompactBuffer(1)), (fuck,CompactBuffer(1)))
scala> wordAndOne.flatMap(_.split(" ")).map((_, 1)).groupByKey().mapValues(_.sum).collect // mapValues operates on the pair tuples' values; _.sum sums each value collection, giving the same result as before
res24: Array[(String, Int)] = Array((him,1), (park,1), (fool,1), (dinsh,1), (fish,1), (dog,1), (apple,1), (cry,1), (my,1), (ice,1), (cark,1), (balana,1), (fuck,1))
Although the results are identical, prefer reduceByKey when the data volume is large: reduceByKey first aggregates on each compute server, whereas groupByKey ships all the data for a key to a single compute server before computing, which is far more expensive.
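For comparison, here is the same word count written with reduceByKey, as a small sketch reusing the wordAndOne RDD above; reduceByKey combines values inside each partition before the shuffle, so much less data crosses the network.
val counts = wordAndOne.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)   // partial sums within each partition first, then a final merge per key
counts.collect          // the same per-word totals as the groupByKey version above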
scala> val rdd1 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd3 = rdd1.cogroup(rdd2) // operates on RDDs of pair tuples: groups by Key and yields a new pair-tuple RDD that keeps the Key, with the Value being a tuple of each source RDD's collection of Values
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:28
scala> rdd3.collect
res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())), (jerry,(CompactBuffer(3),CompactBuffer(2))))
scala> val rdd1 = sc.parallelize(List("tom","jerry"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List("tom","kitty","shuke"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val rdd3 = rdd1.cartesian(rdd2) // Cartesian product
rdd3: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[6] at cartesian at <console>:28
scala> rdd3.collect
res1: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
Action
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3) // specify 3 partitions when parallelizing
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave")
root@host2 bin# ./hdfs dfs -ls /
Found 4 items
drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest
drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin
drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
root@host2 bin# ./hdfs dfs -ls /testsave
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:07 /testsave/_SUCCESS
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:07 /testsave/part-00000
-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00001
-rw-r--r-- 3 root supergroup 4 2018-11-15 11:07 /testsave/part-00002
root@host2 bin# ./hdfs dfs -cat /testsave/part-00000
1
root@host2 bin# ./hdfs dfs -cat /testsave/part-00001
2
3
root@host2 bin# ./hdfs dfs -cat /testsave/part-00002
4
5
In Hadoop HDFS we can see that the result was saved across 3 part files.
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5)) // no partition count specified
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd1.saveAsTextFile("hdfs://192.168.5.182:8020/testsave1")
root@host2 bin# ./hdfs dfs -ls /
Found 5 items
drwxr-xr-x - root supergroup 0 2018-11-12 15:15 /leftjointest
drwxr-xr-x - root supergroup 0 2018-11-12 14:54 /testjoin
drwxr-xr-x - root supergroup 0 2018-11-15 11:07 /testsave
drwxr-xr-x - root supergroup 0 2018-11-15 11:09 /testsave1
drwxr-xr-x - root supergroup 0 2018-09-14 13:44 /usr
root@host2 bin# ./hdfs dfs -ls /testsave1
Found 17 items
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00000
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00001
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00002
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00003
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00004
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00005
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00006
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00007
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00008
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00009
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00010
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00011
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00012
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00013
-rw-r--r-- 3 root supergroup 0 2018-11-15 11:09 /testsave1/part-00014
-rw-r--r-- 3 root supergroup 2 2018-11-15 11:09 /testsave1/part-00015
root@host2 bin# ./hdfs dfs -cat /testsave1/part-00003
1
root@host2 bin# ./hdfs dfs -cat /testsave1/part-00006
2
root@host2 bin# ./hdfs dfs -cat /testsave1/part-00009
3
root@host2 bin# ./hdfs dfs -cat /testsave1/part-00012
4
root@host2 bin# ./hdfs dfs -cat /testsave1/part-00015
5
With no partition count specified, there are 16 partitions, which matches the number of cores used when Spark-Shell was started:
root@host2 bin# ./spark-shell --master spark://host2:7077,host1:7077 --executor-memory 1g --total-executor-cores 16
Here I started the session with 16 cores and 1 GB of executor memory. Note that more partitions is not automatically better: with only 16 cores, only 16 tasks run at a time while the other tasks wait, and switching between them wastes time.
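A small sketch of how to check and override that behaviour, assuming the same spark-shell session (on a standalone cluster, defaultParallelism normally matches the total executor cores).
sc.defaultParallelism                              // 16 here, matching --total-executor-cores
val rdd = sc.parallelize(List(1, 2, 3, 4, 5), 3)   // pass an explicit count to avoid 16 tiny partitions
rdd.partitions.length                              // 3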
scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD[12] at textFile at <console>:24
root@host2 bin# ./hdfs dfs -ls /usr/file/wcount
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS
-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000
-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001
scala> rdd.partitions.length // check the RDD's partition count
res4: Int = 3
There are 3 files under /usr/file/wcount in Hadoop HDFS, so the RDD has 3 partitions. If we upload a new file into that directory:
root@host2 bin# ./hdfs dfs -put /home/soft/schema.xml /usr/file/wcount
root@host2 bin# ./hdfs dfs -ls /usr/file/wcount
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-11-03 16:20 /usr/file/wcount/_SUCCESS
-rw-r--r-- 3 root supergroup 78 2018-11-03 16:20 /usr/file/wcount/part-00000
-rw-r--r-- 3 root supergroup 37 2018-11-03 16:20 /usr/file/wcount/part-00001
-rw-r--r-- 3 root supergroup 3320 2018-11-15 14:34 /usr/file/wcount/schema.xml
scala> val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.5.182:8020/usr/file/wcount MapPartitionsRDD[14] at textFile at <console>:24
scala> rdd.partitions.length
res5: Int = 4
the RDD's partition count becomes 4.
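textFile also takes an optional minimum partition count as a second argument; it is only a hint, since the final number still depends on the HDFS input splits. A quick sketch:
val rdd = sc.textFile("hdfs://192.168.5.182:8020/usr/file/wcount", 8)  // ask for at least 8 partitions
rdd.partitions.length   // typically 8 or more here; the exact value depends on how the files are split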
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> rdd1.reduce(_ + _)
res6: Int = 15
Notice that reduce does not return an RDD but a plain value, which shows that reduce() is an Action operator.
scala> rdd1.count
res7: Long = 5
count returns the number of elements in the collection; it is also an Action.
scala> rdd1.top(2)
res8: Array[Int] = Array(5, 4)
top sorts the elements and returns the largest n in descending order.
scala> rdd1.take(2)
res9: Array[Int] = Array(1, 2)
take returns the first n elements without sorting.
scala> rdd1.first
res10: Int = 1
first returns the first element.
scala> rdd1.takeOrdered(3)
res11: Array[Int] = Array(1, 2, 3)
takeOrdered sorts in ascending order and returns the first n elements.
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val func = (index: Int, it: Iterator[Int]) => {
     |   it.map(e => s"part: $index, ele: $e")
     | }
func: (Int, Iterator[Int]) => Iterator[String] = <function2>
This defines a function that reports, for each element e, the partition index it lives in.
scala> val rdd2 = rdd.mapPartitionsWithIndex(func) // fetch one partition's data at a time, and know which partition that data belongs to
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:28
scala> rdd2.collect
res0: Array[String] = Array(part: 0, ele: 1, part: 0, ele: 2, part: 0, ele: 3, part: 0, ele: 4, part: 1, ele: 5, part: 1, ele: 6, part: 1, ele: 7, part: 1, ele: 8, part: 1, ele: 9)
1,2,3,4 are in partition 0; 5,6,7,8,9 are in partition 1.
scala> rdd.aggregate(0)(_ + _, _ + _) // the first _ + _ adds up each partition separately (2 partitions here), the second _ + _ adds the partial sums together (scatter first, then aggregate)
res6: Int = 45
scala> rdd.aggregate(0)(math.max(_,_), _ + _) // math.max(_,_) takes each partition's maximum, _ + _ adds those maxima together
res7: Int = 13
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) // 3 partitions
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd.aggregate(0)(_ + _, _ + _) // add up each of the 3 partitions, then total them
res8: Int = 45
scala> rdd.aggregate(0)(math.max(_,_), _ + _) // add the maxima of the 3 partitions, here 3 + 6 + 9
res9: Int = 18
scala> rdd.aggregate(5)(math.max(_,_), _ + _)
res10: Int = 25
Here the zero value 5 also takes part in each partition's comparison: 1, 2, 3 in the first partition are all smaller than 5, so its maximum is 5, while the second partition's maximum is 6 and the third's is 9, giving 5+6+9=20. The zero value 5 is then added once more in the final combine step, so the result is 5+6+9+5=25.
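A local sketch that mimics this computation, assuming the partition layout shown above ((1,2,3), (4,5,6), (7,8,9)), may make the two uses of the zero value clearer.
val parts = List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9))
val zero = 5
val perPartition = parts.map(p => p.foldLeft(zero)(math.max))  // List(5, 6, 9): each partition folds from the zero value
val total = perPartition.foldLeft(zero)(_ + _)                 // the combine step folds from the zero value again: 5 + 5 + 6 + 9
println(total)                                                 // 25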
scala> val rdd = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> def func2(index: Int, iter: Iterator[String]): Iterator[String] = {
     |   iter.map(x => "[partID:" + index + ",val:" + x + "]")
     | }
func2: (index: Int, iter: Iterator[String])Iterator[String]
Again, a function that reports which partition each element x belongs to.
scala> val rdd1 = rdd.mapPartitionsWithIndex(func2) // fetch one partition's data at a time, and know which partition that data belongs to
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:28
scala> rdd1.collect
res3: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])
a,b,c are in partition 0; d,e,f are in partition 1.
scala> rdd.aggregate("")(_ + _, _ + _)
res18: String = defabc
scala> rdd.aggregate("")(_ + _, _ + _)
res19: String = abcdef
Two different results appear because the rdd has two partitions, each computed in parallel by an executor on a Worker; the partition results come back at different speeds, and whichever returns first ends up in front.
scala> rdd.aggregate("|")(_ + _, _ + _)
res20: String = ||abc|def
scala> rdd.aggregate("|")(_ + _, _ + _)
res21: String = ||def|abc
Again two possible results, for the same reason; the "|" is prepended to each partition's concatenation as its first character, and once more as the zero value of the final combine.
scala> val rdd = sc.parallelize(List("12","23","345","4567"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x + y)
res24: String = 24
scala> rdd.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x + y)
res25: String = 42
(x,y) => math.max(x.length,y.length).toString takes the larger string length within each partition and turns it into a string; (x,y) => x + y concatenates the partition results. The first partition ("12","23") has a maximum string length of 2 and the second ("345","4567") has 4, so there are two possible results, "24" or "42", depending on which partition returns first.
scala> val rdd = sc.parallelize(List("12","23","345",""),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)
res28: String = 01
scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)
res29: String = 10
This one is harder to follow. In the first partition, "" is compared with "12" to give the string "0", and "0" is then compared with "23" to give "1". In the second partition, "" is compared with "345" to give "0", and "0" is compared with "" to give "0" again. So the result is "01" or "10". We can verify this with the rdd below.
scala> val rdd = sc.parallelize(List("12","23","345","67"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> rdd.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x + y)
res30: String = 11
The only difference is that "0" compared with "67" now gives "1".
scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> pairRDD.aggregateByKey(0)(_ + _, _ + _).collect // add up within each partition, then add the partition results together
res33: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.aggregateByKey(100)(_ + _, _ + _).collect
res34: Array[(String, Int)] = Array((dog,112), (cat,219), (mouse,206))
The initial value 100 is added once in every partition that contains the key: dog appears only in the second partition, so it becomes 12 + 100 = 112; cat appears in both partitions, so 100 is added twice, giving 219; mouse works the same way.
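To confirm which keys land in which partition (and therefore how many times the initial value is applied), here is a quick sketch with mapPartitionsWithIndex, as used earlier.
pairRDD.mapPartitionsWithIndex((index, it) =>
  it.map { case (k, v) => s"part: $index, key: $k, value: $v" }
).collect.foreach(println)
// With this 2-partition split, cat and mouse appear in both partitions (100 added twice),
// while dog appears only in the second one (100 added once), which explains 219, 206 and 112.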
Of course, if all we want is the sum of the values for each key, reduceByKey is enough; it needs no initial value, and the result is the same as aggregateByKey with an initial value of 0.
scala> pairRDD.reduceByKey(_ + _).collect
res31: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.aggregateByKey(100)(_ + _, _ + _).saveAsTextFile("hdfs://192.168.5.182:8020/aggbk")
root@host2 bin# ./hdfs dfs -ls /aggbk
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-11-16 17:22 /aggbk/_SUCCESS
-rw-r--r-- 3 root supergroup 20 2018-11-16 17:22 /aggbk/part-00000
-rw-r--r-- 3 root supergroup 12 2018-11-16 17:22 /aggbk/part-00001
root@host2 bin# ./hdfs dfs -cat /aggbk/part-00000
(dog,112)
(cat,219)
root@host2 bin# ./hdfs dfs -cat /aggbk/part-00001
(mouse,206)
root@host2 bin#
The result with the initial value of 100 was saved to Hadoop HDFS; since the RDD was created with 2 partitions, there are only 2 part files, and their contents match the earlier collect output.
scala> val rdd = sc.parallelize(List(("a",1),("b",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> rdd.collectAsMap // collect the result and convert it into a Map
res36: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
scala> rdd.mapValues(_ * 100).collectAsMap // multiply each value by 100, then collect as a Map
res37: scala.collection.Map[String,Int] = Map(b -> 200, a -> 100)
How an RDD job runs: List(1,2,3,4,5) is split into 3 partitions, tasks are generated and pushed to the Executors of 3 Workers, the Executors compute their pieces, and the results are collected back to the Driver and returned as an array. The partial results may arrive fast or slow, but they are assembled into the Array by partition index, so the order does not change.
scala> val rdd = sc.parallelize(List(1,2,3,4,5),3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd.map(_ * 10).collect
res19: Array[Int] = Array(10, 20, 30, 40, 50)
scala> rdd.map(_ * 10).collect
res20: Array[Int] = Array(10, 20, 30, 40, 50)
scala> rdd.map(_ * 10).collect
res21: Array[Int] = Array(10, 20, 30, 40, 50)
No matter how many times this runs, the order stays the same.
If the results need to go into a database and the data volume is large, write them from the Executors directly instead of collecting them to the Driver first.
scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd.countByKey() // ignores the pair tuples' values; it only counts how many times each Key appears
res22: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)
scala> val rdd = sc.parallelize(List(("a",1),("b",2),("b",2),("c",2),("c",1),("d",4),("d",2),("e",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd1 = rdd.filterByRange("b","d") // filter on the pair tuples' Key, keeping only tuples whose Key falls in the range "b" to "d"
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at filterByRange at <console>:26
scala> rdd1.collect
res24: Array[(String, Int)] = Array((b,2), (b,2), (c,2), (c,1), (d,4), (d,2))
scala> val a = sc.parallelize(List(("a","1 2"),("b","3 4")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> a.flatMapValues(_.split(" ")).collect // flatten the pair tuples' values
res25: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
scala> val rdd = sc.parallelize(List("dog","wolf","cat","bear"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> val rdd1 = rdd.map(x => (x.length,x)) // turn the elements into pair tuples, producing a new RDD rdd1
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:26
scala> rdd1.collect
res26: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
Now we want to concatenate the values of rdd1 that share the same Key. There are three ways to do it:
scala> rdd1.aggregateByKey("")(_ + _, _ + _).collect
res27: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
scala> rdd1.aggregateByKey("")(_ + _, _ + _).collect
res28: Array[(Int, String)] = Array((4,wolfbear), (3,catdog))
scala> rdd1.reduceByKey(_ + _).collect
res40: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
scala> rdd1.foldByKey("")(_ + _).collect
res41: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
All three achieve the same scatter-then-aggregate behaviour because they all call the same underlying method, combineByKeyWithClassTag.
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
scala> rdd.foreach(e => println(e * 100))
This foreach returns nothing, because it runs on the executors and nothing comes back to the Driver. Let's look at the Spark web UI.
There is a job with Job Id 42 for the foreach; clicking all the way in, we can see:
Clicking the stdout link under Tasks (2), when index is 0 we see:
and when index is 1:
This shows that rdd.foreach(e => println(e * 100)) was executed only on the executors.
scala> rdd.foreachPartition(it => it.foreach(x => println(x * 10000))) // fetch one partition's data at a time into an iterator and let the iterator print it
Again there is no return value, and the Spark web UI shows
that this statement, too, ran on the Executors and returned nothing to the Driver.
When writing Executor data to a database, use foreachPartition: take one partition at a time, open a single database connection, and write the whole partition through it. With foreach, every single record would need its own database connection, which is very inefficient and expensive. A sketch follows below.
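Here is a hedged sketch of that pattern with JDBC; the MySQL URL, table name and credentials are made up for illustration, and any store that supports a connection-per-partition pattern works the same way.
import java.sql.DriverManager

rdd.foreachPartition { it =>
  // runs on the Executor: one connection per partition, not one per record
  val conn = DriverManager.getConnection("jdbc:mysql://192.168.5.182:3306/test", "user", "password") // hypothetical URL and credentials
  val stmt = conn.prepareStatement("INSERT INTO numbers (value) VALUES (?)")                         // hypothetical table
  try {
    it.foreach { x =>
      stmt.setInt(1, x)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}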
scala> val pairRDD = sc.parallelize(List(("hello",2),("jerry",3),("hello",4),("jerry",1)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd = pairRDD.combineByKey(x => x,(m: Int,n: Int) => m + n,(a: Int,b: Int) => a + b)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at combineByKey at <console>:26
combineByKey is a low-level operator: the parameter types must be declared explicitly, so shorthand like _ + _ cannot be used. The ShuffledRDD means that pair tuples with the same Key are moved to the same Executor before the final computation.
scala> rdd.collect
res1: Array[(String, Int)] = Array((hello,6), (jerry,4))
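One more sketch (not from the original transcript) of why combineByKey takes three separate functions: computing a per-key average needs a running (sum, count) pair whose type differs from the value type.
val scores = sc.parallelize(List(("hello", 2), ("jerry", 3), ("hello", 4), ("jerry", 1)), 2)
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                            // createCombiner: the first value seen for a key in a partition
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold further values into the partition-local combiner
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge combiners coming from different partitions
val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect   // Array((hello,3.0), (jerry,2.0))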
Let's look at an example that groups the various animals by the numbers 1 and 2.
scala> val rdd = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(List(1,1,2,2,2,1,2,2,2),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd2 = rdd1.zip(rdd) // zip the two RDDs together into one collection of pair tuples
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[2] at zip at <console>:28
scala> rdd2.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer
scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBuffer[String],n: String) => m += n,(a: ListBuffer[String],b: ListBuffer[String]) => a ++= b)
rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[4] at combineByKey at <console>:31
The first function, x => ListBuffer(x), puts the first Value (an animal) seen for each Key (a number) in a partition into its own ListBuffer; for example, the first partition contains only ListBuffer(dog) and ListBuffer(gnu), and not cat, because cat is not the first Value for key 1. The other partitions work the same way. The second function, (m: ListBuffer[String],n: String) => m += n, appends the remaining Values to the ListBuffer with the same Key, so the first partition ends up with ListBuffer(dog, cat) and ListBuffer(gnu); at this stage each partition works independently. The third function, (a: ListBuffer[String],b: ListBuffer[String]) => a ++= b, does the overall aggregation across partitions, merging all ListBuffers with the same Key; this is a shuffle operation that moves ListBuffers with the same Key to the same machine before combining them.
scala> rdd3.collect
res2: Array[(Int, scala.collection.mutable.ListBuffer[String])] = Array((1,ListBuffer(dog, cat, turkey)), (2,ListBuffer(salmon, rabbit, gnu, wolf, bear, bee)))
The overall flow is illustrated in the diagram below.
Save the result to Hadoop HDFS:
scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine")
root@host2 bin# ./hdfs dfs -ls /combine
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/_SUCCESS
-rw-r--r-- 3 root supergroup 0 2018-11-23 17:14 /combine/part-00000
-rw-r--r-- 3 root supergroup 33 2018-11-23 17:14 /combine/part-00001
-rw-r--r-- 3 root supergroup 53 2018-11-23 17:14 /combine/part-00002
root@host2 bin# ./hdfs dfs -cat /combine/part-00001
(1,ListBuffer(turkey, dog, cat))
root@host2 bin# ./hdfs dfs -cat /combine/part-00002
(2,ListBuffer(gnu, wolf, bear, bee, salmon, rabbit))
Although there are 3 partitions, there are only 2 Keys (1 and 2) after the shuffle, so only two of the three part files contain data.
We can redefine rdd3's partition count:
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val rdd3 = rdd2.combineByKey(x => ListBuffer(x),(m: ListBuffer[String],n: String) => m += n,(a: ListBuffer[String],b: ListBuffer[String]) => a ++= b,new HashPartitioner(2),true,null)
rdd3: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.ListBuffer[String])] = ShuffledRDD[6] at combineByKey at <console>:32
Save it to Hadoop HDFS again:
scala> rdd3.saveAsTextFile("hdfs://192.168.5.182:8020/combine1")
root@host2 bin# ./hdfs dfs -ls /combine1
Found 3 items
-rw-r--r-- 3 root supergroup 0 2018-11-23 17:27 /combine1/_SUCCESS
-rw-r--r-- 3 root supergroup 53 2018-11-23 17:27 /combine1/part-00000
-rw-r--r-- 3 root supergroup 33 2018-11-23 17:27 /combine1/part-00001
This time the saved result has only 2 part files, and both contain data.