Spark Streaming 快速入门系列(5) | 还不会DStream转换,一文带你深入了解

2020-10-28 17:38:26 浏览数 (1)

关于转换这方面的一些具体问题,如果想要了解可以点击下列网址进行查看: http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams

  上图为官网的解释,我们可以翻译为:   与RDD相似,转换允许修改来自输入DStream的数据。DStream支持普通Spark RDD上可用的许多转换。

  除此之外,DStream分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()transform()以及各种Window相关的原语。

  • 一些常见的方法

  在DStream转换中,大体可分为无状态转换操作和有状态转换操作两种! 下面就围绕这两个方面进行详细讲解。

一. 无状态转换操作

  无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。

  需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如,reduceByKey()会化简每个时间区间中的数据,但不会化简不同区间之间的数据。

  举个例子,在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。

  无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。例如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()join()leftOuterJoin() 等。我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。

  我们还可以像在常规的 Spark 中一样使用 DStreamunion() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。

transform操作

transform 原语允许 DStream上执行任意的RDD-to-RDD函数。

  可以用来执行一些 RDD 操作, 即使这些操作并没有在 SparkStreaming 中暴露出来.

  该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

  • 1. 样例源码
代码语言:javascript复制
package com.buwenbuhuo.spark.streaming.day02
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object TransformDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop002", 9999)

    val resultDStream = dstream.transform(rdd => {
      rdd.flatMap(_.split("\W")).map((_, 1)).reduceByKey(_   _)
    })
    resultDStream.print


    ssc.start()
    ssc.awaitTermination()
  }

}
  • 2. 运行结果

二. 有状态转换操作

  此部分主要介绍两个有状态的操作

2.1 updateStateByKey

  上图为官方解释,下面为翻译:

updateStateByKey操作允许在使用新信息不断更新状态的同时能够保留他的状态.

需要做两件事情:

  1. 定义状态. 状态可以是任意数据类型
  2. 定义状态更新函数. 指定一个函数, 这个函数负责使用以前的状态和新值来更新状态.

  在每个阶段, Spark 都会在所有已经存在的 key 上使用状态更新函数, 而不管是否有新的数据在.

代码语言:javascript复制
def updateStateByKey[S: ClassTag](
                 updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  • 1. 样例源码
代码语言:javascript复制
package com.buwenbuhuo.spark.streaming.day02

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object UpstateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpstateByKeyDemo")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
    ssc.checkpoint("ck1")

    ssc
        .socketTextStream("hadoop002",9999)
        .flatMap(_.split("\W "))
        .map((_,1))
        .updateStateByKey[Int]((seq:Seq[Int],opt:Option[Int]) =>{
                Some(seq.sum   opt.getOrElse(0))
    })
        .print(1000)


    ssc.start()
    ssc.awaitTermination()
  }
}
  • 2. 运行结果
  • 3. 源码流解析

2.2 window 操作(窗口操作)

  Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据.

  默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上.

  一个窗口可以包含多个时间段. 基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

  上图所示, 窗口在 DStream 上每滑动一次, 落在窗口内的那些 RDD会结合在一起, 然后在上面操作产生新的 RDD, 组成了 window DStream.

在上面图的情况下, 操作会至少应用在 3 个数据单元上, 每次滑动 2 个时间单位. 所以, 窗口操作需要 2 个参数:

  1. 窗口长度 – 窗口的持久时间(执行一次持续多少个时间单位)(图中是 3)
  2. 滑动步长 – 窗口操作被执行的间隔(每多少个时间单位执行一次).(图中是 2 )

  注意: 这两个参数必须是源 DStream 的 interval 的倍数.

  • 一些常见的窗口操作 所有这些操作均采用上述两个参数-windowLength和slideInterval。
  • 1. 测试程序源码
代码语言:javascript复制
package com.buwenbuhuo.spark.streaming.day02.window
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 *
 * @author 不温卜火
 * @create 2020-08-12 18:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object Window1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Window1").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    ssc
      .socketTextStream("hadoop002",9999)
      .flatMap(_.split("\W "))
      .map((_,1))
        .reduceByKeyAndWindow(_   _,Seconds(6))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 2. 运行结果

2.3 window的优化操作

  • 1. reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration)
代码语言:javascript复制
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
/*
参数1: reduce 计算规则
参数2: 窗口长度
参数3: 窗口滑动步长. 每隔这么长时间计算一次.
 */
val count: DStream[(String, Int)] =
wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x   y,Seconds(15), Seconds(10))
  • 2. reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration) 比没有invReduceFunc高效. 会利用旧值来进行计算.

invReduceFunc: (V, V) => V 窗口移动了, 上一个窗口和新的窗口会有重叠部分, 重叠部分的值可以不用重复计算了. 第一个参数就是新的值, 第二个参数是旧的值.

代码语言:javascript复制
ssc.sparkContext.setCheckpointDir("hdfs://hadoop002:9000/checkpoint")
val count: DStream[(String, Int)] =
    wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x   y,(x: Int, y: Int) => x - y,Seconds(15), Seconds(10))
  • 3. window(windowLength, slideInterval) 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream
代码语言:javascript复制
package com.buwenbuhuo.spark.streaming.day02.window

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 *
 * @author 不温卜火
 * @create 2020-08-12 22:45
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object Window2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Window2").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    
    ssc.checkpoint("ck3")
    ssc
      .socketTextStream("hadoop002",9999)
      .window(Seconds(9),Seconds(6))
      .flatMap(_.split("\W "))
      .map((_,1))
        .reduceByKeyAndWindow(_   _,Seconds(6))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 4. countByWindow(windowLength, slideInterval) 返回一个滑动窗口计数流中的元素的个数。
  • 5. countByValueAndWindow(windowLength, slideInterval, [numTasks]) 对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。

  本次的分享就到这里了

0 人点赞