摘要
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:
Text文件
Json文件
- Csv文件
- Sequence文件
- Object文件;
文件系统分为:
- 本地文件系统
-
HDFS
以及数据库。
文件类数据读取与保存
Text文件 基本语法:
- 数据读取:textFile(String)
- 数据保存:saveAsTextFile(String)
案例演示:经典的worldCount
程序,并将程序计算结果写入到本地文件中
@Test
def textTest(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
// 数据扁平化,
val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
// 映射
val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
// 计算单词个数
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 v2)
//将数据写入目录中,该目录不能存在
rdd4.saveAsTextFile("file:///C:/Users/123456/Desktop/worldCount_0001")
// 关闭资源;养成良好编码习惯
sc.stop()
}
worldCount_0001
是一个目录,并且不能存在
就像跑了一个MR,将数据按照分区存入不同的目录中。
Sequence文件
SequenceFile文件是Hadoop用来存储二进制形式的key-value
对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass
, valueClass
](path)。
案例演示:
保存Sequence
文件
@Test
def sequenceWriteTest(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
// 数据扁平化,
val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
// 映射
val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
// 计算单词个数
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 v2)
//将数据写入目录中,该目录不能存在
rdd4.saveAsSequenceFile("file:///C:/Users/123456/Desktop/worldCount_0003")
// 关闭资源;养成良好编码习惯
sc.stop()
}
读取Sequence
文件
@Test
def sequenceReadTest(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[(String, Int)] =sc.sequenceFile[String,Int](path = "file:///C:/Users/123456/Desktop/worldCount_0003",minPartitions = 4)
//打印
rdd1.foreach(e=>{
println(e)
})
// 关闭资源;养成良好编码习惯
sc.stop()
}
打印结果
代码语言:javascript复制(python,1)
(shell,4)
(wahaha,1)
(java,5)
(hello,2)
注意:
sc.sequenceFile[String,Int]
需要指定返回参数类型 。
Object对象文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile
[k
,v
](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile
()实现对对象文件的输出。因为是序列化所以要指定类型。
案例演示
将数据保存成Object
文件
@Test
def ObjectWriteTest(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取文件
val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)
// 数据扁平化,
val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))
// 映射
val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))
// 计算单词个数
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 v2)
//将数据写入目录中,该目录不能存在
rdd4.saveAsObjectFile("file:///C:/Users/123456/Desktop/worldCount_0002")
// 关闭资源;养成良好编码习惯
sc.stop()
}
读取 Object 文件
代码语言:javascript复制 @Test
def ObjectReadTest(): Unit ={
// 创建sc
val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
// 读取对象文件
// sc.objectFile[(String,Int)] 需要指定数据类型,写入进去的是一个元组,读取的时候应该也元组的形式返回
val rdd1=sc.objectFile[(String,Int)](path = "file:///C:/Users/123456/Desktop/worldCount_0002",minPartitions = 4)
//打印
rdd1.foreach(e=>{
println(e)
})
// 关闭资源;养成良好编码习惯
sc.stop()
}
结果
代码语言:javascript复制(python,1)
(wahaha,1)
(shell,4)
(hello,2)
(java,5)
注意:
sc.objectFile[(String,Int)]
必须指定数据类型
文件系统类数据读取与保存
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。
另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。如TextInputFormat,新旧两个版本所引用分别是org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)