一、RDD简介
- RDD是Spark的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集
- RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上(分区即partition),从而让RDD中的数据可以被并行操作。(分布式的特性)
- RDD通常通过Hadoop上的文件,即HDFS文件,来进行创建;有时也可以通过Spark应用程序中的集合来创建。
- RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算partition。这一切对使用者是透明的。
- RDD的数据默认的情况下是存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性的特性)
二、创建RDD的三种方式
在RDD中,通常就代表和包含了Spark应用程序的输入源数据。 当我们,在创建了初始的RDD之后,才可以通过Spark Core提供的transformation算子,对该RDD进行transformation(转换)操作,来获取其他的RDD。 Spark Core为我们提供了三种创建RDD的方式,包括:
- 使用程序中的集合创建RDD
- 使用本地文件创建RDD
- 使用HDFS文件创建RDD
2.1 应用场景
- 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程
- 使用本地文件创建RDD,主要用于的场景为:在本地临时性地处理一些存储了大量数据的文件
- 使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作
2.2 实际操作
2.2.1 并行化创建RDD
如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext中的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。即:集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以采用并行的方式来操作这个分布式数据集合。
代码语言:javascript复制// 并行化创建RDD部分代码
// 实现1到5的累加求和
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ _)
在调用parallelize()方法时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2-4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如,parallelize(arr, 10)
2.2.2 使用textFile方法,通过本地文件或HDFS创建RDD
Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。
代码语言:javascript复制// 实现文件字数统计
// textFile()方法中,输入本地文件路径或是HDFS路径
// HDFS:hdfs://spark1:9000/data.txt
// local:/home/hadoop/data.txt
val rdd = sc.textFile(“/home/hadoop/data.txt”)
val wordCount = rdd.map(line => line.length).reduce(_ _)
通过本地文件或HDFS创建RDD的几个注意点 1、如果是针对本地文件的话: * 如果是在Windows上进行本地测试,windows上有一份文件即可; * 如果是在Spark集群上针对Linux本地文件,那么需要将文件拷贝到所有worker节点上(就是在spark-submit上使用--master指定了master节点,使用standlone模式进行运行,而textFile()方法内仍然使用的是Linux本地文件,在这种情况下,是需要将文件拷贝到所有worker节点上的); 2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建 3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少
2.2.3 Spark支持的其余方法,创建RDD
SparkContext的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特例的方法来创建RDD:
- SparkContext的wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回由(fileName,fileContent)组成的pair,即pairRDD,而不是普通的RDD。该方法返回的是文件名字和文件中的具体内容;而普通的textFile()方法返回的RDD中,每个元素就是文本中一行文本。
- SparkContext的sequenceFile(K,V)方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化机制,比如IntWritable、Text等。
- SparkContext的hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。
- SparkContext的objectFile()方法,可以针对之前调用的RDD的saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。
本文转自:https://blog.csdn.net/lemonZhaoTao/article/details/77923337