快速入门Flink (10) —— DataStream API 开发之【EventTime 与 Window】

2021-01-27 16:40:26 浏览数 (1)

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/ 尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

在上一篇博客中,博主已经为大家介绍了DataStream API 开发之【Time 与 Window】,并着重介绍了常用的 Window API 。本篇博客,我们就趁热打铁,继续接下去讲, DataStream API 开发之【EventTime 与 Window】。

码字不易,先赞后看!!!


2、EventTime 与 Window

2.1 EventTime 的引入

在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。

如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示:

代码语言:javascript复制
val env = StreamExecutionEnvironment.getExecutionEnvironment 
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2.2 Watermark

2.2.1 基本概念

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和 时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但 是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的。

那么此时出现一个问题,一旦出现乱序,如果只根据 eventTime 决定 window 的运行, 我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一 个特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark

Watermark 是一种衡量 Event Time 进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的 Watermark

Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制 结合 window 来实现。

数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了, 因此, window 的执行也是由 Watermark 触发的。

Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t, 每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t, 那么这个窗口被触发执行

当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当 前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的, 一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口 的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那 么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。

2.2.2 Watermark 的引入
代码语言:javascript复制
 val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 从调用时刻开始给 senv 创建的每一个 stream 追加时间特征
    val stream: DataStream[String] = senv.readTextFile("eventTest.txt").assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {

      override def extractTimestamp(t: String): Long = {
        // EventTime 是日志生成时间,我们从日志中解析 EventTime
        t.split(" ")(0).toLong
      }
    })

2.3 EventTimeWindow API

当使用 EventTimeWindow 时,所有的 Window 在 EventTime 的时间轴上进行划分,也就是说,在 Window 启动后,会根据初始的 EventTime 时间每隔一段时间划分一个窗口, 如果 Window 大小是 3 秒,那么 1 分钟内会把 Window 划分为如下的形式:

代码语言:javascript复制
[00:00:00,00:00:03) 
[00:00:03,00:00:06) 
... 
[00:00:57,00:01:00)

如果 Window 大小是 10 秒,则 Window 会被分为如下的形式:

代码语言:javascript复制
[00:00:00,00:00:10) 
[00:00:10,00:00:20) 
... 
[00:00:50,00:01:00)

注意,窗口是左闭右开的,形式为:[window_start_time,window_end_time)

Window 的设定无关数据本身,而是系统定义好了的,也就是说,Window 会一直按照指定的时间间隔进行划分,不论这个 Window 中有没有数据,EventTime 在这个 Window 期间 的数据会进入这个 Window。

Window 会不断产生,属于这个 Window 范围的数据会被不断加入到 Window 中,所有 未被触发的 Window 都会等待触发,只要 Window 还没触发,属于这个 Window 范围的数据。就会一直被加入到 Window 中,直到 Window 被触发才会停止数据的追加,而当 Window 触发之后才接受到的属于被触发 Window 的数据会被丢弃。

Window 会在以下的条件满足时被触发执行:

  • watermark 时间 >= window_end_time;
  • 在[window_start_time,window_end_time)中有数据存在

可以通过下图来说明 Watermark、EventTime 和 Window 的关系。

2.3.1 滚动窗口
代码语言:javascript复制
/*
 * @Author: Alice菌
 * @Date: 2020/8/13 11:13
 * @Description: 


    当使用 EventTimeWindow 时,所有的 Window 在 EventTime 的时间轴上进行划分,

 */
/*** 步骤:
  * 1.创建流处理环境
  * 2.设置EventTime
  * 3.构建数据源
  * 4.设置水印
  * 5.逻辑处理
  * 6.引入滚动窗口TumblingEventTimeWindows
  * 7.聚合操作
  * 8.输出打印
  * 9.执行程序     */
object TumblingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {

    // 1、创建流处理环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、设置 EventTime
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3、构建数据源
    // 数据格式:   1000 hello
    val socketSource: DataStream[String] = senv.socketTextStream("node01",9999)
    // 4、设置水印
    val groupKeyedStream: KeyedStream[(String, Int), Tuple] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {
        override def extractTimestamp(element: String): Long = {

          // EventTime 是日志生成时间,我们从日志中解析 EventTime
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })

