Spark Streaming | Spark,从入门到精通

2018-09-18 14:46:34 浏览数 (1)

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)

往期直通车:Hello Spark!

Spark on Yarn

RDD原理与基础操作

注:本文节选自「酷玩 Spark」开源项目,原文地址:https://github.com/lw-lin/CoolplaySpark

Spark Streaming 是批处理的流式实时计算框架,支持从多种数据源获取数据,如 Kafka、TCP sockets、文件系统等。它可以使用诸如 map、reduce、join 等高级函数进行复杂算法的处理,最后还可以将处理结果存储到文件系统,数据库等。

Spark Streaming 有三个特点:

  • 基于 Spark Core Api,因此其能够与 Spark 中的其他模块保持良好的兼容性,为编程提供了良好的可扩展性;
  • 粗粒度的准实时处理框架,一次读取完成,或异步读完数据之后,再处理数据,且其计算可基于大内存进行,因而具有较高的吞吐量;
  • 采用统一的 DAG 调度以及 RDD,对实时计算有很好的容错支持;

/ 运行原理 /

图 1

如图 1 所示是 Spark 的整体架构图,它主要分为四个模块:

  • 静态的 RDD DAG 模版,表示处理逻辑;
  • 动态的工作控制器,将连续的 streaming data 切分为数据片段,并按照模板复制出新的 RDD DAG 的实例,对数据片段进行处理;
  • 原始数据的产生和导入
  • 对长时运行任务的保障,包括输入数据的失效后的重构和处理任务的失败后的重调。

DAG 静态定义

DAG 静态定义是将整个计算逻辑描述为一个 RDD DAG 的「模版」,在后面 Job 动态生成的时候,针对每个 batch,Spark Streaming 都将根据这个「模板」生成一个 RDD DAG 的实例。

图 2

接下来我们了解下 RDD 和 DStream 的关系。DStream 维护了对每个产出的 RDD 实例的引用,如图 2 所示,DStream 在 3 个 batch 里分别实例化了 3 个 RDD, a[1]、a[2]、a[3],然后 DStream 就保留了 batch 所产出的 RDD 的哈希表。

我们在考虑的时候,可以认为 RDD 加上 batch 维度就是 DStream,DStream 去掉 batch 维度就是 RDD。Spark 定义静态的计算逻辑后,通过动态的工作控制来调度。

Job 动态生成

在 Spark Streaming 程序的入口我们都会定义一个 batchDuration,即每隔固定时间就比照静态的 DStreamGraph 来动态生成一个 RDD DAG 实例。在 Spark Streaming 内整体负责动态作业调度的具体类是 JobScheduler,由 start() 运行。

JobScheduler 有两个非常重要的成员:JobGenerator 和 ReceiverTracker。JobScheduler 将每个 batch 的 RDD DAG 具体生成工作委托给 JobGenerator,而将源头输入数据的记录工作委托给 ReceiverTracker。

JobGenerator 维护了一个定时器,周期就是上文提到的 batchDuration,定时为每个 batch 生成 RDD DAG 的实例,其中每次 RDD DAG 实际生成包含 5 个步骤:

  • 要求 ReceiverTracker 将目前已收到的数据进行一次分配,即将上个批次切分后的数据,切分到到本次新的批次里;
  • 要求 DStreamGraph 复制出一套新的 RDD DAG 的实例, DStreamGraph 将要求图里的尾 DStream 节点生成具体的 RDD 实例,并递归的调用尾 DStream 的上游 DStream 节点……以此遍历整个 DStreamGraph,遍历结束也就正好生成了 RDD DAG 的实例;
  • 获取第 1 步 ReceiverTracker 分配到本 batch 的源头数据的 meta 信息;
  • 将第 2 步生成的本 batch 的 RDD DAG,和第 3 步获取到的 meta 信息,一同提交给 JobScheduler 异步执行;
  • 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个 checkpoint。

数据产生与导入

