Spark累加器(Accumulator)

2022-07-26 21:36:02 浏览数 (1)

什么是累加器

累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

案例演示

统计列表中的元素之和

代码语言:javascript复制
  @Test
  def demo: Unit ={
    
    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //定义一个集合,分区为2;方便计算
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
    
    // 统计元素之和
    var sum=0
    
    // 循环累加
    rdd1.foreach(e=>{
      sum =sum e 
    })

    // 输出结果
   println(s"sum=$sum")
  }

此时 sum 结果为多少? 答案为0

代码语言:javascript复制
sum=0

为什么是0呢?难道不应该是3 2 5 4 8 6=28吗? 原因很简单,foreach 属于Action算子;算子都是是Executor中执行的,算子外的都在是Driver中执行的。若算子中的若要引入外部变量的数据,就需要进行序列化。 具体的操作如图;

草图

虽然对sum进行累加,但只是作用于分区内而言,对于Driver而言,sum始终是没有改变的。 我们可以打印出来看看,task就是一个线程,使用Thread.currentThread().getName可以获取线程名称

代码语言:javascript复制
    // 循环累加
    rdd1.foreach(e=>{
      sum =sum e
      println(s"${Thread.currentThread().getName};sum=$sum, e=$e ")
    })

分区0

代码语言:javascript复制
Executor task launch worker for task 0;sum=3, e=3 
Executor task launch worker for task 0;sum=5, e=2 
Executor task launch worker for task 0;sum=10, e=5 

分区1

代码语言:javascript复制
Executor task launch worker for task 1;sum=4, e=4
Executor task launch worker for task 1;sum=12, e=8 
Executor task launch worker for task 1;sum=18, e=6 

当然你可以说,我不用foreach,用其他的算子不行吗?当然可以,比如使用reduce

代码语言:javascript复制
  @Test
  def demo: Unit ={

    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //定义一个集合,分区为2
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)

    // 数据聚集
    val sum=rdd1.reduce(_ _)

    // 输出结果
    println(s"sum=$sum")

  }

输出结果,答案是28

代码语言:javascript复制
sum=28

条条大路通罗马,实现方式多种多样。

在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。

累加器的使用

使用累加器需要使用SparkContext设置 如下:sumAccumulator=累加器取个名

代码语言:javascript复制
val sumAccumulator=sc.longAccumulator("sumAccumulator")

内置累加器 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator LongAccumulator: 数值型累加

代码语言:javascript复制
LongAccumulator longAccumulator = sc.longAccumulator("long-account");

DoubleAccumulator: 小数型累加

代码语言:javascript复制
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");

CollectionAccumulator:集合累加

代码语言:javascript复制
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");

案例演示:

代码语言:javascript复制
  @Test
  def demo2(): Unit ={

    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)
    // 定义累加器
    val sumAccumulator=sc.longAccumulator("sumAccumulator")

    //定义一个集合,分区为2
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)

    // 循环累加
    rdd1.foreach(e=>{
      sumAccumulator.add(e)
    })

    // 输出结果
    println(s"sum=${sumAccumulator.value}")

 }

结果

代码语言:javascript复制
sum=28

其他两种也就不演示了,使用起来都是一样。 add:存放数据 value:获取结果


累加器的作用

累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。 累加器对信息进行聚合。向Spark传递函数时,通常可以使用Driver端定义的变量,但是在Executor端使用此变量时,每个task中使用的都是此变量的副本。如果变量的值发生了变化,Driver端的变量值却不会改变。 我们可以通过累加器实现分片处理,同时更新变量值 原文链接:https://blog.csdn.net/FlatTiger/article/details/115133641 可以不用,但是不能不会。

自定义累加器

自定义累加器步骤

  1. 定义 1.定义class继承AccumulatorV2 2.重写抽象方法
  2. 使用 1.初始化累加器对象 2.注册累加器 3.在分区中累加数据 4.获取最终结果

案例: 使用累加器实现WroldCount功能

  1. 定义一个class 继承AccumulatorV2 AccumulatorV2需要我们指定两个类型, INT:表示输入的数据类型 OUT:表示返回结果的数据类型。
代码语言:javascript复制
abstract class AccumulatorV2[IN, OUT]

不太理解没有关系,我们可以看看longAccumulator累加器中 INOUT 指定是什么? 传进去的是一个Long ,返回的也是一个Long;

代码语言:javascript复制
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {

我们在哪里传入的呢? add 就是传进去的参数(int 可以自动转为long)

代码语言:javascript复制
// 循环累加
rdd1.foreach(e=>{
  sumAccumulator.add(e)
})

我的思考方式应该是,我们应该给add传入什么类型的数据,该数据类型不就是IN吗? 既然是单词出现的个数,能否指定为String?若只是单纯的指定为String好像不太好计算。

代码语言:javascript复制
List("python","java","python","java","spark")

我们可以给每个单词分配一个值 1;

代码语言:javascript复制
List(("python",1),("java",1),("python",1),("java",1),("spark",1))

这样IN 的参数类型就明确了,首先是一个元组,元组类型为(String,Int) 那么OUT的类型呢?看下面的代码片段思考出了什么吗?

代码语言:javascript复制
// 输出结果
println(s"sum=${sumAccumulator.value}")

value 返回是不是最终的结果?WorldCount程序数据结果是什么? 是否就是这个?

代码语言:javascript复制
List(("python",2),("java",2),("spark",1))

OUT的类型,我们可以指定成一个List ,里面的元素类型,还是一个元组(String,Int)

还需要重写里面的方法。

代码语言:javascript复制
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  
  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = ???
  
  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
  
  /**
   * 重置累加器
   */
  override def reset(): Unit = ???
  
  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = ???
  
  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
  
  /**
   * 获取Driver汇总结果
   */
  override def value: List[(String, Int)] = ???
}

先不着急写里面的实现,先调用,这样方便理解。

代码语言:javascript复制
  @Test
  def demo3(): Unit ={


    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //初始化累加器
    val acc = new CustomAccumulator

    //注册累加器
    sc.register(acc,"CustomAccumulator")

    //读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",2)

    // 列裁剪,数据扁平化
    val value: RDD[String] = lines.flatMap(_.split(" "))

    // 转换成我们需要的数据结构
    val mapList: RDD[(String, Int)] = value.map(e => (e, 1))

    // 循环累加
    mapList.foreach(e=>{
      acc.add(e)
    })

    // 输出结果
    println(s"sum=${acc.value}")

  }

worldCount.txt 内容

代码语言:javascript复制
hello java shell
python java java
wahaha java shell
hello java shell shell

每一个元素都会交给add,就先完成add函数

代码语言:javascript复制
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()
代码语言:javascript复制
  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素

    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0) v._2
    // 覆盖原来的key
    this.result.put(v._1,sum) 
  }

不太理解也没关系,下面有完整的代码。

value 返回的结果不就是result的结果吗?所以直接maplist

代码语言:javascript复制
  /**
   * 获取Driver汇总结果
   */
  override def value: List[(String, Int)] = this.result.toList

目前完成代码

代码语言:javascript复制
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()


  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = ???

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???

  /**
   * 重置累加器
   */
  override def reset(): Unit = ???

  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素

    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0) v._2
    // 覆盖原来的key
    this.result.put(v._1,sum)

  }

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???

  /**
   * 获取Driver汇总结果
   */
    override def value: List[(String, Int)] = this.result.toList
}

当前累加器的数据都是在result中,所以直接判断 result是否为空即可

代码语言:javascript复制
/**
   * 累加器是否为空
   */
  override def isZero: Boolean = result.isEmpty

复制累加器;理解起来有点抽象,new CustomAccumulator定义在Driver中,但是整个计算是在每个分区中,所以我们需要创建一个新的累加器给他(后面会有画图,理解起来就不会那么抽象了)。

代码语言:javascript复制
  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =  new CustomAccumulator()

重置累加器 : 就是清空数据

代码语言:javascript复制
  /**
   * 重置累加器
   */
  override def reset(): Unit = this.result.clear()

上面说了,计算都在分区中进行的,所以需要对每个分区的数据进行汇总

代码语言:javascript复制
  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // 获取其他分区的累加器数据结果
    val value: List[(String, Int)] = other.value

    //与result数据合并
    val list: List[(String, Int)] = result.toList
    // 此时 newList 中肯定有重复数据
    val newList: List[(String, Int)] =list  value

    // 分组,聚合
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)


    // e._1 单词
    // e._2 依然还是一个列表
    // e._2.map(_._2).sum  获取里面的单词数
    val newResult: Map[String, Int] =groupList.map(e=>{
      val sum = e._2.map(_._2).sum
      (e._1,sum)
    })
    // 合并map
    result.  =(newResult)

  }

完整代码

代码语言:javascript复制
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()


  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = result.isEmpty

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =new CustomAccumulator()

  /**
   * 重置累加器
   */
  override def reset(): Unit = this.result.clear()

  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素


    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0) v._2
    // 覆盖原来的key
    this.result.put(v._1,sum)

  }

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // 获取其他分区的累加器数据结果
    val value: List[(String, Int)] = other.value

    //与result数据合并
    val list: List[(String, Int)] = result.toList
    // 此时 newList 中肯定有重复数据
    val newList: List[(String, Int)] =list  value

    // 分组,聚合
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)


    // e._1 单词
    // e._2 依然还是一个列表
    // e._2.map(_._2).sum  获取里面的单词数
    val newResult: Map[String, Int] =groupList.map(e=>{
      val sum = e._2.map(_._2).sum
      (e._1,sum)
    })
    // 合并map
    result.  =(newResult)

  }

  /**
   * 获取Driver汇总结果
   */
    override def value: List[(String, Int)] = this.result.toList
}

数据结果

代码语言:javascript复制
sum=List((wahaha,1), (java,5), (shell,4), (hello,2), (python,1))

分区二与分区一合并的数据。

代码语言:javascript复制
Map(shell -> List((shell,2), (shell,2)), wahaha -> List((wahaha,1)), java -> List((java,1), (java,4)), python -> List((python,1)), hello -> List((hello,1), (hello,1)))

0 人点赞