分类:
- partitionBy() 按照K重新分区
- 自定义分区
- reduceByKey()按照K聚合V
- groupByKey()按照K重新分组
- reduceByKey和groupByKey区别
- aggregateByKey()按照K处理分区内和分区间逻辑
- foldByKey()分区内和分区间相同的aggregateByKey()
- combineByKey()转换结构后分区内和分区间操作
SparkContext
代码语言:javascript复制SparkContext 定义成在全局范围,配置如下;
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc=new SparkContext(conf)
partitionBy()
将RDD[K,V]中的K按照指定Partitioner重新进行分区; 如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。
定义一个集合,存放map元素(a-j);默认分区4个
代码语言:javascript复制 val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
val rdd=sc.parallelize(list,4)
查看默认分区情况
代码语言:javascript复制 rdd.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
结果: 几乎比较均匀的分配到各个分区中
代码语言:javascript复制0=List((a,100), (b,100))
1=List((c,100), (d,100), (e,100))
2=List((f,100), (g,100))
3=List((h,100), (i,100), (j,100))
使用 partitionBy
按照key进行分区
partitionBy
源码,需要让指定一个分区器(Partitioner
)
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
查看spark有那些分区器
- HashPartitioner:默认的分区器,通过对key进行hash运算,取余分区数的方式计算分区
- RangePartitioner:
- PythonPartitioner:spark内部使用的,外部无法使用
- 自定义分区:开发者能使用的只有
HashPartitioner
和RangePartitioner
两种,若都无法满足我们的需求,就只能自定义分区器了。
使用HashPartitioner
作为 partitionBy
的分区器
// HashPartitioner 需要指定一个分区数
val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))
// 查看分区情况
rdd2.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
结果
代码语言:javascript复制1=List((a,100), (c,100), (e,100), (g,100), (i,100))
0=List((b,100), (d,100), (f,100), (h,100), (j,100))
注意:
- 包位置需要指定为
org.apache.spark.HashPartitioner
- 需要指定一个分区数new HashPartitioner(
分区数
) 完整代码
@Test
def partitionByTest(): Unit ={
val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
val rdd=sc.parallelize(list,4)
rdd.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))
rdd2.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
}
源码分析HashPartitioner
是如何进行分区的
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
// 将分区数传给numPartitions
def numPartitions: Int = partitions
// 这里才是重点;
def getPartition(key: Any): Int = key match {
// 通过模式匹配,判断key是否为null,若为null指定到0分区
case null => 0
// 获取 key的hashCode ; numPartitions 传入进来的分区数,也就是2
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
// 这个不太重要
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
nonNegativeMod
: 真正计算出分区的地方
// x:上面传入的 hashcode
// mod :上面传入的分区数
def nonNegativeMod(x: Int, mod: Int): Int = {
// hashcode取余分区数 ,取余出来的可能会是一个负数
// 例如:scala> "hadoop".hashCode %2
// res10: Int = -1
val rawMod = x % mod
// 分区数肯定不能为负数,于是他做了这样的判断,if (rawMod < 0)
// 比如 rawMod =-1 ;就使用 mod rawMod = 1 ;否则 mod 0;
// 这样就很好的解决了取余后为分区可能负数的情况了。
rawMod (if (rawMod < 0) mod else 0)
}
自定义分区
代码语言:javascript复制上面说过,我们能使用spark 分区器的就有两种,HashPartitioner和RangePartitioner;很多时候根据业务的需求,需要自定义分区。如下数据: 需求要求 a,b,c华为一个分区,d,e,f换分为一个分区,剩下的分为一个分区。
val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
依样画葫芦娃
我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
查看 HashPartitioner
的父类(Partitioner
)
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
总结:
- 继承
Partitioner
类,它是一个抽象类。 - 实现父类的
numPartitions
函数 - 实现父类的
getPartition
函数
自定义分区器
代码语言:javascript复制/**
* 自定义分区器
* partitions 默认为3
* @param partitions
*/
class CustomPartitioner(partitions: Int) extends Partitioner{
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = key match {
case "a"|"b"|"c" =>1
case "d"|"e"|"f" =>2
case _=>0
}
}
使用自定义分区器
代码语言:javascript复制 @Test
def partitionByTest(): Unit ={
val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
val rdd=sc.parallelize(list,4)
val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))
rdd3.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
}
结果
代码语言:javascript复制0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))
注意:若出现这种序列化
问题
org.apache.spark.SparkException: Task not serializable
解决方式:
- 将
CustomPartitioner
重新定义class文件创建 - 不要再
class
或object
中创建(如下)
class Test{
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
}
抽离出class
,在外面定义
class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
- 若在一个class文件中创建,请使外部实现
Serializable
接口
class Test extends Serializable {
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
}
- 实现
Serializable
接口之后,出现部分属性无法序列化,可以使用@transient
注解忽略。
class Test extends Serializable {
@transient
val name="a"
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
}
该问题的原因:
Driver最终会将Task交给Executor进行执行,其中就需要进行将对象进行序列化,由于CustomPartitioner类在另一个class内部中,序列化CustomPartitioner就需要将外部类先进性序列化。而外部类并没有进行序列化,所以就报了这样的错。
reduceByKey()
功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。
- 需求:考试成绩下来了,统计语文,数学,英语各成绩的总成绩
val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
代码语言:javascript复制val result= rdd1.reduceByKey((x, y) => {
x y
}).collect
println(result.toList)
结果
代码语言:javascript复制List((数学,69), (英语,162), (语文,100))
- 原理分析 查看分区情况
rdd1.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
结果
代码语言:javascript复制0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))
原理图:
上传上去不太清楚的酱紫,我也
说明:
- 根据分区大小(这里设置分区数为2),设置将数据写入分布到各个分区中,
0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))
- 程序运行时会将数据写入缓冲区中(MapReduce流程差不多),缓冲区处于内存中,无法无限存入数据,所以会溢写入磁盘中。
- 在不影响程序最终结果的情况下使用
combiner
可以更好的提高效率,在reduceByKey
中无论如何都会进行一次combiner
(用于提高效率)。
- 对数据按照key进行分组,并再次调用 reduce程序代码(
如下
),对单个组的数据进行聚合运算
val result2=rdd1.reduceByKeyLocally((x, y) => {x y})
- 计算结果完成后再将数据溢写入磁盘。
- rdd2 类似于
reduce
,他会对分区类的数据再进进行聚合统计
- 最终得到想要的数据结果
List((数学,69), (英语,162), (语文,100))
- 完整代码
@Test
def reduceByKeyTest(): Unit ={
val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
val result= rdd1.reduceByKey((x, y) => {
x y
}).collect
println(result.toList)
rdd1.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
val result2=rdd1.reduceByKeyLocally((x, y) => {
x y
})
println(result2.toList)
}
- 总结:
reduceByKey(func: (RDD Value值类型,RDD value值) => RDD value值): 根据key分组之后,所有该key的value值进行聚合 reduceByKey里面的函数是针对每个组的所有value值操作 reduceByKey 会经过一次shuffle
groupByKey()
groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。 该操作可以指定分区器或者分区数(默认使用HashPartitioner)
groupByKey 有三个重载方法
- groupByKey(partitioner: Partitioner) 指定一个分区器
- groupByKey():
底层实现就是调用了
groupByKey(partitioner: Partitioner)
默认的分区器为HashPartitioner
分区器的分区数默认为最开始配置的大小(2) - groupByKey(numPartitions: Int)
底层实现也是调用
groupByKey(partitioner: Partitioner)
; 并直接定分区器为HashPartitioner
创建HashPartitioner
分区器同时,也为其指定了分区数大小numPartitions
。
案例:使用默认的无参的groupByKey()
@Test
def groupByKeyTest(): Unit ={
val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
println(value.collect.toList)
}
结果:
代码语言:javascript复制List((数学,CompactBuffer(15, 33, 21)), (英语,CompactBuffer(12, 50, 100)), (语文,CompactBuffer(10, 20, 30, 40)))
reduceByKey和groupByKey区别
- reduceByKey存在
combiner
行为,性能更高 - groupByKey不存在
conbiner
行为,性能比较低
工作中推荐使用reduceByKey这种高性能shuffle算子
aggregateByKey()
foldByKey()
在scala中也有fold()函数,与reduce()唯一的区别就是,reduce会把第一个列表中第一个元作为参数的默认值,而fold(),可以指定一个默认值,其他操作和fold与reduce没有什么不同。在spark中foldByKey()和reduceBykey()亦是如此。
foldByKey 参数说明
代码语言:javascript复制def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, defaultPartitioner(self))(func)
}
这种((zeroValue: V)(func: (V, V) => V)
)语法称为:函数柯里化
。
(zeroValue: V)
:需要指定一个默认值;
(func: (V, V) => V)
:具体的操作逻辑
案例: 统计各科总成绩,校长心情比较好,决定在总成绩的分数上再加一百分
代码语言:javascript复制 @Test
def foldByKeyTest(): Unit ={
val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
// 使用 foldByKey 数据汇总
val value: RDD[(String, Int)] = rdd.foldByKey(100)((v1, v2) => {
v1 v2
})
println(value.collect.toList)
}
结果:
代码语言:javascript复制List((数学,269), (英语,262), (语文,300))
aggregateByKey()
combineByKey()
combineByKey 参数说明:
代码语言:javascript复制def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
- createCombiner(转换数据的结构)
combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。
简单说明:在combiner阶段对每个组的第一个vlaue值进行转换
- mergeValue(分区内)
如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
简单说明:combiner的聚合逻辑
- mergeCombiners(分区间)
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
简单说明:reduce的聚合逻辑
案例: 求每门学科的平均成绩 输入如下:
代码语言:javascript复制val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
完整代码如下:
代码语言:javascript复制@Test
def combineByKeyTest(): Unit ={
val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
// 求每门学科的平均成绩
val value: RDD[(String, (Int, Int))] = rdd.combineByKey(v => (v, 1), (c: (Int, Int), v: Int) => (c._1 v, c._2 1), (c1: (Int, Int), c2: (Int, Int)) => (c1._1 c2._1, c1._2 c2._2))
//获得各科成绩的总分数和个数
val result=value.collect.toList
println(result)
result.foreach(m=>{
m match {
case (curr,(totalScore,size)) => println(s"课程:$curr,总分:$totalScore,个数:$size,平均数:${totalScore.toDouble/size}")
}
})
结果:
代码语言:javascript复制List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))
课程:数学,总分:69,个数:3,平均数:23.0
课程:英语,总分:162,个数:3,平均数:54.0
课程:语文,总分:100,个数:4,平均数:25.0
代码分析(我觉得挺复杂的,有点太抽象了);
首先我们应该理解的时候,combineByKey各个阶段的操作都是针对于 value
而言,毕竟时Bykey
。combineByKey
中的C,V其实表示的含义就是传入的value
,返回的也是一个value
val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(c: (Int, Int), v: Int) => (c._1 v, c._2 1), (c1: (Int, Int),
c2: (Int, Int)) => (c1._1 c2._1, c1._2 c2._2)
)
第一个参数:createCombiner
createCombiner: V => C :他希望我们传入进来C
的是一个value,就是一个个分数(10,20,15等),返回的V
将作为下一个函数参数的C
(mergeValue
)
v => (v, 1)
此时结合我们的业务,统计平均数,我们首先得知道语文有多少个,数学有多少个,英语多少个。如何通过combineByKey
来实现呢?结合createCombiner
的特性在combiner阶段对每个组的第一个vlaue值进行转换
,我们就可以将计算器(用1
标识)存放到value
中
结果应该是这样的。
"语文"->(10,1),"语文"->(20,1),"数学"->(15,1),"语文"->(30,1),"数学"->(33,1),
"英语"->(12,1),"语文"->(40,1),"数学"->(21,1),"英语"->(50,1),"英语"->(100,1)
由于是在combiner
阶段,在combiner阶段对每个组的第一个vlaue值进行转换
语文:
"语文"->(10,1)
数学:
代码语言:javascript复制"数学"->(15,1)
英语:
代码语言:javascript复制"英语"->(12,1)
然后就是第二个参数mergeValue
,他的解释是combiner的聚合逻辑
;待会解释是什么意思
先看看mergeValue
需要指定哪些参数 : (C, V) => C,
C:就是上一个函数参数(createCombiner
)返回的结果(如:(10,1),(20,1),(30,1))
V:表示带聚合的元素
返回的C
将会作为下一个函数参数的C
(mergeCombiners
的参数C
)。应该能明白了吧。
了解了mergeValue
各个参数的意思及返回参数的意思之后,再次回到业务中。
(c: (Int, Int), v: Int) => (c._1 v, c._2 1), (c1: (Int, Int):
拿语文进行举例
"语文"->(10,1)
第一次:c._1 表示成绩也是 元素(10,1),v: 表示 下一个带聚合的元素(20,1 1), 结果就是(30,2) 第二次:c._1 表示上一个结果(30,2),v: 表示 下一个带聚合的元素(30,2 1),结果就是(60,3) 第二次:c._1 表示上一个结果(60,3),v: 表示 下一个带聚合的元素(40,3 1),结果就是(100,4)
最终结果
代码语言:javascript复制语文 -> (100,4)
数学 -> (69,3)
英语 -> (162,3)
mergeValue之后,最终溢写到磁盘
。
mergeCombiners
就比较简单了,就是一个reduce操作。
注意:我上面的方式是建立在一个分区情况下,多个分区也是一样的流程。
mergeCombiners
中就是将多个 分区进行最后的聚合处理。
原理图:
我也是在学习阶段,理解可能没那么透彻,文章若有什么不对,希望可以指出来。
除了使用combineByKey
可以使用reduceByKey
的方式实现类似的功能,对比combineByKey
还更简单一点。
@Test
def combineByKeyTest(): Unit ={
val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
// 通过 map方式实现一样的功能
val rdd2=rdd.map(x=>{
(x._1,(x._2,1))
}).reduceByKey((v1,v2)=>{
(v1._1 v2._1,v1._2 v2._2)
})
println(rdd2.collect.toList)
}
结果:
代码语言:javascript复制List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))
其实reduceByKey
底层就是使用的是combineByKey
combineByKey
底层实现
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
// 注意这个函数
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
reduceByKey
底层实现
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
在往reduceByKey(defaultPartitioner(self), func)
中点击
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
// 注意这个函数
// (v: V) => v ;自己转自己,啥都没干。
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
看到了吗? 都是使用的是combineByKeyWithClassTag
来实现。