DStream 的子类 ReceiverInputDStream 在某个 batch 里实例化 RDD,通过 Receiver 为这个 RDD 生产数据。Spark Streaming 在程序刚开始运行时:

  • 由 Receiver 的总指挥 ReceiverTracker 分发多个 job,到多个 executor 上分别启动 ReceiverSupervisor 实例;
  • 每个 ReceiverSupervisor 启动后将马上生成一个用户提供的 Receiver 实现的实例并在 Receiver 实例生成后调用 Receiver.onStart(),这时 Receiver 启动工作已经运行完毕。
  • Receiver 在 onStart() 启动后,就将持续不断地接收外界数据,并持续交给 ReceiverSupervisor 进行数据转储;
  • ReceiverSupervisor 持续不断地接收到 Receiver 转来的数据,如果数据很细小,就需要 BlockGenerator 攒多条数据成一块(4a)、然后再成块存储(4b 或 4c);反之就不用攒,直接成块存储(4b 或 4c);
  • 每次成块在 executor 存储完毕后,ReceiverSupervisor 就会及时上报块数据的 meta 信息给 driver 端的 ReceiverTracker,这里的 meta 信息包括数据的标识 id、数据的位置、数据的条数、数据的大小等信息;
  • ReceiverTracker 再将收到的块数据 meta 信息直接转给自己的成员 ReceivedBlockTracker,由 ReceivedBlockTracker 专门管理收到的块数据 meta 信息。

后续在 driver 端,就由 ReceiverInputDStream 在每个 batch 去检查 ReceiverTracker 收到的块数据 meta 信息,界定哪些新数据需要在本 batch 内处理,然后生成相应的 RDD 实例去处理这些块数据。

?举个例子

代码语言:javascript复制
import org.apache.spark.streaming._
import org.apache.spark.SparkConf

object example{

  def main(args:Array[String]):Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999)

    val words = lines.flatMap(_.split(" "))      // DStream transformation
    val pairs = words.map(word => (word, 1))     // DStream transformation
    val wordCounts = pairs.reduceByKey(_   _)    // DStream transformation

    wordCounts.print()                           // DStream output

    ssc.start()
    ssc.awaitTermination()
  }
}

如以上代码所示:

  • 启动 Spark Streamingg 实例后将 batchDuration 设置为 1 秒;
  • ssc.socketTextStream() 将创建一个 SocketInputDStream,这个 InputDStream 的 SocketReceiver 将监听本机 9999 端口;
  • 接下来几行利用 DStream transformation 构造出了 lines -> words -> pairs -> wordCounts -> .print() 从lines 到 wordCounts print()的一个 DStreamGraph;
  • 到目前只是是定义好了产生数据的 SocketReceiver 及 DStreamGraph,这些都是静态的;
  • 下面这行 start() 将在幕后启动 JobScheduler,进而启动 JobGenerator 和 ReceiverTracker,其中 JobGenerator 开始不断的生成一个一个 batch,ReceiverTracker 创建和启动 Receiver;
  • 然后用户 code 主线程就 block 在awaitTermination了,block 的效果就是,后台的 JobScheduler 开始不断的生成一个一个 batch,也就是在这里,我们前面静态定义的 DStreamGraph 的 print(),才一次一次被在 RDD 实例上调用,一次一次打印出当前 batch 的结果;

长时容错

首先看 executor 端,在 executor 端 ReceiverSupervisor 和 Receiver 失效后直接重启即可,关键点是保障收到的块数据的安全,保障了源头块数据就能够保障 RDD DAG (Spark Core 的 lineage)重做。

