前言
在spark中,有两种算子,Transformation
转换算子和 Action
行动算子。Transformation
转换算子在整个job任务中,都是一个懒加载,只有真正执行了 Action
行动算子的时候,整个job
任务才会得到正在的运行。
可以把Transformation
转换算子理解成工厂中的流水线, Action
行动算子相当于总闸,只有拉下总闸,整条流水线便开始了运行。
行动算子有哪些?
- reduce() 聚合
- collect() 以数组的形式返回数据集
- count() 返回RDD中元素个数
- first() 返回RDD中的第一个元素
- take() 返回由RDD前n个元素组成的数组
- takeOrdered() 返回该RDD排序后前n个元素组成的数组
- aggregate()
- fold()
- countByKey() 统计每种key的个数
- save相关算子
- saveAsTextFile(path) 保存成Text文件
- saveAsSequenceFile(path) 保存成Sequencefile文件
- saveAsObjectFile(path) 序列化成对象保存到文件
- foreach(f) 遍历RDD中每一个元素
reduce()
根据聚合逻辑聚合数据集中的每个元素。(reduce里面需要具体的逻辑,根据里面的逻辑对相同分区的数据进行计算)
案例演示:计算集合中的总数
代码语言:javascript复制 @Test
def reduce(): Unit ={
// 创建 sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 创建一个集合
val list=List(1,2,3,4,5,6,7,8)
val rdd1: RDD[Int] = sc.parallelize(list, 4)
// 统计集合中的总数
val value: Int = rdd1.reduce((v1, v2) => v1 v2)
println(value)
// 关闭资源
sc.stop()
}
结果
代码语言:javascript复制36
collect()
将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom。 一般在使用过滤算子或者一些能返回少量数据集的算子后
案例演示:编写worldCount程序,使用collect
收集结果
@Test
def collect(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
// 数据扁平化,
val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
// 映射
val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
// 计算单词个数
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 v2)
//获取数据结果
val result: Array[(String, Int)] = rdd4.collect()
// 输出结果
result.foreach(e=>{
e match {
case (k,v)=>println(k,v)
}
})
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(shell,4)
(wahaha,1)
(hello,2)
(python,1)
(java,5)
count()
返回数据集中的元素数。会在结果计算完成后回收到Driver端。返回行数
案例演示:编写worldCount程序,使用count
统计数据总数
@Test
def count(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
// 数据扁平化,
val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
// 映射
val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
// 计算单词个数
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 v2)
//获取数据结果
val count: Long = rdd4.count()
println(count)
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制5
first()
代码语言:javascript复制first=take(1) 返回数据集中的第一个元素。
val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))
案例演示:返回成绩第一名的学生信息
代码语言:javascript复制 @Test
def first(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 学生信息
val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))
val rdd1: RDD[(String, String, String, Int)] = sc.parallelize(stuList, 4)
// 排名 按照成绩排名
val sortList: RDD[(String, String, String, Int)] = rdd1.sortBy(_._4,ascending = false)
// 返回第一名学生信息
val stuInfo: (String, String, String, Int) = sortList.first()
println(stuInfo)
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(尼古拉斯,男,18,100)
take()
返回一个包含数据集前n个元素的集合。是一个(array)有几个partiotion 会有几个job触发
案例演示:返回前三名的学生信息
代码语言:javascript复制 @Test
def take(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 学生信息
val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))
val rdd1: RDD[(String, String, String, Int)] = sc.parallelize(stuList, 4)
// 排名 按照成绩排名
val sortList: RDD[(String, String, String, Int)] = rdd1.sortBy(_._4,ascending = false)
// 返回前三名学生信息
val top3: Array[(String, String, String, Int)] = sortList.take(3)
// 遍历打印
for(e <- top3 ){
e match {
case (name,sex,age,score)=>println(name,sex,age,score)
}
}
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(尼古拉斯,男,18,100)
(春娇,女,15,99)
(张三,男,16,97)
takeOrdered()
作用和 take 类似,takeOrdered取数据前会对数据进行排序,默认按照降序
案例演示:返回前三名的学生信息
代码语言:javascript复制 @Test
def takeOrdered(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 学生信息
val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))
val rdd1: RDD[(String, String, String, Int)] = sc.parallelize(stuList, 4)
// 排名 按照成绩排名
//val sortList: RDD[(String, String, String, Int)] = rdd1.sortBy(_._4,ascending = false)
// 返回排完序后的前三条数据信息
val top3: Array[(String, String, String, Int)] = rdd1.takeOrdered(3)
// 遍历打印
for(e <- top3 ){
e match {
case (name,sex,age,score)=>println(name,sex,age,score)
}
}
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(尼古拉斯,男,18,100)
(张三,男,16,97)
(春娇,女,15,99)
takeOrdered 源码
代码语言:javascript复制def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0) {
Array.empty
} else {
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue = collectionUtils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 = queue2
queue1
}.toArray.sorted(ord)
}
}
}
指定排序规则
代码语言:javascript复制 @Test
def takeOrdered(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 学生信息
val stuList=List(("张三","男",16,97),("李四","男",16,89),("春娇","女",15,99),("尼古拉斯","男",18,100),("王富贵","男",17,70))
val rdd1: RDD[(String, String, Int, Int)] = sc.parallelize(stuList, 4)
// 排名 按照成绩排名
//val sortList: RDD[(String, String, Int, Int)] = rdd1.sortBy(_._4,ascending = false)
// 按照年龄排名
val top3: Array[(String, String, Int, Int)] = rdd1.takeOrdered(3)(Ordering.by(_._3))
// 遍历打印
for(e <- top3 ){
e match {
case (name,sex,age,score)=>println(name,sex,age,score)
}
}
// 关闭资源;养成良好编码习惯
sc.stop()
}
代码语言:javascript复制(春娇,女,15,99)
(李四,男,16,89)
(张三,男,16,97)
aggregate()
aggregate 源码
代码语言:javascript复制def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
(zeroValue: U):表示设置一个初始值 (U, T):在分区内计算的逻辑;U表示初始值,T表示待处理的元素 (U, U) :在Driver内计算的逻辑;第一个U表示初始值,第二个U表示分区内的元素结果
案例说明:
代码语言:javascript复制 @Test
def aggregate(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 六个元素
val list=List(1,5,7,8,9,4)
// 为了方便计算,分为两个区
val rdd1: RDD[Int] = sc.parallelize(list, 2)
// 计算
val result=rdd1.aggregate(100)((u,t)=>u-t,(u1,u2)=>u1 u2)
// 结果应该是多少
println(result)
// 关闭资源;养成良好编码习惯
sc.stop()
}
答案
代码语言:javascript复制266
266是怎么来的呢? 首先有两个元素
代码语言:javascript复制val list=List(1,5,7,8,9,4)
两个分区
代码语言:javascript复制val rdd1: RDD[Int] = sc.parallelize(list, 2)
假设 分区0的元素为
代码语言:javascript复制1,5,7
分区1的元素为
代码语言:javascript复制8,9,4
(U, T):是用于计算分区分区内的。 分区0的计算逻辑
代码语言:javascript复制第一次 U=100(初始值)T=1(待处理的元素)进行运算 100-1=99
第二次 U=99(上一次的运算结果)T=5(待处理的元素)进行运算 99-5=94
第二次 U=94(上一次的运算结果)T=7(待处理的元素)进行运算 94-7=87
分区0的最终结果为87
分区1的计算逻辑
代码语言:javascript复制第一次 U=100(初始值)T=8(待处理的元素)进行运算 100-8=92
第二次 U=92(上一次的运算结果)T=9(待处理的元素)进行运算 92-9=83
第二次 U=83(上一次的运算结果)T=4(待处理的元素)进行运算 93-4=79
分区1的最终结果为79
(U, U) :在Driver内计算的逻辑;这里开始对各个分区的结果进行汇总
代码语言:javascript复制第一次 U1=100(初始值)u2=87(待处理的分区元素)进行运算 100 87=187
第er次 U1=187(上一次的运算结果)u2=79(待处理的分区元素)进行运算 187 79=266
最终结果就是266
fold()
fold 源码
代码语言:javascript复制def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
(zeroValue: T):表示设置一个初始值 (op: (T, T) => T):在Driver内计算的逻辑;第一个T表示初始值,第二个T表示待处理的元素
代码语言:javascript复制 @Test
def fold(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val list=List(1,5,7,8,9,4)
val rdd1: RDD[Int] = sc.parallelize(list, 2)
// 计算
val result=rdd1.fold(100)((t1,t2)=>t1 t2)
// 结果
println(result)
// 关闭资源;养成良好编码习惯
sc.stop()
}
我最开始的以为过程是这样的。
代码语言:javascript复制第一次 T1=100(初始值)T2=1(待处理的元素)进行运算 100 1=101
第二次 T1=101(初始值)T2=5(待处理的分区元素)进行运算 101 5=106
第三次 T1=106(初始值)T2=7(待处理的分区元素)进行运算 106 7=113
第四次 T1=113(初始值)T2=8(待处理的分区元素)进行运算 113 8=121
第五次 T1=121(初始值)T2=9(待处理的分区元素)进行运算 121 9=130
第六次 T1=130(初始值)T2=4(待处理的分区元素)进行运算 130 4=134
最终结果为:134 ;但是这个答案是错误的(哭脸)。
其实fold
运算的逻辑就是aggregate
的运算逻辑
val result=rdd1.fold(100)((t1,t2)=>t1 t2)
这两句代码是等价的。
代码语言:javascript复制val result=rdd1.aggregate(100)((u,t)=>u t,(u1,u2)=>u1 u2)
fold
与aggregate
区别在于
aggregate
分区的逻辑 可以和Driver
内的运算逻辑不一致。
fold
分区的逻辑 与Driver
内的运算逻辑是一致的。
运算流程就复制上面的改改把,原理都是一样的。
假设 分区0的元素为
1,5,7
分区1的元素为
代码语言:javascript复制8,9,4
(T, T):是用于计算分区分区内的。 分区0的计算逻辑
代码语言:javascript复制第一次 T1=100(初始值)T2=1(待处理的元素)进行运算 100 1=101
第二次 T1=101(上一次的运算结果)T2=5(待处理的元素)进行运算 101 5=106
第二次 T1=106(上一次的运算结果)T2=7(待处理的元素)进行运算 106 7=113
分区0的最终结果为113
分区1的计算逻辑
代码语言:javascript复制第一次 T1=100(初始值)T2=8(待处理的元素)进行运算 100 8=108
第二次 T1=108(上一次的运算结果)T2=9(待处理的元素)进行运算 108 9=117
第二次 T1=117(上一次的运算结果)T2=4(待处理的元素)进行运算 117 4=121
分区1的最终结果为121
(T, T) :在Driver内计算的逻辑;这里开始对各个分区的结果进行汇总
代码语言:javascript复制第一次 T1=100(初始值)T2=108(待处理的分区元素)进行运算 100 113=213
第er次 T1=213(上一次的运算结果)T2=121(待处理的分区元素)进行运算 213 121=334
最终结果就是334
countByKey()
作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。(也就是个数)
上面使用collect
做一个WroldCount程序,这里使用countByKey
的方式实现(主要是懒,写得太累了)
@Test
def countByKey(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
// 数据扁平化,
val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
// 映射
val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
//获取数据结果
val result: collection.Map[String, Long] = rdd3.countByKey()
// 输出结果
result.foreach(e=>{
e match {
case (k,v)=>println(k,v)
}
})
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(shell,4)
(wahaha,1)
(java,5)
(python,1)
(hello,2)
save相关算子
可以看看我之前写的数据读取与保存,里面有简书sava相关算子
的操作或用法。
foreach(f)
循环遍历数据集中的每个元素,运行相应的逻辑。
案例演示:
代码语言:javascript复制 @Test
def foreach(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 学生信息
val stuList=List(("张三","男",16,97),("李四","男",16,89),("春娇","女",15,99),("尼古拉斯","男",18,100),("王富贵","男",17,70))
val rdd1: RDD[(String, String, Int, Int)] = sc.parallelize(stuList, 4)
//遍历
rdd1.foreach(stu=>{
stu match {
case (name,sex,age,score)=>println(name,sex,age,score)
}
})
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(李四,男,16,89)
(春娇,女,15,99)
(尼古拉斯,男,18,100)
(张三,男,16,97)
(王富贵,男,17,70)