Transformation转换算子之Key-Value类型

2022-04-27 14:57:22 浏览数 (1)

分类:

  1. partitionBy() 按照K重新分区
  2. 自定义分区
  3. reduceByKey()按照K聚合V
  4. groupByKey()按照K重新分组
  5. reduceByKey和groupByKey区别
  6. aggregateByKey()按照K处理分区内和分区间逻辑
  7. foldByKey()分区内和分区间相同的aggregateByKey()
  8. combineByKey()转换结构后分区内和分区间操作

SparkContext

SparkContext 定义成在全局范围,配置如下;

代码语言:javascript复制
  val conf=new SparkConf().setMaster("local[2]").setAppName("test")
  val sc=new SparkContext(conf)

partitionBy()

将RDD[K,V]中的K按照指定Partitioner重新进行分区; 如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。

定义一个集合,存放map元素(a-j);默认分区4个

代码语言:javascript复制
    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

查看默认分区情况

代码语言:javascript复制
    rdd.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

结果: 几乎比较均匀的分配到各个分区中

代码语言:javascript复制
0=List((a,100), (b,100)) 
1=List((c,100), (d,100), (e,100))
2=List((f,100), (g,100))
3=List((h,100), (i,100), (j,100))

使用 partitionBy 按照key进行分区

partitionBy 源码,需要让指定一个分区器(Partitioner)

代码语言:javascript复制
 def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

查看spark有那些分区器

  • HashPartitioner:默认的分区器,通过对key进行hash运算,取余分区数的方式计算分区
  • RangePartitioner:
  • PythonPartitioner:spark内部使用的,外部无法使用
  • 自定义分区:开发者能使用的只有HashPartitionerRangePartitioner两种,若都无法满足我们的需求,就只能自定义分区器了。

使用HashPartitioner 作为 partitionBy的分区器

代码语言:javascript复制
//  HashPartitioner 需要指定一个分区数
val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))
//  查看分区情况
rdd2.mapPartitionsWithIndex((index,it)=>{
  println(s"$index=${it.toList}")
  it
}).collect

结果

代码语言:javascript复制
1=List((a,100), (c,100), (e,100), (g,100), (i,100))
0=List((b,100), (d,100), (f,100), (h,100), (j,100))

注意:

  1. 包位置需要指定为org.apache.spark.HashPartitioner
  2. 需要指定一个分区数new HashPartitioner(分区数) 完整代码
代码语言:javascript复制
  @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)


    rdd.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

    val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))

    rdd2.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

源码分析HashPartitioner是如何进行分区的

代码语言:javascript复制
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  // 将分区数传给numPartitions
  def numPartitions: Int = partitions
  // 这里才是重点;
  def getPartition(key: Any): Int = key match {
    // 通过模式匹配,判断key是否为null,若为null指定到0分区
    case null => 0
    // 获取 key的hashCode ; numPartitions 传入进来的分区数,也就是2
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  // 这个不太重要
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

nonNegativeMod: 真正计算出分区的地方

代码语言:javascript复制
// x:上面传入的 hashcode 
// mod :上面传入的分区数
 def nonNegativeMod(x: Int, mod: Int): Int = {
    // hashcode取余分区数  ,取余出来的可能会是一个负数
    // 例如:scala> "hadoop".hashCode %2
    // res10: Int = -1
    val rawMod = x % mod
    // 分区数肯定不能为负数,于是他做了这样的判断,if (rawMod < 0)
    // 比如 rawMod =-1 ;就使用 mod   rawMod = 1 ;否则 mod  0;
    // 这样就很好的解决了取余后为分区可能负数的情况了。
    rawMod   (if (rawMod < 0) mod else 0)
  }

自定义分区

上面说过,我们能使用spark 分区器的就有两种,HashPartitioner和RangePartitioner;很多时候根据业务的需求,需要自定义分区。如下数据: 需求要求 a,b,c华为一个分区,d,e,f换分为一个分区,剩下的分为一个分区。

代码语言:javascript复制
 val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)

