键值对RDD数据分区

2022-05-06 15:29:06 浏览数 (1)

前言

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

注意:

  1. 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
  2. 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

与分区器相关的算子

这些算子都必须为k-v类型,并且可以指定分区器。

  • partitionBy():按照K重新分区 函数签名:def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  • reduceByKey():按照K聚合V 函数签名:def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • groupByKey():按照K重新分组 函数签名:def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  • reduceByKey():按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。 函数签名:def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • aggregateByKey()():按照K处理分区内和分区间逻辑 函数签名:def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  • foldByKey()():作用和reduceBykey一样,但是可以指定一个默认值 函数签名:def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  • combineByKey():转换结构后分区内和分区间操作 函数签名:def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)]

RDD中分区器

RDD中的分区器都是Partitioner的之类

代码语言:javascript复制
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

默认有三个分区器 自定义分区

  1. HashPartitioner
  2. RangePartitioner
  3. PythonPartitioner(由spark内部使用,我们无法使用)
  4. 自定义分区器,继承Partitioner抽象类,自己实现分区。

所以主要了解HashPartitioner分区器,RangePartitioner分区器及自定义分区器。

Hash分区

HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数 分区的个数(否则加0),最后返回的值就是这个key的所属的分区ID。

HashPartitioner分区器源码

代码语言:javascript复制
// partitions 表示分区个数,是由用户指定的。
class HashPartitioner(partitions: Int) extends Partitioner { 
  // 进行断言,分区数不能小于等于0
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  // 绑定分区数
  def numPartitions: Int = partitions
  //对key进行计算,获取分区
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  // 这个不用管
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

HashPartitioner 计算分区的逻辑

代码语言:javascript复制
  // 对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数 分区的个数(否则加0)
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod   (if (rawMod < 0) mod else 0)
  }

案例演示:

代码语言:javascript复制
  @Test
  def hashPartitionerTest(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val list=List("张三"->18,"李四"->19,"王五"->20,"赵六"->21,"村长"->22,"福来"->23,"钱多多"->24,"房栋栋"->25)

    val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)

    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    value.mapPartitionsWithIndex((index,it)=>{
      println(index,it.toList)
      it
    }).collect

  }

RDD默认分区就是HashPartitioner,分区就是调用者(RDD)的分区数

代码语言:javascript复制
(3,List((房栋栋,CompactBuffer(25))))
(1,List((张三,CompactBuffer(18)), (李四,CompactBuffer(19)), (钱多多,CompactBuffer(24)), (王五,CompactBuffer(20))))
(2,List((村长,CompactBuffer(22)), (福来,CompactBuffer(23))))
(0,List((赵六,CompactBuffer(21))))

也可以明确指定分区器(new HashPartitioner(partitions)) partitions=分区个数

代码语言:javascript复制
  @Test
  def hashPartitionerTest(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val list=List("张三"->18,"李四"->19,"王五"->20,"赵六"->21,"村长"->22,"福来"->23,"钱多多"->24,"房栋栋"->25)

    val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)

    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey(new HashPartitioner(4))
    value.mapPartitionsWithIndex((index,it)=>{
      println(index,it.toList)
      it
    }).collect

  }

指定4个分区

代码语言:javascript复制
(0,List((赵六,CompactBuffer(21))))
(1,List((张三,CompactBuffer(18)), (李四,CompactBuffer(19)), (钱多多,CompactBuffer(24)), (王五,CompactBuffer(20))))
(2,List((村长,CompactBuffer(22)), (福来,CompactBuffer(23))))
(3,List((房栋栋,CompactBuffer(25))))

Ranger分区

RangePartitionz作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中的数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的映射到某一个分区内。

实现过程为:

  1. 先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[Key]类型的数组变量rangeBounds;
  2. 判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

 抽样范围计算 抽样范围计算

RangePartitioner 参数列表

代码语言:javascript复制
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int, # 分区个数
    rdd: RDD[_ <: Product2[K, V]], # 指定对按个RDD 进行抽样
    private var ascending: Boolean = true, 指定排序规则(默认为升序,分区间是有序的,分区内不一定有序)
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {...}

rangeBounds 中决定了抽样范围

