2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

2021-10-09 16:47:10 浏览数 (1)


SparkStreaming实战案例一 WordCount

需求

从TCP Socket数据源实时消费数据,对每批次Batch数据进行词频统计WordCount,流程图如下:

准备工作

1.在node01上安装nc命令

nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

代码语言:javascript复制
yum install -y nc

2.在node01启动客户端工具发送消息

代码语言:javascript复制
 nc -lk 9999

代码实现

http://spark.apache.org/docs/latest/streaming-programming-guide.html

从官方文档可知,提供两种方式构建StreamingContext实例对象,如下:

 第一种方式:构建SparkConf对象

 第二种方式:构建SparkContext对象

完整代码如下所示:

代码语言:javascript复制
package cn.itcast.streaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
 */
object SparkStreamingDemo01_WordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //batchDuration the time interval at which streaming data will be divided into batches
    //流数据将被划分为批的时间间隔,就是每隔多久对流数据进行一次微批划分!
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1", 9999)

    val resultDStream: DStream[(String, Int)] = inputDStream
      .filter(StringUtils.isNotBlank(_))
      .flatMap(_.trim.split("\s "))
      .map((_, 1))
      .reduceByKey(_   _)

    resultDStream.print(10)

    // 启动并等待程序停止
    // 对于流式应用来说,需要启动应用
    ssc.start()
    // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
    ssc.awaitTermination()
    // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    //注意:
    //上面的代码可以做WordCount,但是只能对当前批次的数据进行累加!
  }
}

应用监控

运行上述词频统计案例,登录到WEB UI监控页面:http://localhost:4040/

查看相关监控信息。

 其一、Streaming流式应用概要信息

运行结果监控截图:

每批次Batch数据处理总时间TD = 批次调度延迟时间SD 批次数据处理时间PT

 其二、性能衡量标准

SparkStreaming实时处理数据性能如何(是否可以实时处理数据)??如何衡量的呢??

需要满足:

每批次数据处理时间TD  <=  BatchInterval每批次时间间隔

0 人点赞