RDD的创建
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds
如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。
并行化集合
由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。
演示范例代码,从List列表构建RDD集合:
代码语言:javascript复制package cn.itcast.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD
* - 将Scala集合转换为RDD
* sc.parallelize(seq)
* - 将RDD转换为Scala中集合
* rdd.collect()
* rdd.collectAsMap()
*/
object SparkParallelizeTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 1、Scala中集合Seq序列存储数据
val linesSeq: Seq[String] = Seq(
"hello me you her",
"hello you her",
"hello her",
"hello"
)
// 2、并行化集合创建RDD数据集
/*
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism
): RDD[T]
*/
val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
//val inputRDD: RDD[String] = sc.makeRDD(linesSeq, numSlices = 2)
// 3、调用集合RDD中函数处理分析数据
val resultRDD: RDD[(String, Int)] = inputRDD
.flatMap(_.split("\s "))
.map((_, 1))
.reduceByKey(_ _)
// 4、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。)
resultRDD.foreach(println)
// 应用程序运行结束,关闭资源
sc.stop()
}
}
外部存储系统
由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop支持的数据集,比如 HDFS、Cassandra、HBase 等。实际使用最多的方法:textFile,读取HDFS或LocalFS上文本文件,指定文件路径和RDD分区数目。
范例演示:从文件系统读取数据,设置分区数目为2,代码如下。
代码语言:javascript复制package cn.itcast.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 从HDFS/LocalFS文件系统加载文件数据,封装为RDD集合, 可以设置分区数目
* - 从文件系统加载
* sc.textFile("")
* - 保存文件系统
* rdd.saveAsTextFile("")
*/
object SparkFileSystemTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 1、从文件系统加载数据,创建RDD数据集
/*
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions
): RDD[String]
*/
val inputRDD: RDD[String] = sc.textFile("data/input/words.txt",2)
println(s"Partitions Number : ${inputRDD.getNumPartitions}")
// 2、调用集合RDD中函数处理分析数据
val resultRDD: RDD[(String, Int)] = inputRDD
.flatMap(_.split("\s "))
.map((_, 1))
.reduceByKey(_ _)
// 3、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。)
resultRDD.foreach(println)
// 应用程序运行结束,关闭资源
sc.stop()
}
}
其中文件路径:可以指定文件名称,可以指定文件目录,可以使用通配符指定。
小文件读取
在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB,几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。
范例演示:读取10个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。
代码语言:javascript复制package cn.itcast.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 采用SparkContext#wholeTextFiles()方法读取小文件
*/
object SparkWholeTextFileTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// wholeTextFiles()
val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10", minPartitions = 2)
filesRDD.map(_._1).foreach(println)
val inputRDD: RDD[String] = filesRDD.flatMap(_._2.split("\n"))
println(s"Partitions Number = ${inputRDD.getNumPartitions}")
println(s"Count = ${inputRDD.count()}")
// 应用程序运行结束,关闭资源
sc.stop()
}
}
实际项目中,可以先使用wholeTextFiles方法读取数据,设置适当RDD分区,再将数据保存到文件系统,以便后续应用读取处理,大大提升性能。