作者:韩信子@ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/179
声明:版权所有,转载请联系平台与作者并注明出处
1.Spark Streaming解读
1)Spark Streaming简介
Spark Streaming是Spark核心API的一个扩展,可以实现实时数据的可拓展,高吞吐量,容错机制的实时流处理框架。
Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。
(1)流数据特点
- 数据一直在变化
- 数据无法回退
- 数据始终源源不断涌进
(2)DStream概念
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream 是由这些RDD 所组成的序列(因此得名“离散化”)。
(3)DStream形成步骤
- 针对某个时间段切分的小数据块进行RDD DAG构建。
- 连续时间内产生的一连串小的数据进行切片处理分别构建RDD DAG,形成DStream。
定义一个RDD处理逻辑,数据按照时间切片,每次流入的数据都不一样,但是RDD的DAG逻辑是一样的,即按照时间划分成一个个batch,用同一个逻辑处理。
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。
2)Spark Streaming特点
Spark Streaming有下述一些特点:
- 易用:Spark Streaming支持Java、Python、Scala等编程语言,可以像编写离线程序一样编写实时计算的程序求照的器。
- 容错:Spark Streaming在没有额外代码和配置的情况下,可以恢复丢失的数据。对于实时计算来说,容错性至关重要。首先要明确一下Spak中RDD的容错机制,即每一个RDD都是个不可变的分布式可重算的数据集,它记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都可以使用原始输入数据经过转换操作重新计算得到。
- 易整合到Spark体系中:Spark Streaming可以在Spark上运行,并且还允许重复使用相同的代码进行批处理。也就是说,实时处理可以与离线处理相结合,实现交互式的查询操作。
3)Spark Streaming架构
大家知道Spark的工作机制如下:
而SparkStreaming架构由三个模块组成:
在上图中几个核心的角色和功能分别是:
- Master:记录Dstream之间的依赖关系或者血缘关系,并负责任务调度以生成新的RD
- Worker:
- ①从网络接收数据并存储到内存中
- ②执行RDD计算
- Client:负责向Spark Streaming中灌入数据(flume kafka)
4)Spark Streaming 作业提交
(1)相关组件
Spark Sreaming的作业提交包含的组件和功能分别为:
- Network Input Tracker:跟踪每一个网络 received数据,并且将其映射到相应的 Input Dstream上
- Job Scheduler:周期性的访问 Dstream Graph并生成 Spark Job,将其交给 Job Manager执行
- Job Manager:获取任务队列,并执行 Spark任务
(2)具体流程
具体的作业提交流程如下:
要传入的数据会编排成block id(元数据)的形式,再加上RDD的逻辑,就生产了job scheduler,通过job manager形成job queue,以队列形式有序执行。真正的数据是以block形式传入worker,由worker上的executor通过元数据信息Block ID去HDFS上拉取对应的block数据进行执行。
Network Input Tracker传入的并不是真正的数据,而是Block IDs,相当于获取的是元数据,数据是通过worker进行接受的,也就是说Master上不管真正数据的接受情况,Master上只是能够拿到数据block的id,至于这些block做什么操作,是会放到 Job Manager去,按照顺序执行。
5)SparkStreaming工作原理
Discretized Stream 是Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的RDD 来表示。每个RDD 含有一段时间间隔内的数据。
简单来说,SparkStreaming接受实时的数据流,把数据按照指定的时间段切成一片片小的数据块(SparkStreaming将每个小的数据块当作RDD来处理),然后把数据块传给Spark Engine处理,最终得到一批批的结果。
- 每一批数据,在Spark内核中对应一个RDD实例
- DStream可以看作一组RDDs,是持续的RDD序列
对于Streaming来说,它的单位是DStream,而对于SparkCore,它的单位是RDD。针对Spark开发,就是开发RDD的DAG图,而针对SparkStreaming,就是开发DStream。
DStream 代表连续的一组RDD,每个RDD都包含特定时间间隔的数据。DStream内部的操作,可以直接映射到内部RDD进行,相当于DStream是在RDD上增加一个时间的维度得到的。RDD是DStream最小的一个数据单元。DStream中对数据的操作也是按照RDD为单位来进行的。
简单来理解,SparkStreaming对于流数据的处理速度是秒级别,无法达到Storm的毫秒级别,因此也可以将Streaming看作是微批处理。
2.DStream详解
大家在上文中频繁看到Dstream的核心概念,下面我们对其做一些展开讲解。
整体上看,Spark Streaming 的处理思路:将连续的数据持久化、离散化,然后进行批量处。
对上面这句话进行分析:
- 数据持久化:接收到的数据暂存,方便数据出错进行回滚
- 离散化:按时间分片,形成处理单元
- 分片处理:采用RDD模式将数据分批处理
- DStream 相当于对 RDD 的再次封装 ,它提供了转化操作和输出操作两种操作方法
1)DStream创建注意事项
Spark Streaming 原生支持一些不同的数据源。一些“核心”数据源已经被打包到 Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。
此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用local 或者 local。
2)DStream转换
(1)TransFormation算子与输出
DStream 上的原语与 RDD 的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window 相关的原语。
① TransFormation
- Spark支持RDD进行各种转换,因为 Dstream是由RDD组成的,Spark Streaming提供了一个可以在 DStream上使用的转换集合,这些集合和RDD上可用的转换类似;
- 转换应用到 Dstream的每个RDD;
- Spark Streaming提供了 reduce和 count这样的算子,但不会直接触发 Dstream计算;
- 常用算子:Map、 flatMap、 join、 reduceByKey;
② Output
- Print:控制台输出;
- saveAsObjectFile、 saveAsTextFile、 saveAsHadoopFiles:将一批数据输出到 Hadoop文件系统中,用批量数据的开始时间戳来命名;
- forEachRDD:允许用户对 Stream的每一批量数据对应的RDD本身做任意操作;
DStream = [rdd1, rdd2, …, rddn]
RDD两类算子:transformation、action
DStream两类算子:transformation、output
(2)无状态转换
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
函数名称 | 目的 | Scala示例 | 用来操作 DStream[T] 的用户自定义函数的函数签名 |
---|---|---|---|
map ( ) | 对 DStream 中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream | ds.map(x => x 1) | f : (T) -> U |
flatMap( ) | 对 DStream 中的每个元素应用给定函数,返回由各元素输出的迭代器组成的 DStream | ds.flatMap(x => x.split (“ ”) ) | f : T -> Iterable [U] |
filter( ) | 返回由给定 DStream 中通过筛选的元素组成的 DStream | ds.filter(x => x! = 1 ) | f : T -> Boolean |
repartition( ) | 改变 DStream 的分区数 | ds.repartition(10 ) | N / A |
reduceByKey( ) | 将每个批次中键相同的记录归约 | ds.reduceByKey((x, y) => x y) | f : T , T -> T |
groupByKey( ) | 将每个批次中的记录根据键分组 | ds.groupByKey( ) | N / A |
需要注意的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream 在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如,reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键 值对DStream 拥有和RDD 一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin() 等。我们可以在DStream 上使用这些操作,这样就对每个批次分别执行了对应的RDD 操作。
我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream 的内容合并起来,也可以使用 StreamingContext.union()来合并多个流。
(3)有状态转换
① UpdateStateByKey (全局统计量)
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。
给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
- updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
- updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。
为使用这个功能,你需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。 使用updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。
如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制,这样的话才能把每个key对应的state除了在内存中有,在磁盘上也checkpoint一份。因为要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以避免内存数据的丢失。
主要解决:比如说在双十一统计一天销量和成交金额,这些计算需要全量汇总,对数据进行累加,就需要避免数据在内存中丢失,造成不准确。
② Window Operations
Window Operations 有点类似于 Storm 中的 State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming 的允许状态。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次(在窗口内的批次)的结果,计算出整个窗口的结果。
简单来说,Streaming的Window Operations是Spark提供的一组窗口操作,通过滑动窗口的技术,对大规模数据的增量更新进行统计分析,即定时进行一段时间内的数据处理。
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。
- 窗口总长度(window length)// Reduce last 30 seconds of data, every 10 seconds
- 滑动时间间隔(slide interval) va1 windowedWordCounts = pairs.reduceByKeyAndWindow( , Seconds(30), Seconds(10))
滑动窗口的长度必须是滑动时间间隔的整数倍。因为RDD是DStream上最小的数据单元不可切分。如果不是整数倍,会出现一个RDD被切分的情况,程序会报错。
3)DStream Graph
DStream Graph是一系列transformation操作的抽象,例如:
c = a.join(b), d = c.filter() 时, 它们的 DAG 逻辑关系是a/b → c,c → d,但在 Spark Streaming 在进行物理记录时却是反向的 a/b ← c, c ← d, 目的是为了追溯。
(1)DStreamGraph
- ①找代码输出
- ②根据输出再往前追溯依赖关系
Dstream之间的转换所形成的的依赖关系全部保存在DStreamGraph中, DStreamGraph对于后期生成RDD Graph至关重要。
DStreamGraph有点像简洁版的DAG scheduler,负责根据某个时间间隔生成一序列 JobSet,以及按照依赖关系序列化
代码是一直在跑的,每隔一定时间就会形成一个RDD。
(2)DStream与RDD对比与理解
DStream.map(RDD => RDD.map)
- 时间维度:batchinterval为时间间隔不断的生成Job实例并在集群上运行。
- 空间维度:代表RDD依赖关系构成的具体的业务逻辑的处理步骤,用DStreamGraph表示
随看时间的流逝,基于 Dstream Graph不断的生成 RDD Graph也就是DAG的方式产生Job,并通过 Jobscheduler 的线程池提交给 SparkCluster不断的执行。
每个时间间隔会积累一定的数据,这些数据可以看成由 event 组成(假设以 kafka 或者Flume为例),时间间隔是固定的,在时间间隔内的数据就是固定的。也就是RDD是由一个时间间隔内所有数据构成。时间维度的不同,导致每次处理的数据量及内容不同。
3.Spark Streaming应用代码示例
我们先来看一看一个简单的 Spark Streaming 程序的样子。假如我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数,我们可以按照如下步骤进行:
① 首先, 我们导入StreamingContext,这是所有流功能的主要入口点。 我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext。
代码语言:python代码运行次数:0复制from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建一个具有两个工作线程(working thread)并且批次间隔为 1 秒的本地 StreamingContext .
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
② 使用该 context,我们创建一个代表从 TCP 源流数据的DStream,指定主机名(例如 localhost)和端口(例如 9999)。
代码语言:python代码运行次数:0复制# 创建一个将要连接到 hostname:port 的 DStream,如 localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
③ 上述lines DStream 表示将要从数据服务器接收到的数据流。在这个离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们希望通过空格字符拆分这些数据,把每一行切分为单词。
代码语言:python代码运行次数:0复制# 将每一行拆分成单词
words = lines.flatMap(lambda line: line.split(" "))
④ flatMap 是一种一对多的DStream操作,它会通过在源DStream中根据每个记录生成多个新纪录的形式创建一个新的DStream。在这种情况下,每一行都将被拆分成多个单词和代表单词DStream的单词流。下一步,我们想要计算这些单词:
代码语言:python代码运行次数:0复制# 计算每一个 batch(批次)中的每一个 word(单词)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x y)
# 在控制台打印出在这个DStream中生成的每个 RDD 的前十个元素
wordCounts.print()
上述单词DStream进行了进一步的映射(一对一的转换)为一个 (word, 1) paris 的DStream,这个 DStream 然后被reduce来获得数据中每个批次的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的统计结果。
⑤ 注意当这些行被执行的时候, Spark Streaming 仅仅设置了计算,只有在启动时才会执行,并没有开始真正地处理。为了在所有的转换都已经设置好之后开始处理,我们在最后调用:
代码语言:txt复制ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
该部分完整的代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。
如果你已经 下载 并且 构建 Spark, 您可以使用如下方式来运行该示例. 你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。
代码语言:txt复制$ nc -lk 9999
然后,在另一个不同的终端,你可以通过执行如下命令来运行该示例:
代码语言:txt复制$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样:
代码语言:txt复制# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...