2021年大数据Spark(二十):Spark Core外部数据源引入

2021-10-09 16:59:19 浏览数 (1)


外部数据源

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如:

 1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

日志数据:电商网站的商家操作日志

订单数据:保险行业订单数据

 2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL表中

网站基本分析(pv、uv。。。。。)

注意:实际开发中会封装为工具类直接使用

https://github.com/teeyog/blog/issues/22

https://blog.csdn.net/u011817217/article/details/81667115

MySQL 数据源

     实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。

调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。

演示代码

代码语言:javascript复制
package cn.itcast.core

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}

/**
  * Author itcast
  * Desc 演示使用Spark将数据写入到MySQL,再从MySQL读取出来
  */
object SparkJdbcDataSource {
  def main(args: Array[String]): Unit = {
    //1.创建SparkContext
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //2.准备数据
    val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))
    //3.将RDD中的数据保存到MySQL中去
    //将每一个分区中的数据保存到MySQL中去,有几个分区,就会开启关闭连接几次
    //data.foreachPartition(itar=>dataToMySQL(itar))
    data.foreachPartition(dataToMySQL) //方法即函数,函数即对象


    //4.从MySQL读取数据
    /*
    class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
     */
    val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
    val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"
    val mapRow = (rs:ResultSet) => {
      val id: Int = rs.getInt(1)
      val name: String = rs.getString(2)
      val age: Int = rs.getInt("age")
      (id,name,age)
    }
    val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)
    println(studentRDD.collect().toBuffer)
  }

  /**
    * 将分区中的数据保存到MySQL
    * @param itar 传过来的每个分区有多条数据
    */
  def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {
    //0.加载驱动
    //Class.forName("") //源码中已经加载了
    //1.获取连接
    val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
    //2.编写sql
    val sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"
    //3.获取ps
    val ps: PreparedStatement = connection.prepareStatement(sql)
    itar.foreach(data=>{
      //4.设置参数
      ps.setString(1,data._1)
      ps.setInt(2,data._2)
      //5.执行sql
      ps.addBatch()
    })
    ps.executeBatch()
    ps.close()
    connection.close()
  }
}

​​​​​​​HBase 数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormatTableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。

​​​​​​​HBase Sink

回顾MapReduce向HBase表中写入数据,使用TableReducer,其中OutputFormat为TableOutputFormat,读取数据Key:ImmutableBytesWritable(Rowkey),Value:Put(Put对象)

写入数据时,需要将RDD转换为RDD[(ImmutableBytesWritable, Put)]类型,调用saveAsNewAPIHadoopFile方法数据保存至HBase表中。

HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。

范例演示:将词频统计结果保存HBase表,表的设计

代码如下:

代码语言:javascript复制
package cn.itcast.core

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 将RDD数据保存至HBase表中
 */
object SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 构建RDD
    val list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))
    val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)

    // 将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value)
    //  组装RDD[(ImmutableBytesWritable, Put)]
    /**
     * HBase表的设计:
     * 表的名称:htb_wordcount
     * Rowkey:  word
     * 列簇:    info
     * 字段名称: count
     */
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>
      iter.map { case (word, count) =>
        // 创建Put实例对象
        val put = new Put(Bytes.toBytes(word))
        // 添加列
        put.addColumn(
          // 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组
          Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString)
        )
        // 返回二元组
        (new ImmutableBytesWritable(put.getRow), put)
      }
    }

    // 构建HBase Client配置信息
    val conf: Configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // 设置将数据保存的HBase表的名称
    conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
    /*
         def saveAsNewAPIHadoopFile(
             path: String,// 保存的路径
             keyClass: Class[_], // Key类型
             valueClass: Class[_], // Value类型
             outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现
             conf: Configuration = self.context.hadoopConfiguration // 配置信息
         ): Unit
     */
    putsRDD.saveAsNewAPIHadoopFile(
      "datas/spark/htb-output-"   System.nanoTime(), //
      classOf[ImmutableBytesWritable], //
      classOf[Put], //
      classOf[TableOutputFormat[ImmutableBytesWritable]], //
      conf
    )

    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

运行完成以后,使用hbase shell查看数据:

​​​​​​​HBase Source

回顾MapReduce从读HBase表中的数据,使用TableMapper,其中InputFormat为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result

   从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下:

     此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:

范例演示:从HBase表读取词频统计结果,代码如下

代码语言:javascript复制
package cn.itcast.core

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 从HBase 表中读取数据,封装到RDD数据集
 */
object SparkReadHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 读取HBase Client 配置信息
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")

    // 设置读取的表的名称
    conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
    /*
         def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
             conf: Configuration = hadoopConfiguration,
             fClass: Class[F],
             kClass: Class[K],
             vClass: Class[V]
         ): RDD[(K, V)]
     */
    val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    println(s"Count = ${resultRDD.count()}")
    resultRDD
      .take(5)
      .foreach { case (rowKey, result) =>
        println(s"RowKey = ${Bytes.toString(rowKey.get())}")
        // HBase表中的每条数据封装在result对象中,解析获取每列的值
        result.rawCells().foreach { cell =>
          val cf = Bytes.toString(CellUtil.cloneFamily(cell))
          val column = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          val version = cell.getTimestamp
          println(s"t $cf:$column = $value, version = $version")
        }
      }

    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

运行结果:

0 人点赞