Preface
Spark currently supports Hash partitioning, Range partitioning, and user-defined partitioning, with Hash partitioning as the default. The partitioner directly determines the number of partitions of an RDD, which partition each record goes to after a shuffle, and the number of reduce tasks.
Note:
- Only key-value RDDs have a partitioner; for non key-value RDDs the partitioner is None.
- Each RDD's partition IDs range from 0 to numPartitions-1; this ID determines which partition a record belongs to.
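A minimal sketch of both points, assuming a local SparkContext named sc created the same way as in the test cases later in this post:
import org.apache.spark.HashPartitioner

val nums = sc.parallelize(1 to 8, 4)
println(nums.partitioner)    // None: not a key-value RDD, so no partitioner

val pairs = sc.parallelize(List("a" -> 1, "b" -> 2), 4)
println(pairs.partitioner)   // None: a key-value RDD, but no partitioner assigned yet

val partitioned = pairs.partitionBy(new HashPartitioner(4))
println(partitioned.partitioner)      // Some(HashPartitioner) with 4 partitions
println(partitioned.getNumPartitions) // 4, i.e. partition IDs 0 to 3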
Operators related to partitioners
These operators all require key-value RDDs and accept a partitioner argument (a short usage sketch follows the list).
- partitionBy(): repartition by key
Signature: def partitionBy(partitioner: Partitioner): RDD[(K, V)]
- reduceByKey(): aggregate the values by key; a combine (map-side pre-aggregation) happens before the shuffle; the result is RDD[(K, V)]
Signature: def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
- groupByKey(): regroup by key
Signature: def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
- aggregateByKey(): process intra-partition and inter-partition logic separately by key
Signature: def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
- foldByKey(): works like reduceByKey, but lets you specify an initial (zero) value
Signature: def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
- combineByKey(): transform the value type first, then apply intra-partition and inter-partition operations
Signature: def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
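A short usage sketch of a few of these signatures (the variable names are made up for illustration; sc is a SparkContext as in the tests below):
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val scores: RDD[(String, Int)] = sc.parallelize(
  List("a" -> 1, "a" -> 2, "b" -> 3, "b" -> 4), 4)

// reduceByKey with an explicit partitioner: the result has 2 partitions instead of 4
val summed = scores.reduceByKey(new HashPartitioner(2), _ + _)

// foldByKey: like reduceByKey, but starting from a zero value
val folded = scores.foldByKey(0, new HashPartitioner(2))(_ + _)

// aggregateByKey: seqOp runs within a partition, combOp merges across partitions
val aggregated = scores.aggregateByKey(0, new HashPartitioner(2))(_ + _, _ + _)

println(summed.getNumPartitions) // 2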
Partitioners in RDDs
Every partitioner in an RDD is a subclass of Partitioner:
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
There are three built-in partitioners, plus custom partitioning:
- HashPartitioner
- RangePartitioner
- PythonPartitioner (used internally by Spark; not available to us)
- Custom partitioners: extend the Partitioner abstract class and implement the partitioning yourself.
So the main ones to understand are the HashPartitioner, the RangePartitioner, and custom partitioners.
Hash partitioning
How HashPartitioner works: for a given key, compute its hashCode and take the remainder of dividing by the number of partitions; if the remainder is less than 0, add the number of partitions to it (otherwise add 0). The resulting value is the partition ID that the key belongs to.
HashPartitioner source code
// partitions is the number of partitions, specified by the user.
class HashPartitioner(partitions: Int) extends Partitioner {
// Assert that the number of partitions is not negative
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
// Bind the number of partitions
def numPartitions: Int = partitions
// Compute the partition from the key
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
// Equality is based on the number of partitions (not important for partitioning itself)
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
The partition-computation logic of HashPartitioner:
// For a given key, compute its hashCode and take the remainder modulo the number of partitions;
// if the remainder is negative, add the number of partitions to it (otherwise add 0)
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
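A quick hand-worked check of the same logic (the function is copied here so it can be tried in a plain Scala REPL, without touching Spark's private Utils object):
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// In Scala, % keeps the sign of the left operand, so -3 % 4 == -3
println(nonNegativeMod(-3, 4))             // -3 + 4 = 1
println(nonNegativeMod(7, 4))              // 7 % 4 = 3, remainder >= 0 so add 0
println(nonNegativeMod("李四".hashCode, 4)) // always lands in 0..3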
Example:
@Test
def hashPartitionerTest(): Unit ={
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val list=List("张三"->18,"李四"->19,"王五"->20,"赵六"->21,"村长"->22,"福来"->23,"钱多多"->24,"房栋栋"->25)
val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)
val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
value.mapPartitionsWithIndex((index,it)=>{
println(index,it.toList)
it
}).collect
}
The default partitioner of an RDD is HashPartitioner, and the number of partitions defaults to that of the calling (parent) RDD:
(3,List((房栋栋,CompactBuffer(25))))
(1,List((张三,CompactBuffer(18)), (李四,CompactBuffer(19)), (钱多多,CompactBuffer(24)), (王五,CompactBuffer(20))))
(2,List((村长,CompactBuffer(22)), (福来,CompactBuffer(23))))
(0,List((赵六,CompactBuffer(21))))
You can also specify the partitioner explicitly (new HashPartitioner(partitions)), where partitions is the number of partitions:
@Test
def hashPartitionerTest(): Unit ={
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val list=List("张三"->18,"李四"->19,"王五"->20,"赵六"->21,"村长"->22,"福来"->23,"钱多多"->24,"房栋栋"->25)
val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)
val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey(new HashPartitioner(4))
value.mapPartitionsWithIndex((index,it)=>{
println(index,it.toList)
it
}).collect
}
With 4 partitions specified:
(0,List((赵六,CompactBuffer(21))))
(1,List((张三,CompactBuffer(18)), (李四,CompactBuffer(19)), (钱多多,CompactBuffer(24)), (王五,CompactBuffer(20))))
(2,List((村长,CompactBuffer(22)), (福来,CompactBuffer(23))))
(3,List((房栋栋,CompactBuffer(25))))
Range partitioning
What RangePartitioner does: it maps keys in a given range to a given partition, tries to keep the amount of data in every partition balanced, and keeps the partitions ordered relative to each other: every element in one partition is smaller (or larger) than every element in another, although order within a partition is not guaranteed. In short, keys within a certain range are mapped to a certain partition.
The implementation works as follows:
- First, sample the whole RDD with the reservoir sampling algorithm, sort the sampled data, and compute the upper-bound key of each partition, producing an Array[K] variable called rangeBounds.
- Then determine which range of rangeBounds a key falls into; that gives the partition ID of the key in the resulting RDD.
This partitioner requires the key type of the RDD to be orderable (a short sketch follows).
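A minimal sketch of that requirement, assuming sc is a SparkContext as in the examples in this post:
import org.apache.spark.RangePartitioner

// String keys already have an implicit Ordering, so the RangePartitioner can be built directly.
// A custom key type K would need an implicit Ordering[K] in scope, otherwise this would not compile.
val byName = sc.parallelize(List("b" -> 2, "a" -> 1, "c" -> 3, "d" -> 4), 2)
val ranged = byName.partitionBy(new RangePartitioner(3, byName))
println(ranged.partitioner)      // Some(RangePartitioner)
println(ranged.getNumPartitions) // 3 (may be fewer if there are too few distinct keys)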
RangePartitioner parameter list:
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int, // number of partitions
rdd: RDD[_ <: Product2[K, V]], // the RDD to sample
private var ascending: Boolean = true, // sort order (ascending by default; partitions are ordered relative to each other, data within a partition is not necessarily ordered)
val samplePointsPerPartitionHint: Int = 20)
extends Partitioner {...}
rangeBounds is computed by sampling the RDD and holds the partition boundaries:
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
// Cast to double to avoid overflowing ints or longs
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
// If any partitions are imbalanced, re-sample them
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
// Compute the random seed
val seed = byteswap32(-rdd.id - 1)
// Call sample to re-sample this subset.
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
The sampling implementation itself:
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
// Compute the random seed for this partition
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect() // collect() runs an extra job here
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
getPartition uses rangeBounds to compute the partition a key belongs to (a hand-worked illustration follows the code).
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
I'm still not entirely clear on every detail of this source code, so I'm just noting it down here for now.
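Still, the broad strokes can be sketched with made-up numbers:
// Suppose 3 partitions and rangeBounds = Array(3, 6) for Int keys (made-up values):
//   key <= 3     -> partition 0
//   3 < key <= 6 -> partition 1
//   key > 6      -> partition 2
// With at most 128 bounds this is a linear scan; with more, a binary search is used.
// If ascending = false, the index is simply mirrored: rangeBounds.length - partition.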
Example:
@Test
def rangePartitionerTest(): Unit ={
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// Create an RDD from a local collection
val list=List("1"->18,"2"->19,"3"->20,"4"->21,"5"->22,"6"->23,"7"->24,"8"->25)
val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)
// Partition the data with a RangePartitioner
val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey(new RangePartitioner(4, rdd1))
value.mapPartitionsWithIndex((index,it)=>{
println(index,it.toList)
it
}).collect
value.foreach(e=>{
println(e._1,e._2.toList)
})
}
Data in each partition:
(0,List((2,CompactBuffer(19)), (1,CompactBuffer(18))))
(2,List((5,CompactBuffer(22)), (6,CompactBuffer(23))))
(3,List((8,CompactBuffer(25)), (7,CompactBuffer(24))))
(1,List((4,CompactBuffer(21)), (3,CompactBuffer(20))))
Differences between Hash and Range
- Hash assigns a partition by taking the key's hashCode % numPartitions (adding the number of partitions if the result is negative, otherwise adding 0); Range samples the RDD to establish key ranges and then determines which range a key falls into.
- Hash only computes on the key and never runs an extra job; Range has to sample the partitions, which requires running a job (see the sketch below).
- HashPartitioner is the default partitioner of an RDD; it is used even when no partitioner is specified. Range must be specified explicitly.
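A minimal sketch of the second point (assuming sc as before): merely constructing a RangePartitioner already samples the RDD and therefore triggers a job, while constructing a HashPartitioner does not.
import org.apache.spark.{HashPartitioner, RangePartitioner}

val pairs = sc.parallelize(List("1" -> 1, "2" -> 2, "3" -> 3, "4" -> 4), 4)

val hp = new HashPartitioner(4)         // no job: just stores the partition count
val rp = new RangePartitioner(4, pairs) // runs a sampling job (visible in the Spark UI)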
Custom partitioning
As mentioned above, only two Spark partitioners are available to us: HashPartitioner and RangePartitioner. In many cases, business requirements call for a custom partitioner. Given the data below, the requirement is to put a, b, c into one partition, d, e, f into another, and everything else into a third.
val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
Following the existing pattern
If we don't know how to write a custom partitioner, we can look at how Spark's built-in ones are written, for example HashPartitioner:
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
Look at the parent class of HashPartitioner (Partitioner):
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
To summarize:
- Extend the Partitioner class, which is abstract.
- Implement the parent class's numPartitions method.
- Implement the parent class's getPartition method.
The custom partitioner:
/**
* Custom partitioner
* @param partitions number of partitions (3 in this example)
*/
class CustomPartitioner(partitions: Int) extends Partitioner{
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = key match {
case "a"|"b"|"c" =>1
case "d"|"e"|"f" =>2
case _=>0
}
}
Using the custom partitioner:
@Test
def partitionByTest(): Unit ={
val conf=new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
val rdd=sc.parallelize(list,4)
val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))
rdd3.mapPartitionsWithIndex((index,it)=>{
println(s"$index=${it.toList}")
it
}).collect
}
Result:
0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))
Note: if you run into this serialization problem
org.apache.spark.SparkException: Task not serializable
the solutions are:
- Define CustomPartitioner as its own top-level class; do not define it inside another class or object (as below):
class Test{
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
}
Pull the class out and define it at the top level:
class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
- If it is defined inside another class, make the outer class implement the Serializable interface:
class Test extends Serializable {
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
}
- After implementing Serializable, if some fields cannot be serialized, you can skip them with the @transient annotation:
class Test extends Serializable {
@transient
val name="a"
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
}
Why this happens:
The Driver eventually hands tasks to Executors for execution, which requires serializing the objects involved. Because the CustomPartitioner class is defined inside another class, serializing CustomPartitioner requires serializing the outer class first. Since the outer class is not serializable, this error is thrown.
Data skew
Both HashPartitioner and RangePartitioner can run into data skew. Note, however, that data skew is caused by the data itself, not by the partitioner, and has to be handled separately.
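A small sketch of how skew shows up with a hash partitioner (the key names are made up): every occurrence of the same key hashes to the same partition, so one very frequent key inflates a single partition no matter how many partitions there are.
import org.apache.spark.HashPartitioner

val skewed = sc.parallelize(
  List.fill(1000)("hot" -> 1) ++ List("a" -> 1, "b" -> 1, "c" -> 1), 4)

skewed.groupByKey(new HashPartitioner(4))
  .mapPartitionsWithIndex((index, it) =>
    Iterator((index, it.map(_._2.size).sum))) // number of values in each partition
  .collect()
  .foreach(println) // roughly 1000 values in one partition, only a handful elsewhere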