Flink Windows窗口简介和使用

2020-06-15 14:52:44 浏览数 (1)

很多人不知道什么是Window?有哪些用途? 下面我们结合一个现实的例子来说明。

我们先提出一个问题:统计经过某红绿灯的汽车数量之和? 假设在一个红绿灯处,我们每隔15秒统计一次通过此红绿灯的汽车数量,如下图:

可以把汽车的经过看成一个流,无穷的流,不断有汽车经过此红绿灯,因此无法统计总共的汽车数量。但是,我们可以换一种思路,每隔15秒,我们都将与上一次的结果进行sum操作(滑动聚合),如下:

这个结果似乎还是无法回答我们的问题,根本原因在于流是无界的,我们不能限制流,但可以在有一个有界的范围内处理无界的流数据。

因此,我们需要换一个问题的提法:每分钟经过某红绿灯的汽车数量之和? 这个问题,就相当于一个定义了一个Window(窗口),window的界限是1分钟,且每分钟内的数据互不干扰,因此也可以称为翻滚(不重合)窗口,如下图:

第一分钟的数量为8,第二分钟是22,第三分钟是27。。。这样,1个小时内会有60个window。

再考虑一种情况,每30秒统计一次过去1分钟的汽车数量之和:

此时,window出现了重合。这样,1个小时内会有120个window。

扩展一下,我们可以在某个地区,收集每一个红绿灯处汽车经过的数量,然后每个红绿灯处都做一次基于1分钟的window统计,即并行处理:

通常来讲,Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。

Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。

1. Flink的Window类型

Flink基本分有3种window类型:CountWindow,TimeWindow和SessionWindow。 其中,CountWindow和TimeWindow还有滑动与滚动区分。

2.窗口函数有哪些

定义完窗口分配器后,需要指定在每个窗口上执行的计算,这就是窗口函数的职责。 在了解有哪些窗口函数之前,有必要了解Window的聚合分类: 全量聚合:简单点说是等属于窗口的数据到齐之后,才开始进行聚合计算;即全量聚合在未触发之前,会保存之前的状态,在最后窗口触发时,才会进行计算。(所以全量聚合的压力会很大。) 常见的窗口函数: apply(WindowFunction) --- 不过1.3之后被弃用 process(processWindowFunction)

增量聚合:窗口每进入一条数据,就进行一次计算。

代码语言:javascript复制
reduce(reduceFunction);
fold;
aggregate(aggregateFunction);
sum(key);min(key);max(key)
sumBy(key);minBy(key);maxBy(key)

我们需要根据业务场景需要,决定使用是全量聚合还是增量聚合,并进一步选择使用哪一种聚合函数。

3.Window何时会被触发

Window何时被触发计算,是由触发器Trigger的onElement方法所决定。

该方法的参数: (1)element:到达的元素 (2)timestamp:元素达到的时间戳 (3)window:元素将被分配的窗口 (4)context:上下文

以时间类型设置为EventTime之后,触发器就是EventTimeTrigger,对应的onElement方法:

方法很简单:如果当前的watermark已经大于或等于窗口的最大时间戳(即窗口的endTime),那么就会触发窗口计算,并输出结果。 TriggerResult.FIRE:窗口计算并输出结果,尽管未清除窗口,但保留了所有元素。 否则的话,就是注册一个以窗口的最大时间戳为时间的定时器。

window.maxTimestamp()

这里的end是指窗口的结束时间,通常是在WindowAssigner中指定,WindowAssigner有:

以TumblingEventTimeWindows为例:

最后呢,给一个完整的例子:

代码语言:javascript复制
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object Window {
  def main(args: Array[String]) {
// set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost",9000)

    val values = source.flatMap(value => value.split("\s ")).map(value => (value,1))

    val keyValue = values.keyBy(0)
    // define the count window without purge
    val countWindowWithoutPurge = keyValue.window(GlobalWindows.create()).
      trigger(CountTrigger.of(2))
 
    val countWindowWithPurge = keyValue.window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))

    countWindowWithoutPurge.sum(1).print()
    countWindowWithPurge.sum(1).print()
    env.execute()
// execute program
    env.execute("Flink Scala API Skeleton")
  }
}

0 人点赞