SparkStreaming实战案例二 UpdateStateByKey
需求
对从Socket接收的数据做WordCount并要求能够和历史数据进行累加!
如:
先发了一个spark,得到spark,1
然后不管隔多久再发一个spark,得到spark,2
也就是说要对数据的历史状态进行维护!
注意:可以使用如下API对状态进行维护
1.updateStateByKey
统计全局的key的状态,但是就算没有数据输入,他也会在每一个批次的时候返回之前的key的状态。假设5s产生一个批次的数据,那么5s的时候就会更新一次的key的值,然后返回。
这样的缺点就是,如果数据量太大的话,而且我们需要checkpoint数据,这样会占用较大的存储。
如果要使用updateStateByKey,就需要设置一个checkpoint目录,开启checkpoint机制。因为key的state是在内存维护的,如果宕机,则重启之后之前维护的状态就没有了,所以要长期保存它的话需要启用checkpoint,以便恢复数据。
2.mapWithState
也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。
这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。
代码实现
代码语言:javascript复制package cn.itcast.streaming
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用SparkStreaming接收Socket数据,node01:9999
* 对从Socket接收的数据做WordCount并要求能够和历史数据进行累加!
* 如:
* 先发了一个spark,得到spark,1
* 然后不管隔多久再发一个spark,得到spark,2
* 也就是说要对数据的历史状态进行维护!
*/
object SparkStreamingDemo02_UpdateStateByKey {
def main(args: Array[String]): Unit = {
//1.创建环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//batchDuration the time interval at which streaming data will be divided into batches
//流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
// The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()
//注意:因为涉及到历史数据/历史状态,也就是需要将历史数据/状态和当前数据进行合并,作为新的Value!
//那么新的Value要作为下一次的历史数据/历史状态,那么应该搞一个地方存起来!
//所以需要设置一个Checkpoint目录!
ssc.checkpoint("./ckp")
//2.接收socket数据
val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)
//3.做WordCount
//======================updateStateByKey=======================
//val 函数名称 :(参数类型)=>函数返回值类型 = (参数名称:参数类型)=>{函数体}
//参数1:Seq[Int]:当前批次的数据,如发送了2个spark,那么key为spark,参数1为:Seq[1,1]
//参数2:Option[Int]:上一次该key的历史值!注意:历史值可能有可能没有!如果没有默认值应该为0,如果有就取出来
//返回值:Option[Int]:当前批次的值 历史值!
//Option表示:可能有Some可能没有None
val updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{
//将当前批次的数据和历史数据进行合并作为这一次新的结果!
if (currentValues.size > 0) {
val newValue: Int = currentValues.sum historyValue.getOrElse(0)//getOrElse(默认值)
Option(newValue)
}else{
historyValue
}
}
val resultDS: DStream[(String, Int)] = linesDS
.flatMap(_.split(" "))
.map((_, 1))
//.reduceByKey(_ _)
// updateFunc: (Seq[V], Option[S]) => Option[S]
.updateStateByKey(updateFunc)
//======================mapWithState=======================
//Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,
//但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。
val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {
val newCount = current.getOrElse(0) state.getOption.getOrElse(0)
val output = (word, newCount)
state.update(newCount)
output
}
val resultDS2 = linesDS
.flatMap(_.split(" "))
.map((_, 1))
.mapWithState(StateSpec.function(mappingFunc))
//4.输出
resultDS.print()
resultDS2.print()
//5.启动并等待程序停止
// 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}