Spark Day04:Spark Core
02-[了解]-今日课程内容提纲
代码语言:javascript复制主要讲解RDD函数,分为2类:Transformation转换函数和Action触发函数
RDD中函数:
- 函数分类,不同类型函数功能
- 常见函数概述
- 5种类型RDD函数
实际项目中使用最多的,必须要掌握
- RDD 持久化函数
可以将RDD分布式集合数据进行缓存,比如缓存到Executor内存中,再次处理数据时,直接从内存读取
- RDD Checkpoint
将RDD数据保存到可靠文件系统中,比如HDFS
首先创建Maven Module模块,编写好代码模块,讲解某个知识点时,在编写核心代码
03-[掌握]-RDD 函数分类
RDD 的操作主要可以分为
Transformation
和Action
两种。
Transformation
转换,将1个RDD转换为另一个RDDAction
触发,当1个RDD调用函数以后,触发一个Job执行(调用Action函数以后,返回值不是RDD)
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations
RDD中2种类型操作函数:Transformation(lazy)和Action(eager)函数
- Transformation转换函数
- Action触发函数,触发一个Job执行
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a1fQcH5e-1638793130130)(/img/image-20210422150349862.png)]
04-[了解]-RDD 中常见函数概述
RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。
代码语言:javascript复制主要常见使用函数如下,每个函数通过演示范例讲解。
1、分区操作函数
对RDD中每个分区数据进行操作
2、重分区函数
调整RDD中分区数目,要么变大,要么变小
3、聚合函数
对RDD中数据进行聚合统计,比如使用reduce、redueBykey等
4、关联函数
对2个RDD进行JOIN操作,类似SQL中JOIN,分为:等值JOIN、左外连接和右外连接、全外连接fullOuterJoin
RDD函数练习:运行spark-shell命令行,在本地模式运行,执行函数使用
05-[掌握]-RDD 函数之基本函数使用
RDD中map、filter、flatMap及foreach等函数为最基本函数,都是对RDD中
每个元素进行操作,将元素传递到函数中进行转换
。
代码语言:javascript复制编写词频统计WordCount程序,使用基本函数
package cn.itcast.spark.func.basic
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 演示RDD中基本函数使用
*/
object _01SparkBasicTest {
def main(args: Array[String]): Unit = {
// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
val sc: SparkContext = {
// a. 创建SparkConf对象
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b. 传递sparkConf对象,构建SparkContext实例
SparkContext.getOrCreate(sparkConf)
}
// step1. 读取数据
val inputRDD: RDD[String] = sc.textFile("datas/wordcount/input.data", minPartitions = 2)
// step2. 处理数据
val resultRDD: RDD[(String, Int)] = inputRDD
// 过滤数据
.filter(line => null != line && line.trim.length > 0)
// 分割单词
.flatMap(line => line.trim.split("\s "))
// 转换为二元组
.map(word => word -> 1)
// 按照单词分组,对组内数据进行聚合求和
.reduceByKey((tmp, item) => tmp item) // TODO: 隐式转换,将RDD对象抓好为PairRDDFunctions对象,调用方法
// step3. 输出数据
resultRDD.foreach(item => println(item))
// 应用结束,关闭资源
sc.stop()
}
}
06-[掌握]-RDD 函数之分区操作函数
每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,
map函数使用mapPartitions代替
、foreach函数使用foreachPartition代替
。 前面编写WordCount词频统计代码中,使用map函数和forearch函数,针对RDD中每个元素操作,并不是针对每个分区数据操作的,如果针对分区操作:mapPartitions和foreachPartition
针对分区数据进行操作时,函数的参数类型:迭代器Iterator
,封装分区中所有数据
代码语言:javascript复制针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下:
package cn.itcast.spark.func.iter
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 分区操作函数:mapPartitions和foreachPartition
*/
object _02SparkIterTest {
def main(args: Array[String]): Unit = {
// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
val sc: SparkContext = {
// a. 创建SparkConf对象
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b. 传递sparkConf对象,构建SparkContext实例
SparkContext.getOrCreate(sparkConf)
}
// step1. 读取数据
val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
// step2. 处理数据
val resultRDD: RDD[(String, Int)] = inputRDD
// 过滤数据
.filter(line => line.trim.length != 0 )
// 对每行数据进行单词分割
.flatMap(line => line.trim.split("\s "))
// 转换为二元组
//.map(word => word -> 1)
/*
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false
): RDD[U]
*/
.mapPartitions(iter => iter.map(word => (word, 1)))
// 分组聚合
.reduceByKey((tmp, item) => tmp item)
// step3. 输出数据
//resultRDD.foreach(item => println(item))
/*
def foreachPartition(f: Iterator[T] => Unit): Unit
*/
resultRDD.foreachPartition(iter => iter.foreach(item => println(item)))
// 应用结束,关闭资源
sc.stop()
}
}
为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???
07-[掌握]-RDD 函数之重分区函数
代码语言:javascript复制如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。
上述2个函数最为关键:
- 增加RDD分区数目:repartition
- 减少RDD分区数目:coalesce,不产生Shuffle
代码语言:javascript复制package cn.itcast.spark.func.iter
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 分区操作函数:mapPartitions和foreachPartition
*/
object _02SparkPartitionTest {
def main(args: Array[String]): Unit = {
// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
val sc: SparkContext = {
// a. 创建SparkConf对象
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b. 传递sparkConf对象,构建SparkContext实例
SparkContext.getOrCreate(sparkConf)
}
// step1. 读取数据
val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
println(s"raw rdd partitions = ${inputRDD.getNumPartitions}")
// TODO: 增加RDD分区数目
val etlRDD: RDD[String] = inputRDD.repartition(3)
println(s"etl rdd partitions = ${etlRDD.getNumPartitions}")
// step2. 处理数据
val resultRDD: RDD[(String, Int)] = inputRDD
// 过滤数据
.filter(line => line.trim.length != 0 )
// 对每行数据进行单词分割
.flatMap(line => line.trim.split("\s "))
// 转换为二元组
//.map(word => word -> 1)
/*
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false
): RDD[U]
*/
.mapPartitions(iter => iter.map(word => (word, 1)))
// 分组聚合
.reduceByKey((tmp, item) => tmp item)
// step3. 输出数据
//resultRDD.foreach(item => println(item))
/*
def foreachPartition(f: Iterator[T] => Unit): Unit
*/
// TODO: 降低结果RDD分区数目
val outputRDD: RDD[(String, Int)] = resultRDD.coalesce(1)
println(s"output rdd partitions = ${outputRDD.getNumPartitions}")
outputRDD.foreachPartition(iter => iter.foreach(item => println(item)))
// 应用结束,关闭资源
sc.stop()
}
}
在实际开发中,
什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????
08-[掌握]-RDD 函数之RDD 中聚合函数
回顾列表List中reduce聚合函数核心概念:
聚合的时候,往往需要聚合中间临时变量
。查看列表List中聚合函数reduce和fold源码如下:
通过代码,看看列表List中聚合函数使用:
运行截图如下所示:
fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:
聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:
在RDD中提供类似列表List中聚合函数reduce和fold
,查看如下:
案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:
运行结果解析如下:
查看RDD中高级聚合函数
aggregate
,函数声明如下:
代码语言:javascript复制业务需求:对RDD中数据进行求和sum。
// TODO:aggregate函数,累计求和
/*
def aggregate[U: ClassTag]
(zeroValue: U)
(
seqOp: (U, T) => U,
combOp: (U, U) => U
): U
*/
val aggSum: Int = datasRDD.aggregate(0)(
// seqOp: (U, T) => U 分区内数据聚合
(tmp: Int, item: Int) => {
println(s"seq -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp item}")
tmp item
},
// combOp: (U, U) => U 分区间数据聚合
(tmp, item) => {
println(s"comb -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp item}")
tmp item
}
)
println(s"aggSum = ${aggSum}")
09-[掌握]-RDD 函数之PairRDDFunctions 聚合函数
在Spark中有一个object对象
PairRDDFunctions
,主要针对RDD的数据类型是Key/Value对的数据提供函数
,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。
*ByKey函数
:将相同Key的Value进行聚合操作的,省去先分组再聚合。
- 第一类:分组函数
groupByKey
- 第二类:分组聚合函数
reduceByKey和foldByKey
- 第三类:分组聚合函数
aggregateByKey
在企业中如果对数据聚合使用,
不能使用reduceByKey完成时
,考虑使用aggregateByKey
函数,基本上都能完成任意聚合功能。
10-[掌握]-RDD 函数之关联JOIN函数
当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。
RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:
具体看一下join(等值连接)函数说明:
范例演示代码:
代码语言:javascript复制package cn.itcast.spark.func.join
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中关联函数Join,针对RDD中数据类型为Key/Value对
*/
object _04SparkJoinTest {
def main(args: Array[String]): Unit = {
// 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
val sc: SparkContext = {
// a. 创建SparkConf对象
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b. 传递sparkConf对象,构建SparkContext实例
SparkContext.getOrCreate(sparkConf)
}
// 模拟数据集
val empRDD: RDD[(Int, String)] = sc.parallelize(
Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu"))
)
val deptRDD: RDD[(Int, String)] = sc.parallelize(
Seq((1001, "sales"), (1002, "tech"))
)
// TODO: 等值连接
// deptno empname deptname
val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
joinRDD.foreach{case (deptno, (empname, deptname)) =>
println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}")
}
println("======================================================")
// TODO:左外连接
val leftRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
leftRDD.foreach{case (deptno, (empname, option)) =>
val deptname: String = option match {
case Some(name) => name
case None => "未知"
}
println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}")
}
// 应用结束,关闭资源
sc.stop()
}
}
11-[掌握]-RDD 持久化
在实际开发中
某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到
,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。 将RDD数据进行缓存时,本质上就是将RDD各个分区数据进行缓存
- 缓存函数
可以将RDD数据直接缓存到内存中,函数声明如下:
但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bxNtlFD7-1638793130145)(/img/image-20210422172215367.png)]
- 缓存级别
在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
实际项目中缓存数据时,往往选择如下两种级别:
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发。
- 释放缓存
缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
此函数属于eager,立即执行。
- 何时缓存数据
在实际项目开发中,什么时候缓存RDD数据,最好呢???
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKk5WJgJ-1638793130147)(img/image-20210422172821282.png)]
12-[了解]-RDD Checkpoint
RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。 Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。
在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;
代码语言:javascript复制案例演示代码如下:
package cn.itcast.spark.ckpt
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD数据Checkpoint设置,案例演示
*/
object _06SparkCkptTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
// TODO: 设置检查点目录,将RDD数据保存到那个目录
sc.setCheckpointDir("datas/ckpt/")
// 读取文件数据
val datasRDD = sc.textFile("datas/wordcount.data")
// TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
datasRDD.checkpoint()
datasRDD.count()
// TODO: 再次执行count函数, 此时从checkpoint读取数据
println(datasRDD.count())
// 应用程序运行结束,关闭资源
Thread.sleep(1000000000)
sc.stop()
}
}
面试题:持久化和Checkpoint的区别: