很多人不知道什么是Window?有哪些用途? 下面我们结合一个现实的例子来说明。
我们先提出一个问题:统计经过某红绿灯的汽车数量之和? 假设在一个红绿灯处,我们每隔15秒统计一次通过此红绿灯的汽车数量,如下图:
可以把汽车的经过看成一个流,无穷的流,不断有汽车经过此红绿灯,因此无法统计总共的汽车数量。但是,我们可以换一种思路,每隔15秒,我们都将与上一次的结果进行sum操作(滑动聚合),如下:
这个结果似乎还是无法回答我们的问题,根本原因在于流是无界的,我们不能限制流,但可以在有一个有界的范围内处理无界的流数据。
因此,我们需要换一个问题的提法:每分钟经过某红绿灯的汽车数量之和? 这个问题,就相当于一个定义了一个Window(窗口),window的界限是1分钟,且每分钟内的数据互不干扰,因此也可以称为翻滚(不重合)窗口,如下图:
第一分钟的数量为8,第二分钟是22,第三分钟是27。。。这样,1个小时内会有60个window。
再考虑一种情况,每30秒统计一次过去1分钟的汽车数量之和:
此时,window出现了重合。这样,1个小时内会有120个window。
扩展一下,我们可以在某个地区,收集每一个红绿灯处汽车经过的数量,然后每个红绿灯处都做一次基于1分钟的window统计,即并行处理:
通常来讲,Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。
Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。
1. Flink的Window类型
Flink基本分有3种window类型:CountWindow,TimeWindow和SessionWindow。 其中,CountWindow和TimeWindow还有滑动与滚动区分。
2.窗口函数有哪些
定义完窗口分配器后,需要指定在每个窗口上执行的计算,这就是窗口函数的职责。 在了解有哪些窗口函数之前,有必要了解Window的聚合分类: 全量聚合:简单点说是等属于窗口的数据到齐之后,才开始进行聚合计算;即全量聚合在未触发之前,会保存之前的状态,在最后窗口触发时,才会进行计算。(所以全量聚合的压力会很大。) 常见的窗口函数: apply(WindowFunction) --- 不过1.3之后被弃用 process(processWindowFunction)
增量聚合:窗口每进入一条数据,就进行一次计算。
代码语言:javascript复制reduce(reduceFunction);
fold;
aggregate(aggregateFunction);
sum(key);min(key);max(key)
sumBy(key);minBy(key);maxBy(key)
我们需要根据业务场景需要,决定使用是全量聚合还是增量聚合,并进一步选择使用哪一种聚合函数。
3.Window何时会被触发
Window何时被触发计算,是由触发器Trigger的onElement方法所决定。
该方法的参数: (1)element:到达的元素 (2)timestamp:元素达到的时间戳 (3)window:元素将被分配的窗口 (4)context:上下文
以时间类型设置为EventTime之后,触发器就是EventTimeTrigger,对应的onElement方法:
方法很简单:如果当前的watermark已经大于或等于窗口的最大时间戳(即窗口的endTime),那么就会触发窗口计算,并输出结果。 TriggerResult.FIRE:窗口计算并输出结果,尽管未清除窗口,但保留了所有元素。 否则的话,就是注册一个以窗口的最大时间戳为时间的定时器。
window.maxTimestamp()
这里的end是指窗口的结束时间,通常是在WindowAssigner中指定,WindowAssigner有:
以TumblingEventTimeWindows为例:
最后呢,给一个完整的例子:
代码语言:javascript复制import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
object Window {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("localhost",9000)
val values = source.flatMap(value => value.split("\s ")).map(value => (value,1))
val keyValue = values.keyBy(0)
// define the count window without purge
val countWindowWithoutPurge = keyValue.window(GlobalWindows.create()).
trigger(CountTrigger.of(2))
val countWindowWithPurge = keyValue.window(GlobalWindows.create()).
trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))
countWindowWithoutPurge.sum(1).print()
countWindowWithPurge.sum(1).print()
env.execute()
// execute program
env.execute("Flink Scala API Skeleton")
}
}