一说到入门案例,我们就不得不提wordcount这个案例了哈哈哈。所以 此次还是这个案例
一. 案例
1. 需求
使用 netcat 工具向 9999 端口不断的发送数据,通过 Spark Streaming 读取端口数据并统计不同单词出现的次数
2. 添加依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
3. 源码
代码语言:javascript复制package com.buwenbuhuo.spark.streaming.day01
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author 不温卜火
* @create 2020-08-07 11:41
MyCSDN : https://buwenbuhuo.blog.csdn.net/
*/
object WordCount {
def main(args: Array[String]): Unit = {
// 1. 创建StreamingContext
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
// 2. 从数据源创建一个流: socket,rdd队列,自定义接收器 ,kafka(重点)
val sourceStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop002",9999)
// 3. 对流做各种转换
val result: DStream[(String, Int)] = sourceStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _)
// 4. 行动算子 print foreach foreachRDD
result.print() // 把结果打印在控制台
// 5. 启动流
ssc.start()
// 6. 阻止主线程退出(阻塞主线程)
ssc.awaitTermination()
}
}
4 .测试
- 1. hadoop002上启动 netcat
- 2. 可以打包到 linux 启动我们的 wordcount, 也可以在 idea 直接启动.
- 3. 查看输出结果. 每 3 秒统计一次数据的输入情况.
5. 注意
- 一旦StreamingContext已经启动, 则不能再添加添加新的 streaming computations
- 一旦一个StreamingContext已经停止(StreamingContext.stop()), 他也不能再重启
- 在一个 JVM 内, 同一时间只能启动一个StreamingContext
- stop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)
- 一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉
二. 案例解析
Discretized Stream(DStream) 是 Spark Streaming 提供的基本抽象, 表示持续性的数据流, 可以来自输入数据, 也可以是其他的 DStream 转换得到. 在内部, 一个 DSteam 用连续的一系列的 RDD 来表示. 在 DStream 中的每个 RDD 包含一个确定时间段的数据.
对 DStream 的任何操作都会转换成对他里面的 RDD 的操作. 比如前面的 wordcount 案例, flatMap是应用在 line DStream 的每个 RDD 上, 然后生成了 words SStream 中的 RDD. 如下图所示:
对这些 RDD 的转换是有 Spark 引擎来计算的. DStream 的操作隐藏的大多数的细节, 然后给开发者提供了方便使用的高级 API.
本次的分享就到这里了