外部数据源
Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如:
1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析
日志数据:电商网站的商家操作日志
订单数据:保险行业订单数据
2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL表中
网站基本分析(pv、uv。。。。。)
注意:实际开发中会封装为工具类直接使用
https://github.com/teeyog/blog/issues/22
https://blog.csdn.net/u011817217/article/details/81667115
MySQL 数据源
实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。
调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。
演示代码
代码语言:javascript复制package cn.itcast.core
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}
/**
* Author itcast
* Desc 演示使用Spark将数据写入到MySQL,再从MySQL读取出来
*/
object SparkJdbcDataSource {
def main(args: Array[String]): Unit = {
//1.创建SparkContext
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//2.准备数据
val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))
//3.将RDD中的数据保存到MySQL中去
//将每一个分区中的数据保存到MySQL中去,有几个分区,就会开启关闭连接几次
//data.foreachPartition(itar=>dataToMySQL(itar))
data.foreachPartition(dataToMySQL) //方法即函数,函数即对象
//4.从MySQL读取数据
/*
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
*/
val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"
val mapRow = (rs:ResultSet) => {
val id: Int = rs.getInt(1)
val name: String = rs.getString(2)
val age: Int = rs.getInt("age")
(id,name,age)
}
val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)
println(studentRDD.collect().toBuffer)
}
/**
* 将分区中的数据保存到MySQL
* @param itar 传过来的每个分区有多条数据
*/
def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {
//0.加载驱动
//Class.forName("") //源码中已经加载了
//1.获取连接
val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
//2.编写sql
val sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"
//3.获取ps
val ps: PreparedStatement = connection.prepareStatement(sql)
itar.foreach(data=>{
//4.设置参数
ps.setString(1,data._1)
ps.setInt(2,data._2)
//5.执行sql
ps.addBatch()
})
ps.executeBatch()
ps.close()
connection.close()
}
}
HBase 数据源
Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。
HBase Sink
回顾MapReduce向HBase表中写入数据,使用TableReducer,其中OutputFormat为TableOutputFormat,读取数据Key:ImmutableBytesWritable(Rowkey),Value:Put(Put对象)。
写入数据时,需要将RDD转换为RDD[(ImmutableBytesWritable, Put)]类型,调用saveAsNewAPIHadoopFile方法数据保存至HBase表中。
HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。
范例演示:将词频统计结果保存HBase表,表的设计
代码如下:
代码语言:javascript复制package cn.itcast.core
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 将RDD数据保存至HBase表中
*/
object SparkWriteHBase {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 构建RDD
val list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))
val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)
// 将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value)
// 组装RDD[(ImmutableBytesWritable, Put)]
/**
* HBase表的设计:
* 表的名称:htb_wordcount
* Rowkey: word
* 列簇: info
* 字段名称: count
*/
val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>
iter.map { case (word, count) =>
// 创建Put实例对象
val put = new Put(Bytes.toBytes(word))
// 添加列
put.addColumn(
// 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组
Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString)
)
// 返回二元组
(new ImmutableBytesWritable(put.getRow), put)
}
}
// 构建HBase Client配置信息
val conf: Configuration = HBaseConfiguration.create()
// 设置连接Zookeeper属性
conf.set("hbase.zookeeper.quorum", "node1")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
// 设置将数据保存的HBase表的名称
conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
/*
def saveAsNewAPIHadoopFile(
path: String,// 保存的路径
keyClass: Class[_], // Key类型
valueClass: Class[_], // Value类型
outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现
conf: Configuration = self.context.hadoopConfiguration // 配置信息
): Unit
*/
putsRDD.saveAsNewAPIHadoopFile(
"datas/spark/htb-output-" System.nanoTime(), //
classOf[ImmutableBytesWritable], //
classOf[Put], //
classOf[TableOutputFormat[ImmutableBytesWritable]], //
conf
)
// 应用程序运行结束,关闭资源
sc.stop()
}
}
运行完成以后,使用hbase shell查看数据:
HBase Source
回顾MapReduce从读HBase表中的数据,使用TableMapper,其中InputFormat为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result。
从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下:
此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:
范例演示:从HBase表读取词频统计结果,代码如下
代码语言:javascript复制package cn.itcast.core
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 从HBase 表中读取数据,封装到RDD数据集
*/
object SparkReadHBase {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 读取HBase Client 配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node1")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
// 设置读取的表的名称
conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
/*
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]
): RDD[(K, V)]
*/
val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
println(s"Count = ${resultRDD.count()}")
resultRDD
.take(5)
.foreach { case (rowKey, result) =>
println(s"RowKey = ${Bytes.toString(rowKey.get())}")
// HBase表中的每条数据封装在result对象中,解析获取每列的值
result.rawCells().foreach { cell =>
val cf = Bytes.toString(CellUtil.cloneFamily(cell))
val column = Bytes.toString(CellUtil.cloneQualifier(cell))
val value = Bytes.toString(CellUtil.cloneValue(cell))
val version = cell.getTimestamp
println(s"t $cf:$column = $value, version = $version")
}
}
// 应用程序运行结束,关闭资源
sc.stop()
}
}
运行结果: