SparkStreaming实战案例一 WordCount
需求
从TCP Socket数据源实时消费数据,对每批次Batch数据进行词频统计WordCount,流程图如下:
准备工作
1.在node01上安装nc命令
nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据
代码语言:javascript复制yum install -y nc
2.在node01启动客户端工具发送消息
代码语言:javascript复制 nc -lk 9999
代码实现
http://spark.apache.org/docs/latest/streaming-programming-guide.html
从官方文档可知,提供两种方式构建StreamingContext实例对象,如下:
第一种方式:构建SparkConf对象
第二种方式:构建SparkContext对象
完整代码如下所示:
代码语言:javascript复制package cn.itcast.streaming
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
*/
object SparkStreamingDemo01_WordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//batchDuration the time interval at which streaming data will be divided into batches
//流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)
val resultDStream: DStream[(String, Int)] = inputDStream
.filter(StringUtils.isNotBlank(_))
.flatMap(_.trim.split("\s "))
.map((_, 1))
.reduceByKey(_ _)
resultDStream.print(10)
// 启动并等待程序停止
// 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
//注意:
//上面的代码可以做WordCount,但是只能对当前批次的数据进行累加!
}
}
应用监控
运行上述词频统计案例,登录到WEB UI监控页面:http://localhost:4040/
查看相关监控信息。
其一、Streaming流式应用概要信息
运行结果监控截图:
每批次Batch数据处理总时间TD = 批次调度延迟时间SD 批次数据处理时间PT
其二、性能衡量标准
SparkStreaming实时处理数据性能如何(是否可以实时处理数据)??如何衡量的呢??
需要满足:
每批次数据处理时间TD <= BatchInterval每批次时间间隔