数据读取与保存

2022-05-09 15:41:00 浏览数 (1)

摘要

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:

  • Text文件
  • Json文件
  • Csv文件
  • Sequence文件
  • Object文件;

文件系统分为:

  • 本地文件系统
  • HDFS以及数据库。

文件类数据读取与保存

Text文件 基本语法:

  • 数据读取:textFile(String)
  • 数据保存:saveAsTextFile(String)

案例演示:经典的worldCount程序,并将程序计算结果写入到本地文件中

代码语言:javascript复制
  @Test
  def textTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1   v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsTextFile("file:///C:/Users/123456/Desktop/worldCount_0001")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

worldCount_0001 是一个目录,并且不能存在

程序结果程序结果

就像跑了一个MR,将数据按照分区存入不同的目录中。

Sequence文件 SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

案例演示: 保存Sequence文件

代码语言:javascript复制
 @Test
  def sequenceWriteTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1   v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsSequenceFile("file:///C:/Users/123456/Desktop/worldCount_0003")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

读取Sequence文件

代码语言:javascript复制
  @Test
  def sequenceReadTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[(String, Int)] =sc.sequenceFile[String,Int](path = "file:///C:/Users/123456/Desktop/worldCount_0003",minPartitions = 4)
    //打印
    rdd1.foreach(e=>{
      println(e)
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

打印结果

代码语言:javascript复制
(python,1)
(shell,4)
(wahaha,1)
(java,5)
(hello,2)

注意: sc.sequenceFile[String,Int] 需要指定返回参数类型 。

Object对象文件 对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

案例演示 将数据保存成Object文件

代码语言:javascript复制
  @Test
  def ObjectWriteTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1   v2)


    //将数据写入目录中,该目录不能存在
    rdd4.saveAsObjectFile("file:///C:/Users/123456/Desktop/worldCount_0002")

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

读取 Object 文件

代码语言:javascript复制
  @Test
  def ObjectReadTest(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取对象文件
    // sc.objectFile[(String,Int)] 需要指定数据类型,写入进去的是一个元组,读取的时候应该也元组的形式返回
    val rdd1=sc.objectFile[(String,Int)](path = "file:///C:/Users/123456/Desktop/worldCount_0002",minPartitions = 4)

    //打印
    rdd1.foreach(e=>{
      println(e)
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript复制
(python,1)
(wahaha,1)
(shell,4)
(hello,2)
(java,5)

注意: sc.objectFile[(String,Int)] 必须指定数据类型

文件系统类数据读取与保存 Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。如TextInputFormat,新旧两个版本所引用分别是org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)

0 人点赞