1. Produce a real-time stream
nc -lk 1234
hello hadoop word
hello spark hbase
hive hello china
2. MyNetworkWordCount.scala
package day10
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object MyNetworkWordCount {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
//Create the StreamingContext
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Define the batch interval: collect data every 2 seconds
val ssc = new StreamingContext(sparkConf,Seconds(2))
//Create a discretized stream; a DStream represents the input data stream
val lines = ssc.socketTextStream("hadoop01",1234)
//Process the data
val words = lines.flatMap(_.split(" "))
val result = words.map(x => (x,1)).reduceByKey(_ + _)
//Print the results
result.print()
ssc.start()
ssc.awaitTermination()
}
}
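To compile and run these examples, the Spark Streaming and Spark SQL artifacts need to be on the classpath. A minimal build.sbt sketch (the Scala and Spark versions here are assumptions; match them to your cluster):
name := "spark-streaming-examples"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.2",
  "org.apache.spark" %% "spark-streaming" % "2.3.2",
  "org.apache.spark" %% "spark-sql" % "2.3.2" //used by the Spark SQL and MySQL examples below
)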
3. MyNetworkWordCount_total.scala (with checkpoint)
package day10
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object MyNetworkWordCount_total {
def main(args: Array[String]): Unit = {
//Silence the logs
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//Set up the Spark configuration
val conf = new SparkConf().setAppName("StreamWordCountTotal").setMaster("local[2]")
//Create the StreamingContext
val ssc = new StreamingContext(conf,Seconds(3))
//Set the checkpoint directory (required by updateStateByKey)
ssc.checkpoint("./cpt")
//Create a discretized stream; a DStream represents the input data stream
val lines = ssc.socketTextStream("hadoop01",1234)
//Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word,1))
//Build the state update function
val updateStateFunc = (currValues:Seq[Int],preValues:Option[Int])=>{
val curr = currValues.sum
val pre = preValues.getOrElse(0)
Some(curr + pre)
}
//Accumulate the counts across batches
val totalvalue = pairs.updateStateByKey(updateStateFunc)
totalvalue.print()
ssc.start()
ssc.awaitTermination()
}
}
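As a side note, on Spark 1.6+ the same cumulative count can also be written with mapWithState, which only touches keys that receive new data in a batch. A minimal sketch meant to replace the updateStateByKey call above (checkpointing must still be enabled; the value names are made up for illustration):
import org.apache.spark.streaming.{State, StateSpec}
//Mapping function: called per key with its new value and its mutable state
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}
val totalWithState = pairs.mapWithState(StateSpec.function(mappingFunc))
totalWithState.print()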
4. MyNetworkTotalWordCountV2.scala (real-time word count with cumulative totals)
package cn.edu360.spark.day10
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * A real-time word-frequency program that keeps a cumulative count of each word
 *
 */
object MyNetworkTotalWordCountV2 {
val ckp = "./ckp"
/**
 * This function is applied to all values that share the same key
 * @param newValues the new values for the key in the current batch
 * @param runningCount the running count carried over from previous batches
 * @return the updated running count
 */
def updataFunction(newValues:Seq[Int], runningCount:Option[Int]):Option[Int] = {
//Sum the values of the current batch
val currentTotal = newValues.sum
//Accumulate: if the word appears for the first time, there is no previous value
val totalValues = runningCount.getOrElse(0)
Some(currentTotal + totalValues)
}
//If the previous job's StreamingContext cannot be recovered from the checkpoint directory, create a new ssc
def funcToCreateContext (): StreamingContext={
println("create new ssc ..........")
//Create the StreamingContext
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Define the batch interval: collect data every 2 seconds; this value cannot be chosen arbitrarily
val ssc = new StreamingContext(sparkConf, Seconds(2))
//Set the checkpoint directory
ssc.checkpoint(ckp)
//Create a discretized stream; a DStream represents the input data stream
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata01", 5678)
//Set the checkpoint interval (by default a checkpoint is made about every 10 seconds)
lines.checkpoint(Seconds(6))
//Split into words and count each occurrence once
val words = lines.flatMap(_.split(" "))
val pairs = words.map(x=>(x, 1))
//Accumulate across batches
val totalResult = pairs.updateStateByKey(updataFunction _)
totalResult.print()
ssc
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//Recover the StreamingContext from the checkpoint directory, or create a new one with the function above
val ssc = StreamingContext.getOrCreate(ckp, funcToCreateContext _)
//Start the computation
ssc.start()
//Wait for the computation to be terminated
ssc.awaitTermination()
}
}
5. OnlineHotItems.scala (window functions)
package cn.edu360.spark.day10
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Background:
 * In social networks (Weibo), e-commerce (JD), search engines (Baidu) and stock trading, one of the things people care about is: among the content I follow, what is everyone paying attention to right now?
 * This is very valuable in real companies.
 * For example: what has everyone been searching for in the last 30 minutes, updated every 5 minutes? List the top three search topics.
 *
 * Data format:
 * hadoop,201810080102
 * spark,201810080103
 *
 * Problem:
 * The code below recomputes all data of the previous 60 seconds every 20 seconds. If the window is long, the amount of data to recompute is large and very time-consuming.
 * Solution:
 * searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1+v2, (v1:Int, v2:Int) => v1-v2, Seconds(60), Seconds(20))
 *
 *
 */
object OnlineHotItems {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//Create the StreamingContext
val sparkConf = new SparkConf().setAppName("OnlineHotItems").setMaster("local[2]")
/**
 * The Batch Interval set here is the basic unit in which Spark Streaming generates jobs; the window duration and the
 * slide interval must be integer multiples of this Batch Interval.
 */
val ssc = new StreamingContext(sparkConf, Seconds(5))
//Create a discretized stream; a DStream represents the input data stream
val hottestStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata01", 5678)
/**
 * A user search is simplified to the format item,time. Since we only need the hottest items, it is enough to take the item,
 * which is then mapped into the (item,1) format.
 */
val searchPair = hottestStream.map(_.split(",")(0)).filter(!_.isEmpty).map(item=>(item, 1))
val hottestDStream: DStream[(String, Int)] = searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1 + v2, Seconds(60), Seconds(20))
val result: DStream[(String, Int)] = hottestDStream.transform(hottestRDD => {
val top3: Array[(String, Int)] = hottestRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
ssc.sparkContext.makeRDD(top3)
})
result.print()
ssc.start()
ssc.awaitTermination()
}
}
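The comment block above already names the fix for the recomputation problem: the reduceByKeyAndWindow overload with an inverse function only adds the data that slides into the window and subtracts the data that slides out. A minimal sketch of that variant, inside the same main as above (it requires a checkpoint directory; the directory name here is an assumption):
ssc.checkpoint("./ckp-hotitems") //directory name is an assumption
val hottestIncremental: DStream[(String, Int)] = searchPair.reduceByKeyAndWindow(
  (v1: Int, v2: Int) => v1 + v2, //merge counts entering the window
  (v1: Int, v2: Int) => v1 - v2, //remove counts leaving the window
  Seconds(60), //window length
  Seconds(20)  //slide interval
)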
6. MyNetworkWordCountMysql.scala (save data to MySQL)
It uses a configuration file, application.conf:
db.url="jdbc:mysql://hadoop01:3306/bigdata"
db.user="root"
db.password="123456"
db.table="test"
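ConfigFactory.load() comes from the Typesafe Config library, and the JDBC writes below need the MySQL driver on the classpath. A sketch of the extra build.sbt entries (the versions are assumptions):
libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.3.3", //version is an assumption
  "mysql" % "mysql-connector-java" % "5.1.47" //version is an assumption
)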
package cn.edu360.spark.day11
import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
/**
 * Write the real-time word-frequency results to a MySQL database
 *
 */
object MyNetworkWordCountMysql {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//Loads the configuration file under resources; default lookup order: application.conf -> application.json -> application.properties
val config = ConfigFactory.load()
//Create the StreamingContext
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Define the batch interval: collect data every 5 seconds; this value cannot be chosen arbitrarily
val ssc = new StreamingContext(sparkConf, Seconds(5))
//Create a discretized stream; a DStream represents the input data stream
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop", 1234)
//Insert the results computed for the current batch
lines.foreachRDD(rdd => {
//Create a SparkSession
val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
//Import implicits to convert the RDD into a DataFrame
import spark.implicits._
//words is the column name; the table has only one column
val wordsDataFrame = rdd.flatMap(line => line.split(" ")).toDF("words")
//Create a temporary view
wordsDataFrame.createOrReplaceTempView("wordcount")
//Run SQL to compute the word counts
val result = spark.sql("select words, count(*) as total from wordcount group by words")
//Wrap up the user name and password
val props = new Properties()
props.setProperty("user", config.getString("db.user"))
props.setProperty("password", config.getString("db.password"))
if (!result.rdd.isEmpty()) {
result.write.mode(SaveMode.Append).jdbc(config.getString("db.url"), config.getString("db.table"), props)
}
})
ssc.start()
ssc.awaitTermination()
}
}
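To spot-check what the streaming job has written, the same connection settings can be reused from a small standalone batch job. A sketch (the object name is made up for illustration):
import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
object CheckWordCountTable {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()
    val spark = SparkSession.builder().appName("CheckWordCountTable").master("local[2]").getOrCreate()
    val props = new Properties()
    props.setProperty("user", config.getString("db.user"))
    props.setProperty("password", config.getString("db.password"))
    //Read back the table written by the streaming job and print it
    spark.read.jdbc(config.getString("db.url"), config.getString("db.table"), props).show()
  }
}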
7. MyNetworkWordCountMysqlState.scala (enhanced MySQL version)
package cn.edu360.spark.day11
import java.sql.DriverManager
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
/**
 * Write the real-time word-frequency results to a MySQL database (state-based counting)
 * Created by zhangjingcun on 2018/10/9 9:43.
 */
object MyNetworkWordCountMysqlState {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//Loads the configuration file under resources; default lookup order: application.conf -> application.json -> application.properties
val config = ConfigFactory.load()
//Create the StreamingContext
val sparkConf = new SparkConf().setAppName("MyNetworkWordCountMysqlState").setMaster("local[2]")
//Define the batch interval: collect data every 2 seconds; this value cannot be chosen arbitrarily
val ssc = new StreamingContext(sparkConf, Seconds(2))
//Create a discretized stream; a DStream represents the input data stream
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 1234)
/**
 * Insert the results computed for the current batch
 * foreachRDD runs on the driver
 * foreachPartition and foreach run on the workers
 */
lines.foreachRDD(rdd => {
//Compute the results of the current batch
val curr_batch_result: RDD[(String, Int)] = rdd.flatMap(line => line.split(" ")).filter(!_.isEmpty).map(word => (word, 1)).reduceByKey(_ + _)
//Insert the results computed for the current batch
curr_batch_result.foreachPartition(partition => {
//Create one JDBC connection per partition
val url = config.getString("db.url")
val user = config.getString("db.user")
val password = config.getString("db.password")
val conn = DriverManager.getConnection(url, user, password)
//Insert all records of the current partition into the MySQL database
partition.foreach(tp => {
val word = tp._1
//Check whether the word has been inserted before; if so, update it, otherwise insert it
val pstmts = conn.prepareStatement("select * from wordcount where words=?")
pstmts.setString(1, word)
val rs = pstmts.executeQuery()
var flag = false
while (rs.next()) {
println(s"${word}数据库中存在")
flag = true //已经存在该单词
//即将插入的单词已经存在,可以进行更新操作
val dbCurrCount = rs.getInt("total")
//计算最新的值
val newCount = dbCurrCount tp._2
//更新
val update = conn.prepareStatement("update wordcount set total=? where words=?")
update.setInt(1, newCount)
update.setString(2, word)
update.executeUpdate()
update.close()
}
rs.close()
pstmts.close()
if (!flag){
//Insert a new row
val stmts = conn.prepareStatement("insert into wordcount values(?,?)")
stmts.setString(1, tp._1)
stmts.setInt(2, tp._2)
stmts.executeUpdate()
stmts.close()
}
})
if (conn!=null) conn.close()
})
})
ssc.start()
ssc.awaitTermination()
}
}
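The code above assumes a wordcount table with a words column and a total column already exists in the database. A one-time setup sketch over JDBC (the object name and column types are assumptions inferred from the queries above):
import java.sql.DriverManager
import com.typesafe.config.ConfigFactory
object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()
    val conn = DriverManager.getConnection(config.getString("db.url"), config.getString("db.user"), config.getString("db.password"))
    val stmt = conn.createStatement()
    //Column names come from the queries above; the column types are assumptions
    stmt.executeUpdate("CREATE TABLE IF NOT EXISTS wordcount (words VARCHAR(100), total INT)")
    stmt.close()
    conn.close()
  }
}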
8. SparkStreaming_SparkSql.scala
Use Spark SQL to query the data inside Spark Streaming.
package day11
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming_SparkSql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkStreaming_SparkSql").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(5))
val lines = ssc.socketTextStream("hadoop01",1234)
val words = lines.flatMap(_.split(" "))
//Use Spark SQL to query the streaming data from Spark Streaming
words.foreachRDD(rdd => {
val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val wordsDataFrame = rdd.toDF("words")
wordsDataFrame.createOrReplaceTempView("t_words")
val result = spark.sql("select words,count(*) as total from t_words group by words")
result.show()
})
ssc.start()
ssc.awaitTermination()
}
}
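The same aggregation can also be expressed with the DataFrame API instead of a temporary view. A sketch that could replace the createOrReplaceTempView/sql pair inside foreachRDD above:
val result = wordsDataFrame.groupBy("words").count().withColumnRenamed("count", "total")
result.show()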