什么是累加器
累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
案例演示
统计列表中的元素之和
代码语言:javascript复制 @Test
def demo: Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
//定义一个集合,分区为2;方便计算
val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
// 统计元素之和
var sum=0
// 循环累加
rdd1.foreach(e=>{
sum =sum e
})
// 输出结果
println(s"sum=$sum")
}
此时 sum
结果为多少? 答案为0
sum=0
为什么是0
呢?难道不应该是3 2 5 4 8 6=28
吗?
原因很简单,foreach
属于Action算子
;算子都是是Executor
中执行的,算子外的都在是Driver
中执行的。若算子中的若要引入外部变量的数据,就需要进行序列化
。
具体的操作如图;
草图
虽然对sum进行累加,但只是作用于分区内而言,对于Driver
而言,sum
始终是没有改变的。
我们可以打印出来看看,task
就是一个线程,使用Thread.currentThread().getName
可以获取线程名称
// 循环累加
rdd1.foreach(e=>{
sum =sum e
println(s"${Thread.currentThread().getName};sum=$sum, e=$e ")
})
分区0
代码语言:javascript复制Executor task launch worker for task 0;sum=3, e=3
Executor task launch worker for task 0;sum=5, e=2
Executor task launch worker for task 0;sum=10, e=5
分区1
代码语言:javascript复制Executor task launch worker for task 1;sum=4, e=4
Executor task launch worker for task 1;sum=12, e=8
Executor task launch worker for task 1;sum=18, e=6
当然你可以说,我不用foreach
,用其他的算子不行吗?当然可以,比如使用reduce
。
@Test
def demo: Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
//定义一个集合,分区为2
val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
// 数据聚集
val sum=rdd1.reduce(_ _)
// 输出结果
println(s"sum=$sum")
}
输出结果,答案是28
代码语言:javascript复制sum=28
条条大路通罗马,实现方式多种多样。
在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。
累加器的使用
使用累加器需要使用SparkContext
设置
如下:sumAccumulator
=累加器取个名
val sumAccumulator=sc.longAccumulator("sumAccumulator")
内置累加器 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator LongAccumulator: 数值型累加
代码语言:javascript复制LongAccumulator longAccumulator = sc.longAccumulator("long-account");
DoubleAccumulator: 小数型累加
代码语言:javascript复制DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");
CollectionAccumulator:集合累加
代码语言:javascript复制CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");
案例演示:
代码语言:javascript复制 @Test
def demo2(): Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
// 定义累加器
val sumAccumulator=sc.longAccumulator("sumAccumulator")
//定义一个集合,分区为2
val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
// 循环累加
rdd1.foreach(e=>{
sumAccumulator.add(e)
})
// 输出结果
println(s"sum=${sumAccumulator.value}")
}
结果
代码语言:javascript复制sum=28
其他两种也就不演示了,使用起来都是一样。
add
:存放数据
value
:获取结果
累加器的作用
累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。
累加器对信息进行聚合。向Spark传递函数时,通常可以使用Driver端定义的变量,但是在Executor端使用此变量时,每个task中使用的都是此变量的副本。如果变量的值发生了变化,Driver端的变量值却不会改变。
我们可以通过累加器实现分片处理,同时更新变量值
原文链接:https://blog.csdn.net/FlatTiger/article/details/115133641
可以不用,但是不能不会。
自定义累加器
自定义累加器步骤
- 定义 1.定义class继承AccumulatorV2 2.重写抽象方法
- 使用 1.初始化累加器对象 2.注册累加器 3.在分区中累加数据 4.获取最终结果
案例:
使用累加器实现WroldCount
功能
- 定义一个class 继承
AccumulatorV2
AccumulatorV2需要我们指定两个类型, INT:表示输入的数据类型 OUT:表示返回结果的数据类型。
abstract class AccumulatorV2[IN, OUT]
不太理解没有关系,我们可以看看longAccumulator
累加器中 IN
和 OUT
指定是什么?
传进去的是一个Long ,返回的也是一个Long;
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
我们在哪里传入的呢? add 就是传进去的参数(int 可以自动转为long
)
// 循环累加
rdd1.foreach(e=>{
sumAccumulator.add(e)
})
我的思考方式应该是,我们应该给add
传入什么类型的数据,该数据类型不就是IN
吗?
既然是单词出现的个数,能否指定为String
?若只是单纯的指定为String
好像不太好计算。
List("python","java","python","java","spark")
我们可以给每个单词分配一个值 1;
代码语言:javascript复制List(("python",1),("java",1),("python",1),("java",1),("spark",1))
这样IN
的参数类型就明确了,首先是一个元组,元组类型为(String,Int)
那么OUT的类型呢?看下面的代码片段思考出了什么吗?
// 输出结果
println(s"sum=${sumAccumulator.value}")
value
返回是不是最终的结果?WorldCount
程序数据结果是什么?
是否就是这个?
List(("python",2),("java",2),("spark",1))
OUT的类型,我们可以指定成一个List
,里面的元素类型,还是一个元组(String,Int)
还需要重写里面的方法。
代码语言:javascript复制class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
/**
* 累加器是否为空
*/
override def isZero: Boolean = ???
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
/**
* 重置累加器
*/
override def reset(): Unit = ???
/**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = ???
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = ???
}
先不着急写里面的实现,先调用,这样方便理解。
代码语言:javascript复制 @Test
def demo3(): Unit ={
val conf=new SparkConf().setMaster("local[2]").setAppName("test")
val sc =new SparkContext(conf)
//初始化累加器
val acc = new CustomAccumulator
//注册累加器
sc.register(acc,"CustomAccumulator")
//读取文件
val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",2)
// 列裁剪,数据扁平化
val value: RDD[String] = lines.flatMap(_.split(" "))
// 转换成我们需要的数据结构
val mapList: RDD[(String, Int)] = value.map(e => (e, 1))
// 循环累加
mapList.foreach(e=>{
acc.add(e)
})
// 输出结果
println(s"sum=${acc.value}")
}
worldCount.txt 内容
代码语言:javascript复制hello java shell
python java java
wahaha java shell
hello java shell shell
每一个元素都会交给add
,就先完成add
函数
import scala.collection.mutable
// 定义一个可变map 存储add 传入进来的元素
val result=mutable.Map[String,Int]()
代码语言:javascript复制 /**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = {
// 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素
// 根据key找到map中的元素,修改原来的总数
val sum=this.result.getOrElse(v._1,0) v._2
// 覆盖原来的key
this.result.put(v._1,sum)
}
不太理解也没关系,下面有完整的代码。
value
返回的结果不就是result的结果吗?所以直接map
转list
。
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = this.result.toList
目前完成代码
代码语言:javascript复制class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
import scala.collection.mutable
// 定义一个可变map 存储add 传入进来的元素
val result=mutable.Map[String,Int]()
/**
* 累加器是否为空
*/
override def isZero: Boolean = ???
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
/**
* 重置累加器
*/
override def reset(): Unit = ???
/**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = {
// 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素
// 根据key找到map中的元素,修改原来的总数
val sum=this.result.getOrElse(v._1,0) v._2
// 覆盖原来的key
this.result.put(v._1,sum)
}
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = this.result.toList
}
当前累加器的数据都是在result
中,所以直接判断 result
是否为空即可
/**
* 累加器是否为空
*/
override def isZero: Boolean = result.isEmpty
复制累加器;理解起来有点抽象,new CustomAccumulator
定义在Driver
中,但是整个计算是在每个分区中,所以我们需要创建
一个新的累加器给他(后面会有画图,理解起来就不会那么抽象了)。
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = new CustomAccumulator()
重置累加器 : 就是清空数据
代码语言:javascript复制 /**
* 重置累加器
*/
override def reset(): Unit = this.result.clear()
上面说了,计算都在分区中进行的,所以需要对每个分区的数据进行汇总
代码语言:javascript复制 /**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
// 获取其他分区的累加器数据结果
val value: List[(String, Int)] = other.value
//与result数据合并
val list: List[(String, Int)] = result.toList
// 此时 newList 中肯定有重复数据
val newList: List[(String, Int)] =list value
// 分组,聚合
val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
println(groupList)
// e._1 单词
// e._2 依然还是一个列表
// e._2.map(_._2).sum 获取里面的单词数
val newResult: Map[String, Int] =groupList.map(e=>{
val sum = e._2.map(_._2).sum
(e._1,sum)
})
// 合并map
result. =(newResult)
}
完整代码
代码语言:javascript复制class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
import scala.collection.mutable
// 定义一个可变map 存储add 传入进来的元素
val result=mutable.Map[String,Int]()
/**
* 累加器是否为空
*/
override def isZero: Boolean = result.isEmpty
/**
* 复制累加器
*/
override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =new CustomAccumulator()
/**
* 重置累加器
*/
override def reset(): Unit = this.result.clear()
/**
* 累加元素 [在每个task中累加]
*/
override def add(v: (String, Int)): Unit = {
// 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素
// 根据key找到map中的元素,修改原来的总数
val sum=this.result.getOrElse(v._1,0) v._2
// 覆盖原来的key
this.result.put(v._1,sum)
}
/**
* 合并每个task的累加结果【在Driver中合并】
*/
override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
// 获取其他分区的累加器数据结果
val value: List[(String, Int)] = other.value
//与result数据合并
val list: List[(String, Int)] = result.toList
// 此时 newList 中肯定有重复数据
val newList: List[(String, Int)] =list value
// 分组,聚合
val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
println(groupList)
// e._1 单词
// e._2 依然还是一个列表
// e._2.map(_._2).sum 获取里面的单词数
val newResult: Map[String, Int] =groupList.map(e=>{
val sum = e._2.map(_._2).sum
(e._1,sum)
})
// 合并map
result. =(newResult)
}
/**
* 获取Driver汇总结果
*/
override def value: List[(String, Int)] = this.result.toList
}
数据结果
代码语言:javascript复制sum=List((wahaha,1), (java,5), (shell,4), (hello,2), (python,1))
分区二与分区一合并的数据。
代码语言:javascript复制Map(shell -> List((shell,2), (shell,2)), wahaha -> List((wahaha,1)), java -> List((java,1), (java,4)), python -> List((python,1)), hello -> List((hello,1), (hello,1)))