依样画葫芦 我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner

代码语言:javascript复制
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

查看 HashPartitioner的父类(Partitioner)

代码语言:javascript复制
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

总结:

  • 继承 Partitioner类,它是一个抽象类。
  • 实现父类的numPartitions函数
  • 实现父类的getPartition 函数

自定义分区器

代码语言:javascript复制
/**
 * 自定义分区器
 * partitions 默认为3
 * @param partitions
 */
class CustomPartitioner(partitions: Int) extends Partitioner{
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "a"|"b"|"c" =>1
    case "d"|"e"|"f" =>2
    case _=>0
  }
}

使用自定义分区器

代码语言:javascript复制
 @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

    val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))

    rdd3.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

结果

代码语言:javascript复制
0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))

注意:若出现这种序列化问题

代码语言:javascript复制
org.apache.spark.SparkException: Task not serializable

解决方式:

  1. CustomPartitioner 重新定义class文件创建
  2. 不要再 classobject 中创建(如下)
代码语言:javascript复制
class Test{
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}

抽离出class,在外面定义

代码语言:javascript复制
class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
  1. 若在一个class文件中创建,请使外部实现Serializable接口
代码语言:javascript复制
class Test extends Serializable {
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}
  1. 实现Serializable接口之后,出现部分属性无法序列化,可以使用 @transient 注解忽略。
代码语言:javascript复制
class Test extends Serializable {
 @transient
 val name="a"
 class CustomPartitioner(partitions: Int) extends Partitioner{
 ...
 }
}

该问题的原因:

Driver最终会将Task交给Executor进行执行,其中就需要进行将对象进行序列化,由于CustomPartitioner类在另一个class内部中,序列化CustomPartitioner就需要将外部类先进性序列化。而外部类并没有进行序列化,所以就报了这样的错。


reduceByKey()

功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

  • 需求:考试成绩下来了,统计语文,数学,英语各成绩的总成绩
代码语言:javascript复制
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
代码语言:javascript复制
val result= rdd1.reduceByKey((x, y) => {
  x y
}).collect

println(result.toList)

结果

代码语言:javascript复制
List((数学,69), (英语,162), (语文,100))
  • 原理分析 查看分区情况
代码语言:javascript复制
   rdd1.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

结果

代码语言:javascript复制
0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))

原理图:

 reduceByKey执行流程图 reduceByKey执行流程图

上传上去不太清楚的酱紫,我也

说明:

  1. 根据分区大小(这里设置分区数为2),设置将数据写入分布到各个分区中,
代码语言:javascript复制
0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))
  1. 程序运行时会将数据写入缓冲区中(MapReduce流程差不多),缓冲区处于内存中,无法无限存入数据,所以会溢写入磁盘中。
  2. 在不影响程序最终结果的情况下使用combiner可以更好的提高效率,在reduceByKey中无论如何都会进行一次combiner(用于提高效率)。

  combiner combiner

  1. 对数据按照key进行分组,并再次调用 reduce程序代码(如下),对单个组的数据进行聚合运算
代码语言:javascript复制
 val result2=rdd1.reduceByKeyLocally((x, y) => {x   y})
  1. 计算结果完成后再将数据溢写入磁盘。
  2. rdd2 类似于reduce,他会对分区类的数据再进进行聚合统计

 reduce-rdd2 reduce-rdd2
  1. 最终得到想要的数据结果
代码语言:javascript复制
List((数学,69), (英语,162), (语文,100))
  • 完整代码
代码语言:javascript复制
  @Test
  def reduceByKeyTest(): Unit ={
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

    val result= rdd1.reduceByKey((x, y) => {
      x y
    }).collect

    println(result.toList)

   rdd1.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

    val result2=rdd1.reduceByKeyLocally((x, y) => {
      x   y
    })

    println(result2.toList)
  }
  • 总结:

reduceByKey(func: (RDD Value值类型,RDD value值) => RDD value值): 根据key分组之后,所有该key的value值进行聚合 reduceByKey里面的函数是针对每个组的所有value值操作 reduceByKey 会经过一次shuffle

groupByKey()

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。 该操作可以指定分区器或者分区数(默认使用HashPartitioner)

groupByKey 有三个重载方法

  • groupByKey(partitioner: Partitioner) 指定一个分区器
  • groupByKey(): 底层实现就是调用了groupByKey(partitioner: Partitioner) 默认的分区器为HashPartitioner 分区器的分区数默认为最开始配置的大小(2)
  • groupByKey(numPartitions: Int) 底层实现也是调用groupByKey(partitioner: Partitioner); 并直接定分区器为HashPartitioner 创建HashPartitioner分区器同时,也为其指定了分区数大小numPartitions

案例:使用默认的无参的groupByKey()

代码语言:javascript复制
 @Test
  def groupByKeyTest(): Unit ={
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    println(value.collect.toList)
  }

结果:

代码语言:javascript复制
List((数学,CompactBuffer(15, 33, 21)), (英语,CompactBuffer(12, 50, 100)), (语文,CompactBuffer(10, 20, 30, 40)))

reduceByKey和groupByKey区别

  1. reduceByKey存在combiner行为,性能更高
  2. groupByKey不存在conbiner行为,性能比较低

工作中推荐使用reduceByKey这种高性能shuffle算子

aggregateByKey()

foldByKey()

在scala中也有fold()函数,与reduce()唯一的区别就是,reduce会把第一个列表中第一个元作为参数的默认值,而fold(),可以指定一个默认值,其他操作和fold与reduce没有什么不同。在spark中foldByKey()和reduceBykey()亦是如此。

foldByKey 参数说明

代码语言:javascript复制
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
}

这种((zeroValue: V)(func: (V, V) => V))语法称为:函数柯里化(zeroValue: V):需要指定一个默认值; (func: (V, V) => V):具体的操作逻辑

案例: 统计各科总成绩,校长心情比较好,决定在总成绩的分数上再加一百分

代码语言:javascript复制
  @Test
  def foldByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

   // 使用 foldByKey 数据汇总
    val value: RDD[(String, Int)] = rdd.foldByKey(100)((v1, v2) => {
      v1   v2
    })

    println(value.collect.toList)

  }

结果:

代码语言:javascript复制
List((数学,269), (英语,262), (语文,300))

aggregateByKey()

combineByKey()

combineByKey 参数说明:

代码语言:javascript复制
def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }
  • createCombiner(转换数据的结构) combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。 简单说明:在combiner阶段对每个组的第一个vlaue值进行转换
  • mergeValue(分区内) 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。 简单说明:combiner的聚合逻辑
  • mergeCombiners(分区间) 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。 简单说明:reduce的聚合逻辑

案例: 求每门学科的平均成绩 输入如下:

代码语言:javascript复制
val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

完整代码如下:

代码语言:javascript复制
@Test
  def combineByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
    // 求每门学科的平均成绩
    val value: RDD[(String, (Int, Int))] = rdd.combineByKey(v => (v, 1), (c: (Int, Int), v: Int) => (c._1 v, c._2 1), (c1: (Int, Int), c2: (Int, Int)) => (c1._1 c2._1, c1._2 c2._2))

    //获得各科成绩的总分数和个数
    val result=value.collect.toList
    println(result)

    result.foreach(m=>{
      m match {
        case (curr,(totalScore,size)) => println(s"课程:$curr,总分:$totalScore,个数:$size,平均数:${totalScore.toDouble/size}")
      }
    })

结果:

代码语言:javascript复制
List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))
课程:数学,总分:69,个数:3,平均数:23.0
课程:英语,总分:162,个数:3,平均数:54.0
课程:语文,总分:100,个数:4,平均数:25.0

代码分析(我觉得挺复杂的,有点太抽象了); 首先我们应该理解的时候,combineByKey各个阶段的操作都是针对于 value 而言,毕竟时BykeycombineByKey 中的C,V其实表示的含义就是传入的value,返回的也是一个value