Spark Streaming 对源头块数据的保障,分为 4 个层次,全面、相互补充,又可根据不同场景灵活设置:

  • 热备:热备是指在存储块数据时,将其存储到本 executor、并同时 replicate 到另外一个 executor 上去。这样在一个 replica 失效后,可以立刻无感知切换到另一份 replica 进行计算。实现方式是,在实现自己的 Receiver 时,即指定一下 StorageLevel 为 MEMORY_ONLY_2 或 MEMORY_AND_DISK_2 就可以了。 *1.5.2 update 这已经是默认了
  • 冷备:冷备是每次存储块数据前,先把块数据作为 log 写出到 WriteAheadLog 里,再存储到本 executor。executor 失效时,就由另外的 executor 去读 WAL,再重做 log 来恢复块数据。WAL 通常写到可靠存储如 HDFS 上,所以恢复时可能需要一段 recover time。
  • 重放:如果上游支持重放,比如 Apache Kafka,那么就可以选择不用热备或者冷备来另外存储数据了,而是在失效时换一个 executor 进行数据重放即可。
  • 忽略:最后,如果应用的实时性需求大于准确性,那么一块数据丢失后我们也可以选择忽略、不恢复失效的源头数据。

上文曾提到块数据的 meta 信息上报到 ReceiverTracker,然后交给 ReceivedBlockTracker 做具体的管理。ReceivedBlockTracker 也采用 WAL 冷备方式进行备份,在 driver 失效后,由新的 ReceivedBlockTracker 读取 WAL 并恢复 block 的 meta 信息。

另外,需要定时对 DStreamGraph 和 JobScheduler 做 Checkpoint,来记录整个 DStreamGraph 的变化、和每个 batch 的 job 的完成情况。

注意到这里采用的是完整 checkpoint 的方式,和之前的 WAL 的方式都不一样。Checkpoint 通常也是落地到可靠存储如 HDFS。Checkpoint 发起的间隔默认的是和 batchDuration 一致;即每次 batch 发起、提交了需要运行的 job 后就做 Checkpoint,另外在 job 完成了更新任务状态的时候再次做一下 Checkpoint。

这样一来,在 driver 失效并恢复后,可以读取最近一次的 Checkpoint 来恢复作业的 DStreamGraph 和 job 的运行及完成状态。

Spark Streaming 窗口操作

/ Structured Streaming /

Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,它可以以静态数据表示批量计算的方式来表达流式计算。 Spark SQL 引擎将随着 streaming data 持续到达而增量地持续地运行,并更新最终结果。

StreamExecution 的初始状态

值得注意的是,Structured Streaming 也是先纯定义、再触发执行的模式。前面大部分代码是纯定义 Dataset/DataFrame 的产生、变换和写出,后面位置再真正 start 一个新线程去触发执行之前的定义。在新的执行线程里我们需要持续地去发现新数据,进而持续地查询最新计算结果至写出。

这些 DataFrame的产生、变换和写出的信息就对应保存在 StreamExecution非常重要的 3 个成员变量中:

  • sources: streaming data 的产生端(如 kafka等);
  • logicalPlan: DataFrame/Dataset 的一系列变换,即计算逻辑;
  • sink: 最终结果写出的接收端(比如 file system 等)。

Structured Streaming 持续查询

StreamExecution 通过 Source.getOffset() 获取最新的 offsets,即最新的数据进度,将 offsets 写入到 offsetLog 里,将来可用作故障恢复用。在 3a 将预先定义好的逻辑(即 logicalPlan 成员变量)制作一个副本出来,3b 给定刚刚取到的 offsets,通过 Source.getBatch(offsets) 获取本执行新收到的数据的 DataFrame 表示。经过这两步,构造完成的 LogicalPlan 就是针对本执行新收到的数据的 DataFrame 整个处理逻辑。

接着将表示计算结果的 DataFrame 交给 Sink,6a 通过 Source.commit() 告知 Source 数据已经完整处理结束,6b 将本次执行的批次 id 写入到 batchCommitLog 里。

StreamExecution 增量持续查询

Structured Streaming 在编程模型上暴露给用户的是每次持续查询看做面对全量数据,所以每次执行的结果是针对全量数据进行计算的结果,但是在实际执行过程中,由于全量数据会越攒越多,每次对全量数据进行计算的代价和消耗会越来越大。