代码语言:javascript复制
private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions  = idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates  = ((key, weight))
            }
          }
        }
        // 如果分区不均衡,重新采样
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
         // 计算种子
          val seed = byteswap32(-rdd.id - 1)
          // 调用 sample 进行采样。
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates   = reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

具体的采样实现

代码语言:javascript复制
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      // 计算种子
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect() // 会再次运算job任务
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

getPartition 会使用到rangeBounds,计算key获取对应分区。

代码语言:javascript复制
 def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition  = 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

对这一块源码,还不是很清楚,暂时记录一下。

案例演示:

代码语言:javascript复制
  @Test
  def rangePartitionerTest(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 创建本地集合RDD
    val list=List("1"->18,"2"->19,"3"->20,"4"->21,"5"->22,"6"->23,"7"->24,"8"->25)
    val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)

    //对数据进行分区,并使用RangePartitioner分区器,
    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey(new RangePartitioner(4, rdd1))

    value.mapPartitionsWithIndex((index,it)=>{
      println(index,it.toList)
      it
    }).collect

    value.foreach(e=>{
      println(e._1,e._2.toList)
    })

  }

各个分区数据

代码语言:javascript复制
(0,List((2,CompactBuffer(19)), (1,CompactBuffer(18))))
(2,List((5,CompactBuffer(22)), (6,CompactBuffer(23))))
(3,List((8,CompactBuffer(25)), (7,CompactBuffer(24))))
(1,List((4,CompactBuffer(21)), (3,CompactBuffer(20))))

Hash 与 Range的区别

  1. hash 是通过对key取hashcode%分区数(如果小于0就加上分区数,否则 0)的方式指定分区;Range是通过对RDD进行抽样,指定一个区间。然后计算key,确认key具体在那个区间中。
  2. hash 只是单纯的对key进行运算,不会重新运算job任务,range需要对分区进行抽样,需要运行一个job任务。
  3. RDD默认为HashPartitioner 分区器,即使不指定分区器默认的就是。Ragen需要明确指定。

自定义分区

上面说过,我们能使用spark 分区器的就有两种,HashPartitioner和RangePartitioner;很多时候根据业务的需求,需要自定义分区。如下数据: 需求要求 a,b,c华为一个分区,d,e,f换分为一个分区,剩下的分为一个分区。

代码语言:javascript复制
 val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)

依样画葫芦 我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner

代码语言:javascript复制
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

查看 HashPartitioner的父类(Partitioner)

代码语言:javascript复制
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

总结:

  • 继承 Partitioner类,它是一个抽象类。
  • 实现父类的numPartitions函数
  • 实现父类的getPartition 函数

自定义分区器

代码语言:javascript复制
/**
 * 自定义分区器
 * partitions 默认为3
 * @param partitions
 */
class CustomPartitioner(partitions: Int) extends Partitioner{
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "a"|"b"|"c" =>1
    case "d"|"e"|"f" =>2
    case _=>0
  }
}

使用自定义分区器

代码语言:javascript复制
 @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

    val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))

    rdd3.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

结果

代码语言:javascript复制
0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))

注意:若出现这种序列化问题

代码语言:javascript复制
org.apache.spark.SparkException: Task not serializable

解决方式:

  1. CustomPartitioner 重新定义class文件创建
  2. 不要再 classobject 中创建(如下)
代码语言:javascript复制
class Test{
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}

抽离出class,在外面定义

代码语言:javascript复制
class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
  1. 若在一个class文件中创建,请使外部实现Serializable接口
代码语言:javascript复制
class Test extends Serializable {
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}
  1. 实现Serializable接口之后,出现部分属性无法序列化,可以使用 @transient 注解忽略。
代码语言:javascript复制
class Test extends Serializable {
 @transient
 val name="a"
 class CustomPartitioner(partitions: Int) extends Partitioner{
 ...
 }
}

该问题的原因:

Driver最终会将Task交给Executor进行执行,其中就需要进行将对象进行序列化,由于CustomPartitioner类在另一个class内部中,序列化CustomPartitioner就需要将外部类先进性序列化。而外部类并没有进行序列化,所以就报了这样的错。


数据倾斜

无论是HashPartitioner还是RangePartitioner都可能会有数据倾斜的问题产生,但是需要注意的是,出现数据倾斜是数据的原因,而不是分区器的原因,是需要单独处理的。

0 人点赞