代码语言:javascript复制
val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
 v => (v, 1),
 (c: (Int, Int), v: Int) => (c._1 v, c._2 1), (c1: (Int, Int), 
 c2: (Int, Int)) => (c1._1 c2._1, c1._2 c2._2)
)

第一个参数:createCombiner createCombiner: V => C :他希望我们传入进来C的是一个value,就是一个个分数(10,20,15等),返回的V将作为下一个函数参数的C(mergeValue) v => (v, 1) 此时结合我们的业务,统计平均数,我们首先得知道语文有多少个,数学有多少个,英语多少个。如何通过combineByKey来实现呢?结合createCombiner的特性在combiner阶段对每个组的第一个vlaue值进行转换,我们就可以将计算器(用1标识)存放到value中 结果应该是这样的。

代码语言:javascript复制
"语文"->(10,1),"语文"->(20,1),"数学"->(15,1),"语文"->(30,1),"数学"->(33,1),
"英语"->(12,1),"语文"->(40,1),"数学"->(21,1),"英语"->(50,1),"英语"->(100,1)

由于是在combiner阶段,在combiner阶段对每个组的第一个vlaue值进行转换 语文:

代码语言:javascript复制
"语文"->(10,1)

数学:

代码语言:javascript复制
"数学"->(15,1)

英语:

代码语言:javascript复制
"英语"->(12,1)

然后就是第二个参数mergeValue,他的解释是combiner的聚合逻辑;待会解释是什么意思 先看看mergeValue需要指定哪些参数 : (C, V) => C, C:就是上一个函数参数(createCombiner)返回的结果(如:(10,1),(20,1),(30,1)) V:表示带聚合的元素 返回的C将会作为下一个函数参数的C(mergeCombiners的参数C)。应该能明白了吧。

了解了mergeValue各个参数的意思及返回参数的意思之后,再次回到业务中。 (c: (Int, Int), v: Int) => (c._1 v, c._2 1), (c1: (Int, Int): 拿语文进行举例

代码语言:javascript复制
"语文"->(10,1)

第一次:c._1 表示成绩也是 元素(10,1),v: 表示 下一个带聚合的元素(20,1 1), 结果就是(30,2) 第二次:c._1 表示上一个结果(30,2),v: 表示 下一个带聚合的元素(30,2 1),结果就是(60,3) 第二次:c._1 表示上一个结果(60,3),v: 表示 下一个带聚合的元素(40,3 1),结果就是(100,4)

最终结果

代码语言:javascript复制
语文 -> (100,4)
数学 -> (69,3)
英语 -> (162,3)

mergeValue之后,最终溢写到磁盘

mergeCombiners 就比较简单了,就是一个reduce操作。 注意:我上面的方式是建立在一个分区情况下,多个分区也是一样的流程。 mergeCombiners 中就是将多个 分区进行最后的聚合处理。

原理图:

 combineByKey原理图 combineByKey原理图

我也是在学习阶段,理解可能没那么透彻,文章若有什么不对,希望可以指出来。

除了使用combineByKey可以使用reduceByKey的方式实现类似的功能,对比combineByKey还更简单一点。

代码语言:javascript复制
  @Test
  def combineByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

    // 通过 map方式实现一样的功能
    val rdd2=rdd.map(x=>{
      (x._1,(x._2,1))
    }).reduceByKey((v1,v2)=>{
      (v1._1 v2._1,v1._2 v2._2)
    })
    println(rdd2.collect.toList)
 }

结果:

代码语言:javascript复制
List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))

其实reduceByKey底层就是使用的是combineByKey combineByKey 底层实现

代码语言:javascript复制
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    // 注意这个函数
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

reduceByKey 底层实现

代码语言:javascript复制
 def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

在往reduceByKey(defaultPartitioner(self), func)中点击

代码语言:javascript复制
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // 注意这个函数
   // (v: V) => v ;自己转自己,啥都没干。
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

看到了吗? 都是使用的是combineByKeyWithClassTag来实现。

0 人点赞