因此 Structured Streaming 引入全局范围、高可用的 StateStore 转全量为增量,即在每次执行时先从 StateStore 里 restore 出上次执行后的状态,再加入本执行的新数据进行计算,如果有状态改变,将把改变的状态重新 save 到 StateStore 里。

所以 Structured Streaming 在具体实现上转换为增量的持续查询。

故障恢复

由于 exectutor 节点的故障可由 Spark 框架本身很好的 handle,不引起可用性问题,因此只讨论 driver 故障恢复。如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution 读取 WAL offsetlog 恢复出最新的 offsets ,并读取 batchCommitLog 决定是否需要重做最近一个批次。

事件时间

当我们有一系列到达的记录时,首先对时间列 timestamp 做长度为10m,滑动为5m 的 window() 操作。

如图右上角的虚框部分,当达到一条记录 12:22|dog 时,会将 12:22 归入两个窗口 12:15-12:25、12:20-12:30,所以产生两条记录:12:15-12:25|dog、12:20-12:30|dog,所以这里 window() 操作的本质是 explode(),可由一条数据产生多条数据。

接着对 window() 操作的结果,以 window 列和 word 列为 key,做 groupBy() 操作。这个操作的聚合过程是增量的最后得到一个有 window、 word、count 三列的状态集。

代码语言:javascript复制
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")  // 注意这里的 watermark 设置!
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

对于数据延迟通过 withWatermark("timestamp", "10 minutes") 告诉 Structured Streaming,以 timestamp 列的最大值为锚点,往前推 10min 以前的数据不会再接收。

  • 在 12:20 这个批次结束后,锚点变成了 12:20|dog,owl 这条记录的 event time 12:20 ,watermark 变成了 12:20 - 10min = 12:10;
  • 在 12:30 批次结束时,即知道 event time 12:10 以前的数据不再收到了,因而 window 12:00-12:10 的结果也不会再被更新,即可以安全地输出结果 12:00-12:10|cat|2;
  • 在结果 12:00-12:10|cat|2 输出以后,State 中也不再保存 window 12:00-12:10 的相关信息,即 State Store 中的此条状态得到了清理。

接下来看 structured streaming 的输出模式,complete 输出模式如同上面的流程,接着主要讲另外两种输出模式:append 和 update。

Append 的语义将保证一旦输出了某条 key,未来就不会再输出同一个 key。所以,在上图 12:10 这个批次直接输出 12:00-12:10|cat|1, 12:05-12:15|cat|1 将是错误的,因为在 12:20 将结果更新为了 12:00-12:10|cat|2,但是 Append 模式下却不会再次输出 12:00-12:10|cat|2,因为前面输出过了同一条 key 12:00-12:10|cat 的结果12:00-12:10|cat|1。

为了解决这个问题,在 Append 模式下 Structured Streaming 需要知道某一条 key 的结果什么时候不会再更新了,当确认结果不会再更新的时候就可以将结果进行输出。

如上图所示,如果我们确定 12:30 这个批次以后不会再有对 12:00-12:10 这个 window 的更新,那么我们就可以把 12:00-12:10 的结果在 12:30 这个批次输出,并且也会保证后面的批次不会再输出 12:00-12:10 的 window 的结果,维护了 Append 模式的语义。

Update 模式是在 Spark 2.1.1 及以后版本获得正式支持。

如上图所示,在 Update 模式中,只有本执行批次 State 中被更新了的条目会被输出:

  • 在 12:10 这个执行批次,State 中全部 2 条都是新增的(因而也都是被更新了的),所以输出全部 2 条;
  • 在 12:20 这个执行批次,State 中 2 条是被更新了的、 4 条都是新增的(因而也都是被更新了的),所以输出全部 6 条;
  • 在 12:30 这个执行批次,State 中 4 条是被更新了的,所以输出 4 条。这些需要特别注意的一点是,如 Append 模式一样,本执行批次中由于(通过 watermark 机制)确认 12:00-12:10 这个 window 不会再被更新,因而将其从 State 中去除,但没有因此产生输出。

0 人点赞