文章大纲
RDD(Resilient Distributed Dataset, 弹性分布式数据集)是 Spark 中相当重要的一个核心抽象概念,要学习 Spark 就必须对 RDD 有一个清晰的认识。
RDD 是 Spark 中对所有数据处理的一种最基本的抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
1
RDD 的概述
以下从 RDD 的概念、特点、属性以及核心结构几个方面,了解 RDD 的基本知识内容。
1.1
RDD 的概念
RRD 的全称是 Resilient Distributed Dataset(弹性分布式数据集),从其名字中便可分解出 RDD 的三大概念:
- Resilient :弹性的,包括存储和计算两个方面。存储弹性是指,RDD 中的数据可以保存在内存中,内存放不下时也可以保存在磁盘中;计算弹性是指,RDD 具有自动容错的特点,当运算中出现异常情况导致 Partition 数据丢失或运算失败时,可以根据 Lineage(血统)关系对数据进行重建。
- Distributed :分布式的,也包括存储和计算两个方面。RDD 的数据元素是分布式存储的,同时其运算方式也是分布式的。
- Dataset :数据集,RDD 本质上是一个存放元素的分布式数据集合。
1.2
RDD 的特点
RDD 具有数据流模型的特点:自动容错、位置感知性调度、可伸缩性等。
RDD 允许用户在执行多个查询时,显式地将工作数据集缓存在内存中,后续的查询能够重用该工作数据集,极大地提升了查询的效率。
1.3
RDD 的属性
Spark 源码中,对 RDD 类的介绍注释如下:
代码语言:javascript复制/* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
*/
从注释中拆解出 RDD 的五大属性:
1.3.1. A list of partitions
一组Partition(分区)的列表,即数据集的基本组成单位。
对于 RDD 来说,每个分区都会被一个计算任务处理,这样 Partition 的数量就决定了并行计算的粒度。
用户可以在创建 RDD 时指定 RDD 的 Partition 数量,如果没有指定,那么 Spark 默认的 Partition 数量就是 Applicaton 运行时分配到的 CPU Core 数目。
1.3.2. A function for computing each split
一个计算每个 Partition 的函数。
Spark 中 RDD 的计算是以 Partition 为单位的,每个 RDD 都会实现 compute()
函数以达到这个目的。
compute()
函数会对迭代器进行复合,不需要保存每次计算的结果。
1.3.3. A list of dependencies on other RDDs
RDD 之间的依赖关系。
由于 RDD 是只读的数据集,如果对 RDD 中的数据进行改动,就只能通过 Transformation 操作,由一个或多个 RDD 计算生成一个新的 RDD,所以 RDD 之间就会形成类似 Pipeline(流水线)的前后依赖关系,前面的称为parent RDD(父 RDD),后面的称为child RDD(子 RDD)。
当计算过程中出现异常情况导致部分 Partition 数据丢失时,Spark 可以通过这种依赖关系从父 RDD 中重新计算丢失的分区数据,而不需要对 RDD 中的所有分区全部重新计算,以提高迭代计算性能。
1.3.4. A Partitoner for key-value RDDs
可选,针对 Key-Value 型 RDD 的 Partitioner(分区函数)。
Spark 中实现了两种类型的 Partitioner,可控制 Key 分到哪个 Reducer,一个是基于 Hash 的HashPartitioner,另外一个是基于 Range 的RangePartitioner。
只有对于 Key-Value 型的 RDD,才会有 Partitioner,非 Key-Value 型的 RDD 的 Partitioner 值是 None。
Partitioner 函数不但决定了 RDD 本身的 Partition 数量,也决定了 parent RDD Shuffle 输出时的 Partition 数量。
1.3.5. A list of preferred locations to compute each split on
可选,一个存储每个 Partition 的Preferred Location(优先位置)的列表。
对于每个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在 block 的位置。
按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地优先将计算任务分配到其所要处理的 block 的存储位置。
1.4
RDD 的核心结构
从 RDD 的属性中,可以解读出 Spark RDD 的以下核心结构:
1.4.1. Partition
RDD 内部的数据集在逻辑上和物理上都被划分为了多个 Partitions(分区)。
详细介绍见上面的 1.3.1. 节及《Spark 入门基础知识》中的 4.3.4. 节。
1.4.2. Dependency
RDD 之间会产生 Dependency(依赖关系)。
详细介绍见上面的 1.3.3. 节及《Spark 入门基础知识》中的 4.3.2. 节。
1.4.3. Partitioner
针对 Key-Value 型 RDD 的 Partitioner(分区函数)。
详细介绍见上面的 1.3.4. 节。
1.4.4. Stage
当 Spark 执行作业时,会根据 RDD 之间的宽窄依赖关系,将 DAG 划分成多个相互依赖的 Stage(阶段)。
详细介绍见《Spark 入门基础知识》中的 4.3.3. 节。
1.4.5. Preferred Location
Preferred Location(优先位置)是用于存储每个 Partition 优先位置的列表。
详细介绍见上面的 1.3.5. 节。
1.4.6. CheckPoint
CheckPoint(检查点)是 Spark 提供的一种基于快照的缓存容错机制。
详细介绍见《Spark 入门基础知识》中的 2.3. 节。
2
RDD 的操作
以下从 RDD 的创建、Transformation 及 Action、API 算子几个方面,了解 RDD 的基本操作。
2.1
RDD 创建方式
Spark 提供了多种创建 RDD 的方式。
2.1.1. 通过并行化方式创建
Spark 创建 RDD 最简单的方式就是把已经存在的 Scala 集合传给 SparkContext 的 parallelize()
方法。
不过,这种方式在开发中并不常用,因为使用这种方式,就需要将整个数据集先放到一个节点的内存中。
利用 parallelize()
方法将已经存在的一个 Scala 集合转换为 RDD,Scala 集合中的数据也会被复制到 RDD 中参与并行计算。
val array = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(array)
并行化方式创建 RDD
Spark 默认根据运行任务分配到的 CPU Core 数目自动设置 Partition 数量,若在 parallelize()
方法中指定,则使用指定的数量设置。
建议使用默认值,因为 Partition 数量参数设置太小不能很好地利用 CPU,设置太大又会导致任务阻塞等待。
2.1.2. 通过读取外部文件方式生成
在一般开发场景中,Spark 创建 RDD 最常用的方式,是通过 Hadoop 或者其他外部存储系统的数据集来创建,包括本地文件系统、HDFS、Cassandra、HBase 等。
通过 SparkContext 的 textFile()
方法来读取文本文件,创建 RDD :
val file = sc.textFile("/spark/hello.txt")
读取外部文件方式创建 RDD
其中, textFile()
方法的 URL 参数可以是本地文件路径、HDFS 存储路径等,Spark 会读取该路径下所有的文件,并将其作为数据源加载到内存,生成对应的 RDD。
2.1.3. 其他方式
还有其他创建 RDD 的方式,包括:
通过读取数据库(如 MySQL、Hive、MongoDB、ELK 等)的数据集生成 RDD;
通过其他的 RDD 转换生成 RDD 等。
2.2
RDD Transformation 及 Action
2.2.1. RDD Transformation 操作
RDD 的 Transformation(转换)操作,是在现有的 RDD 基础上创建并返回一个新的 RDD 的操作。
Transformation 操作具有Lazy(惰性)特性,即代码不会立即触发执行实际的操作,而是先记录 RDD 之间的转换关系。
只有当程序里触发 Action 操作时,Transformation 操作的代码才会真正地被执行,并返回计算结果。
这种设计可以使得 Spark 的计算运行更具效率。
例如,需要从一个日志文件 hbase-hadoop100.out 的信息中,找出错误的报警信息,则可以使用 Transformation 操作中的 filter()
算子来实现:
val initialRDD = sc.textFile("/opt/logs/hbase-hadoop100.out")
val errorRDD = initialRDD.filter(x => x.contains("ERROR"))
textFile()
方法创建了名为 initialRDD 的 RDD,但此时其仅指向文件位置,并未将日志文件 hbase-hadoop100.out 加载到内存中。
filter()
方法在 initialRDD 的基础上创建了名为 errorRDD 的 Transformation RDD,并使用匿名函数传递筛选条件。此时同样未立即执行文件信息筛选的操作,错误的报警信息未返回。
2.2.2. RDD Action 操作
若需要触发代码的运行,对数据集进行实际的计算操作,并返回结果,那一段 Spark 代码中至少需要有一个 Action 操作。
Action 操作会强制执行那些求值必须用到的 RDD 的 Transformation 操作,并将最终的计算结果返回给 Driver 程序,或写入到外部存储系统中。
接着上面的例子,需要将上一步统计出来的报警信息的内容保存到文件中,则可以使用 Action 操作中的 saveAsTextFile()
算子来实现:
errorRDD.saveAsTextFile("/opt/logs/error-info.log")
其中,saveAsTextFile()
可以触发实际的计算,强制执行前面的 Transformation 操作,将日志文件加载到内存中,然后筛选出文件中的报警信息。
Spark RDD 会将计算划分到不同的 Stage 中,并在不同的节点上进行,每个节点都会运行计算 saveAsTextFile()
的结果,类似 MapReduce 中的 Mapper。
所有的运算操作完成后,会将结果聚合到一起,返回给 Driver 程序,类似 MapReduce 中的 Reducer。
2.2.3. 惰性求值计算机制
Transformation 操作具有 Lazy 特性,是一种惰性求值计算机制。也就是说,调用 Transformation 操作时,Spark 不会立即开始执行真正的计算,而是在内部记录下所要执行的操作的相关信息,待执行 Action 操作时,Spark 才会真正的开始计算。
可见,RDD 不仅可以看作是一个存放分布式数据的数据集,也可以当作是通过 Transformation 操作构建出来的、记录计算指令的列表。
那为什么要这样做?这样设计的优势在哪?
继续以上面的例子来说明。若上面的 Action 操作不是将返回的结果保存到文件中,而是执行 first()
算子,即返回第一个错误的报警信息。
如果不引入惰性计算机制,读取文件时就把数据加载到内存中存储起来,然后生成 errorRDD,马上筛选出错误的报警信息内容,等筛选操作执行完成后,又只要求返回第一个结果。这样做是不是太浪费存储空间?
所以,Spark 实际上是在 Action 操作 first()
算子的时候,才开始真正的运算:只扫描第一个匹配的内容,而不需要读取整个日志文件信息。
惰性求值计算机制避免了对所有的 RDD 操作都进行一遍运算,其可以将很多操作结合在一起,以减少运算的步骤,使 Spark 的计算运行更高效。
2.3
RDD API 算子
2.3.1. Spark 函数的传递
Spark API 是依赖 Driver 程序中的传递函数,在集群上执行 RDD 操作及运算的。
在 Scala 中,函数的创建可以通过匿名函数 Lambda 表达式或自定义 Function 类两种方式实现。Lambda 表达式简单、方便、易用;但在复杂的应用场景中,还是需要用 Function 类来自定义函数功能的。
例如,用 Lambda 表达式的方式,在 Spark 中,对 RDD 的数据进行平方运算,并剔除结果为 0 的数据:
代码语言:javascript复制val list: List[Int] = List(-3, -2, -1, 0, 1, 2, 3)
val initialRDD = sc.parallelize(list)
val squareRDD = initialRDD.map(x => x * x)
val resultRDD = squareRDD.filter(x = > x != 0)
Spark 算子中函数传递过程
map()
算子可以把求平方的 Lambda 函数运用到 initialRDD 的每个元素上,然后把计算返回的结果作为 squareRDD 中对应元素的值。
filter()
算子通过 Lambda 函数,将 squareRDD 中满足筛选条件的数据放入到 resultRDD 中返回。需要注意的是,first()
算子中的 Lambda 函数需要返回一个 Bool 值,若为 True
则保留数据。
当然,这个只是举例说明如何在算子中传递函数,由于没有 Action 操作,惰性机制下,以上运算实际上是暂时不会被执行的。
2.3.2. Transformation 算子
Transformation 算子(方法)主要用于 RDD 之间的转化和数据处理,如过滤、去重、求并集、连接等,常用的 Transformation 算子如下:
RDD Transformation 算子
2.3.3. Action 算子
Action 算子(方法)主要用于对 RDD 的 Transformation 操作结果进行统一的执行处理,如结果收集、数量统计、数据保存等,常用的 Action 算子如下:
RDD Action 算子
一段 Spark 代码中至少要有一个 Action 操作,运算才能执行。
3
RDD 的依赖关系
RDD 的依赖关系在本文 1.3.3. 节及《Spark 入门基础知识》中的 4.3.2. 节中已经进行了详细的讲解。
3.1
RDD 窄依赖与宽依赖关系
RDD 之间的依赖关系又分为Narrow Dependency(窄依赖)和Wide Dependency(宽依赖)。
详细介绍见《Spark 入门基础知识》中的 4.3.2. 节。
在窄依赖中,无论数据规模有多大,child RDD 所依赖的 parent RDD 的 Partition 数量都是确定的。
在宽依赖中,依赖是 Shuffle 级别的,数据规模越大,child RDD 所依赖的 parent RDD 的数量就越多,从而 child RDD 所依赖的 parent RDD 的 Partition 数量也会变得越来越多。
3.2
设计宽窄依赖关系的原因
在窄依赖中 child RDD 的每个 Partition 数据的生成操作都是可以并行执行的,而在宽依赖中需要所有 parent RDD 的 Shuffle 结果完成后才能被执行。
在 Spark 执行作业时,会根据 RDD 之间的宽窄依赖关系,将 DAG 划分成多个相互依赖的 Stage,生成一个完整的最优执行计划,使每个 Stage 内的 RDD 都尽可能在各个节点上并行地被执行。
按 RDD之间的宽窄依赖关系划分 Stage 的思路及过程,详见《Spark 入门基础知识》中的 4.3.3. 节。
版权信息:© Pierre-Yves Babelon / Moment / Getty Images