窗口是处理无限流的核心。窗口拆分将流拆为有限数量数据的bucket,这样就可以应用计算。
窗口的生命周期
当第一个应该属于这个窗口的元素到达时,就会创建一个窗口,当时间(事件或处理时间)超过它的结束时间戳加上用户指定的允许延迟lateness
时,窗口将被完全删除。
每个窗口有一个触发器Trigger
和Function
,(ProcessWindowFunction
, ReduceFunction
, or AggregateFunction
) ,该函数将包含应用于窗口内容的计算,而Trigger指定了窗口被认为可以应用该函数的条件。触发策略可能类似于“当窗口中的元素数量大于4时”,或者“当水印通过窗口末端时”。触发器还可以决定在创建和删除窗口之间的任何时间清除窗口的内容。在本例中,清除仅指窗口中的元素,而不是窗口元数据。这意味着新的数据仍然可以添加到该窗口。您可以指定一个Evictor
(参见驱逐器),它将能够在触发器触发后以及在函数应用之前和/或之后从窗口中删除元素。
Keyed 和 非 Keyed 窗口
使用keyBy(…)将把你的无限流分割成逻辑键控流。如果keyBy(…)没有被调用,你的流就不是键控的。
Keyed流中各个 KeyedStream 允许并发的执行窗口计算,各自独立,相同的key的元素会发送到同一个的并发任务。
非Keyed流,窗口逻辑是在单个任务中执行。
窗口指定者
stream 知道是否keyed后,接下来就需要定义窗口指定者(WindowAssigner
)。keyBy的流使用window
方法,非keyBy的使用 windowAll
方法。WindowAssigner
为每个元素指定一个或者多窗口。Flink预定义的窗口指定者用于大多数的场景,名称是 tumbling windows, sliding windows, session windows 和 global windows。你也可以自定义窗口指定者,实现一个 WindowAssigner
的类。所有内置的窗口指定者(除了global windows)指定元素基于时间,可以选择 处理时间
、事件事件
。请查看我们关于 event time 的部分,了解处理时间和事件时间之间的区别以及时间戳和水印是如何生成的。
基于时间的窗口,有开始时间(包含),和结束时间(不包含)决定了窗口的大小。在代码中,Flink在处理基于时间的窗口时使用TimeWindow,这些窗口具有查询开始和结束时间戳的方法,以及一个额外的方法maxTimestamp(),该方法返回给定窗口所允许的最大时间戳。
滚动窗口
滚动窗口赋值器将每个元素赋给指定窗口大小的窗口,滚动窗口大小是固定的,并且没有覆盖。
代码语言:javascript复制DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
滑动窗口
滑动窗口赋值器将元素分配给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小参数配置。另一个窗口滑动参数控制滑动窗口的启动频率。因此,如果滑动窗口小于窗口大小,滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。
例如,你可以有一个10分钟大小的窗口,可以滑动5分钟。这样,每隔5分钟就会有一个窗口,其中包含过去10分钟内到达的事件。
代码语言:javascript复制DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
会话窗口
会话窗口分配器按活动的会话分组元素。会话窗口不重叠,也没有固定的开始和结束时间,这与滚动窗口和滑动窗口不同。相反,当会话窗口在一段时间内没有接收到元素时,即当出现不活动间隙时,会话窗口将关闭。会话窗口分配器可以配置一个静态会话间隙,也可以配置一个会话间隙提取器函数,该函数定义不活动的时间有多长。当此期限到期时,当前会话将关闭,随后的元素将被分配给一个新的会话窗口。
由于会话窗口没有固定的开始和结束,因此它们的计算方法与滚动和滑动窗口不同。在内部,会话窗口操作符为每个到达的记录创建一个新窗口,如果窗口之间的距离小于定义的间隔,则将它们合并在一起。为了是可合并的,会话窗口操作符需要一个合并触发器和一个合并窗口函数,如ReduceFunction, AggregateFunction,或ProcessWindowFunction
代码语言:javascript复制DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
全局窗口
全局窗口赋值器将所有元素分配给同一个全局窗口。只有当您还指定了自定义触发器时,此窗口模式才有用。否则,将不执行计算,因为全局窗口没有一个可以处理聚合元素的自然端点。
代码语言:javascript复制DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
窗口函数
在定义窗口赋值器之后,我们需要指定要在每个窗口上执行的计算。这是window函数的职责,它用于在系统确定窗口已经准备好进行处理时处理每个(可能是Keyed)窗口的元素(参阅 triggers 了解Flink如何确定窗口何时准备好)。
窗口函数可以是ReduceFunction、AggregateFunction或ProcessWindowFunction中的一个。前两个可以更有效地执行(参见State Size部分),因为Flink可以在每个窗口的元素到达时增量聚合它们。ProcessWindowFunction获取包含在窗口中的所有元素的可迭代对象,以及关于这些元素所属窗口的附加元信息。
使用ProcessWindowFunction的窗口转换不能像其他情况那样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有元素。这可以通过将ProcessWindowFunction与ReduceFunction或AggregateFunction相结合来减轻,以获得窗口元素的增量聚合和ProcessWindowFunction接收的额外窗口元数据。我们将看这些变体的例子。
结合函数(ReduceFunction)
ReduceFunction指定如何组合输入中的两个元素来生成相同类型的输出元素。
代码语言:javascript复制DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 v2.f1);
}
});
聚合函数(AggregateFunction)
AggregateFunction是ReduceFunction的一般化版本,它有三种类型:输入类型(IN)、累加类型(ACC)和输出类型(OUT)。输入类型是输入流中的元素类型,AggregateFunction有一个方法可以将一个输入元素添加到累加器中。该接口还具有创建初始累加器、将两个累加器合并为一个累加器以及从累加器提取输出(类型为OUT)的方法。
代码语言:javascript复制/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 value.f1, accumulator.f1 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 b.f0, a.f1 b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
处理窗口(ProcessWindowFunction)
ProcessWindowFunction获得一个包含窗口所有元素的Iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供了更多的灵活性。这是以性能和资源消耗为代价的,因为不能增量聚合元素,而是需要在内部缓冲,直到认为窗口已经准备好进行处理。
代码语言:javascript复制public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
key参数是通过为keyBy()调用指定的KeySelector提取的键。对于元组索引键或字符串字段引用,此键类型总是Tuple,您必须手动将其转换为大小正确的元组,以提取键字段。
代码语言:javascript复制DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count ;
}
out.collect("Window: " context.window() "count: " count);
}
}
具有增量聚合的 ProcessWindowFunction
ProcessWindowFunction可以与ReduceFunction或AggregateFunction组合,以在元素到达窗口时增量聚合元素。当窗口关闭时,ProcessWindowFunction将提供聚合的结果。这允许它在访问ProcessWindowFunction的附加窗口元信息的同时递增地计算窗口。
Incremental Window Aggregation with ReduceFunction
下面的示例展示了如何将递增的ReduceFunction与ProcessWindowFunction组合起来,以返回窗口中最小的事件以及窗口的开始时间。
代码语言:javascript复制DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
Incremental Window Aggregation with AggregateFunction
下面的示例展示了如何将增量AggregateFunction与ProcessWindowFunction组合起来计算平均值,并同时发出键和窗口。
代码语言:javascript复制DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 value.f1, accumulator.f1 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 b.f0, a.f1 b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
在ProcessWindowFunction中使用每个窗口的状态
除了访问键态(任何富函数都可以),ProcessWindowFunction还可以使用作用域为函数当前处理的窗口的键态。在这种上下文中,理解每个窗口状态所指的窗口是什么非常重要。有不同的“窗口”涉及:
- 当指定window操作时定义的窗口:这可能是1小时的滚动窗口或2小时的滑动窗口,滑动时间为1小时。
- 为给定键定义的窗口的实际实例:对于用户id xyz,这可能是12:00到13:00的时间窗口。这是基于窗口定义的,根据作业当前正在处理的键的数量和事件所处的时间槽,将会有许多窗口。
process()调用接收的Context对象上有两个方法,它们允许访问两种类型的状态:
- globalState(),它允许访问不局限于窗口的键态
- windowState(),它允许访问也限定在窗口范围内的键态
当使用窗口状态时,在窗口被清除时清理该状态也很重要。这应该发生在clear()方法中。
WindowFunction (Legacy)
在一些可以使用ProcessWindowFunction的地方,你也可以使用WindowFunction。这是ProcessWindowFunction的旧版本,提供较少的上下文信息,并且没有一些高级特性,比如每个窗口的键态。这个接口在某些时候将被弃用。
有用的状态大小注意事项
窗口可以在很长一段时间内(如天、周或月)定义,因此可以累积非常大的状态。在估算窗口计算的存储需求时,有几个规则需要记住:
- 每个窗口创建一个它所属的每个元素的副本。有鉴于此,跌跌撞撞的窗口保留每个元素的一个副本(一个元素只属于一个窗口,除非它后期被删除)。相比之下,滑动窗口创建每个元素的几个,正如在 Window Assigners 部分中解释的那样。因此,一天大小的滑动窗口和一秒钟的滑动窗口可能不是一个好主意。
- ReduceFunction 和 AggregateFunction 可以显著减少存储需求,因为它们急切地聚合元素并且每个窗口只存储一个值。相比之下,仅仅使用 ProcessWindowFunction 就需要累积所有元素。
- 使用驱逐器可以防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过驱逐器传递(请参阅驱逐器)。