      // 5、逻辑处理
      .map(x => x.split(" ")(1))
      .map((_, 1))
      .keyBy(0)

    // 6、引入滚动窗口TumblingEventTimeWindows
    val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupKeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))

    // 7、聚合操作
    val resultDataStream: DataStream[(String, Int)] = windowStream.reduce((v1,v2) => (v1._1,v1._2 v2._2))

    // 8、输出打印
    resultDataStream.print()

    // 9、执行程序
    senv.execute("TumblingEventTimeWindowsDemo")


    /**
      * 结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)
      */

  }
}

结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)

2.3.2 滑动窗口
代码语言:javascript复制
/*
 * @Author: Alice菌
 * @Date: 2020/10/24 18:28
 * @Description: 
      滑动窗口(SlidingEventTimeWindows)
*/
/**
  步骤:

  * 1.创建流处理环境
  * 2.设置EventTime
  * 3.构建数据源
  * 4.设置水印
  * 5.逻辑处理
  * 6.引入滑动窗口 SlidingEventTimeWindows
  * 7.聚合操作
  * 8.输出打印
  * 9.执行程序
  */
object SlidingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {

    // 1、 创建流处理环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、 追加时间特征
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3、 创建数据源
    val socketSource: DataStream[String] = senv.socketTextStream("node01", 9999)
    // 4、 添加水印
    // 模拟的数据格式: 10000 hello
    val waterMarkDataStream: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
       
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {

        override def extractTimestamp(element: String): Long = {

          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })

    // 5、数据处理
    val resultKeyedStream: KeyedStream[(String, Int), Tuple] = waterMarkDataStream
      .map(x => x.split(" ")(1))
      .map((_, 1))
      .keyBy(0)


    // 6、引入滑动窗口 SlidingEventTimeWindows
    val slidingWindowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = resultKeyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),
      Time.seconds(2)))

    // 7、聚合计算
    val result: DataStream[(String, Int)] = slidingWindowedStream.reduce((v1,v2) => (v1._1,v1._2 v2._2))

    // 8、打印测试输出
    result.print()

    // 9、执行程序
    senv.execute("SlidingEventTimeWindowsDemo")

  }
  }
2.3.3 会话窗口

相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行。如果加入 Watermark,那么当触发执行时,所有满足时间间隔而还没有触发的 Window 会同时触发执行。

代码语言:javascript复制
/*
 * @Author: Alice菌
 * @Date: 2020/10/24 20:58
 * @Description: 
    会话窗口

    相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行

 */
object EventTimeSessionWindowsDemo {
  def main(args: Array[String]): Unit = {
    // 1、 创建流处理环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、 追加时间特征
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3、 创建数据源
    val socketSource: DataStream[String] = senv.socketTextStream("node01",9999)
    // 4、 添加水印
    // 模拟的数据格式:    1000 hello
    val waterMarkDataStream: DataStream[String] = socketSource.assignTimestampsAndWatermarks(

      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        // 提取时间
        override def extractTimestamp(element: String): Long = {

          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })

    // 5、数据处理
    val groupKeyStream: KeyedStream[(String, Int), Tuple] = waterMarkDataStream
      .map(x => x.split(" ")(1))
      .map((_, 1))
      .keyBy(0)

    // 6、 引入会话窗口 EventTimeSessionWindows
    val sessionWindowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupKeyStream
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))

    // 7、 聚合计算
    val resultDataStream: DataStream[(String, Int)] = sessionWindowStream.reduce((v1,v2) => (v1._1,v1._2   v2._2))

    // 8、 输出打印
    resultDataStream.print()

    // 9、 执行程序
    senv.execute(this.getClass.getSimpleName)
  }
}

彩蛋

为了能鼓励大家多学会总结,菌在这里贴上自己平时做的思维导图,需要的朋友,可以关注博主个人微信公众号【猿人菌】,后台回复“思维导图”即可获取。

小结

很高兴能在文末看见你,朋友,有任何好的想法或者建议都可以在评论区留言,或者直接私信我也ok,在最美的年华,做最好的自己,我是Alice,我们下一期见~~

文章持续更新,可以微信搜一搜「 猿人菌 」第一时间阅读,思维导图,大数据书籍,大数据高频面试题,海量一线大厂面经,300G大数据全套视频等你获取…期待您的关注!

0 人点赞