1 简介
处理无限流的核心:
- Flink 提供了完善的窗口机制,是 Flink 的一大亮点:窗口机制在Flink中的重要性,是Flink区别于其他流处理引擎的一个显著特点
- Flink窗口是一种把无限数据流切割为有限数据块的手段:窗口机制的作用,即将连续不断的数据流分割成一个个有限大小的数据块,以便聚合、统计等操作
1.1 流式数据到批处理
Flink中的窗口机制,如同一道桥梁,将原本连续不断的“流式数据”转化为有限的“批处理”数据块。这种转化为后续的分析计算提供了坚实的基础。
- 流式数据: 特指那些连续不断产生的数据,没有明确的开始和结束。例如,网站的日志数据、传感器数据等。
- 批处理: 指对一批固定大小的数据进行处理,通常是离线计算的范式。
- 窗口: Flink中的窗口机制将无限的流式数据分割成有限大小的时间区间或数据量的“窗口”。这些窗口就像一个个小型的批次,包含了特定时间段内的数据。
窗口机制的作用:
- 有限数据处理: 将无限的流式数据切割成有限的窗口,使得我们可以对每个窗口内的数据进行独立的计算,避免了处理无限数据的复杂性。
- 聚合计算: 窗口内的数据可以进行各种聚合操作,如求和、平均值、计数等,从而得到有意义的结果。
- 时间维度分析: 通过定义不同大小的时间窗口,可以对数据进行按时间段的分析,例如统计每小时的访问量、每天的销售额等。
- 事件驱动计算: 窗口可以根据事件的发生时间来划分,从而实现基于事件的处理逻辑。
形象比喻:
想象一条河流(流式数据),我们无法一次性处理整条河的水。为了更好地研究河水,我们可以用拦河坝将河流分成一个个水池(窗口),然后对每个水池的水进行取样、分析。
Windows将流拆分为有限大小的“桶”,可在其上应用计算。在Flink中,窗口是一种将连续不断的数据流分割成有限大小的时间区间或数据量的机制。通过窗口,我们可以对这些有限的数据块进行聚合、计算等操作,从而实现对数据的分析和处理。
1.2 为啥要用窗口?
- 有限数据处理: 无限的数据流无法直接进行计算,窗口机制将数据流分割成有限的数据块,使得计算成为可能。
- 聚合计算: 窗口可以对窗口内的数据进行聚合计算,例如求和、平均值、计数等。
- 时间窗口: 可以根据时间维度对数据进行分组,例如按分钟、小时、天等进行分组。
- 计数窗口: 可以根据数据的数量进行分组,例如每100条数据为一个窗口。
1.3 窗口Flink程序一般结构
- 第一个片段指被Keys化流
- 第二个片段指非被Keys化流
唯一区别是keyBy(...)呼吁Keys流和window(...)成为windowAll(...)非被Key化的数据流。
Keyed Windows
Non-Keyed Windows
方括号(...)中的命令可选。表明Flink允许你以多种不同方式自定义窗口逻辑,以最适合需求。
1.5 被Keys化与非被Keys化Windows
要指定的第一件事是你的流是否应该键入。必须在定义窗口之前完成此 算子操作。使用the keyBy(...)将你的无限流分成逻辑被Key化的数据流。如果keyBy(...)未调用,则表示你的流不是被Keys化的。
对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。拥有被Key化的数据流将允许你的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。引用相同Keys的所有数据元将被发送到同一个并行任务。
在非被Key化的数据流的情况下,你的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为1。
思考
- 数据如何分配到对应的窗口
- 数据分配到对应窗口如何触发计算
- 在窗口内如何进行操作
- 窗口如何关闭
- 咋在Flink中执行窗口
- 程序员咋从其提供的函数中获益最大化
2 窗口生命周期
使用基于事件时间的窗口策略,每5min创建一个非重叠(或翻滚)的窗口,并允许延迟1min。
2.1 创建
只要应该属于此窗口的第一个数据元到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定时,窗口将被完全删除allowed lateness。
Flink保证仅删除基于时间的窗口而非其他类型,如全局窗口。Flink将创建一个新窗口,用于间隔12:00和12:05当具有落入此间隔的时间戳的第一个数据元到达时。
- 12:00就是以水印时间为标准
- 事件时间不晚于水印时间12:00,所以进入该窗口
2.2 销毁
当水印通过12:06 时间戳时它将删除它。
3 窗口的组成
每个窗口将具有Trigger和一个函数(ProcessWindowFunction,ReduceFunction, AggregateFunction或FoldFunction)连接到它。
3.1 Trigger
Trigger指定窗口被认为准备好应用该函数的条件,即执行函数何时触发。
3.2 执行函数
包含要应用于窗口内容的计算。
触发策略可能类似于“当窗口中的数据元数量大于4”时,或“当水印通过窗口结束时”。
触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的数据元,而不是窗口元数据。这意味着仍然可以将新数据添加到该窗口。
3.3 Evictor
可在触发器触发后以及应用函数之前和/或之后从窗口中删除数据元。
3.4 窗口分配器
指定流是否已键入后,下一步是定义一个窗口分配器。
窗口分配器定义咋将数据元分配给不同类型的窗口,这是通过WindowAssigner 在window(...)(对于被Keys化流)或windowAll()(对于非被Keys化流)调用中指定你的选择来完成的
WindowAssigner
负责将每个传入数据元分配给一个或多个窗口
4 内置窗口分类
- 全局窗口
- 滚动窗口
- 滑动窗口
- 会话窗口
- 时间窗口
- 计数窗口
所有内置窗口(全局窗口除外)都有基于时间的实现。
还可通过扩展WindowAssigner类实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。
基于时间的窗口具有开始时间戳(包括)、结束时间戳(不包括),一起描述窗口大小。
Flink使用TimeWindow基于时间的窗口时使用,该窗口具有查询开始和结束时间戳的方法maxTimestamp()返回给定窗口的最大允许时间戳:
代码语言:java复制@PublicEvolving
public class TimeWindow extends Window {
private final long start;
private final long end;
public TimeWindow(long start, long end) {
this.start = start;
this.end = end;
}
/**
* Gets the starting timestamp of the window. This is the first timestamp that belongs to this
* window.
*
* @return The starting timestamp of this window.
*/
public long getStart() {
return start;
}
/**
* Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the
* first timestamp that does not belong to this window any more.
*
* @return The exclusive end timestamp of this window.
*/
public long getEnd() {
return end;
}
}
下图显示每个分配者的工作情况。紫色圆圈表示流的数据元,这些数据元由某个键(在这种情况下是用户1,用户2和用户3)划分。x轴显示时间的进度。
4.1 滚动窗口(Tumbling Window)
每个数据元分配给指定的窗口的窗口大小。如指定大小为5min的翻滚窗口,则将评估当前窗口,并且每5min将启动一个新窗口:
- 滚动窗口大小固定:
- 每个滚动窗口的大小一致,如置每个窗口为5min
- 窗口大小一旦确定,在整个窗口的计算过程中不会改变
- 滚动窗口时间不会重叠:
- 连续的滚动窗口之间没有重叠部分
- 每个数据元素只属一个窗口
- 滚动窗口只有时间一个参数:
- 滚动窗口的定义只需要指定一个时间参数,即窗口大小
- 窗口的划分完全基于时间维度,不涉及其他因素
形象比喻
想象一条河流,用固定的桶来舀水。每个桶就是一个滚动窗口。桶的大小固定,并且每次舀水时,桶与桶之间没有重叠。
应用场景
- 实时监控: 统计每5分钟的网站访问量,每个窗口代表5分钟内的数据
- 实时分析: 分析用户在过去1小时内的行为,每个窗口代表1小时内的数据。
- 实时告警: 如果某个窗口内的异常数据超过阈值,则触发告警
优点
- 简单易懂: 窗口定义简单,易理解实现
- 计算高效: 窗口之间无重叠,避免重复计算
- 结果清晰: 每个窗口的结果都是独立的,便分析
缺点
- 灵活性较差: 窗口大小固定,无法根据数据特点进行动态调整
- 可能丢失数据: 如数据到达延迟,可能丢失数据
小结
Flink中最基础、最常用的窗口类型之一。它适用于对时间维度有明确要求,且不需要对窗口大小进行动态调整的场景。通过滚动窗口,可对流式数据进行高效的实时处理和分析。
实例
代码语言:java复制DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
代码语言:java复制public class JavaWindowsApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
.setParallelism(1);
env.execute("JavaWindowsApp");
}
}
4.2 滑动窗口(Sliding Window)
固定长度的窗口。
与滚动窗口类似,窗口大小由窗口大小参数配置
附加的窗口滑动参数控制滑动窗口的启动频率。因此,如幻灯片小于窗口大小,则滑动窗口可重叠。在这种情况下,数据元被分配给多个窗口。
如将10min的窗口滑动5min。有这玩意,你每隔5min就会得到一个窗口,其中包含过去10min内到达的事件,如下:
使用滑动窗口:
代码语言:java复制DataStream<T> input = ...;
// 滑动 事件时间 窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滑动 处理时间 窗口
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
4.3 会话窗口(Session Window)
根据数据之间的间隔时间来定义窗口,当数据之间间隔时间超过一定阈值时,就开启一个新窗口。
4.4 全局窗口
在Flink中,全局窗口(Global Window)是一种特殊的窗口类型,它将整个数据流视为一个窗口。也就是说,全局窗口没有明确的边界,它涵盖了所有输入的数据。
countWindowAll(5)
的含义
countWindowAll
: 这个方法用于定义一个全局计数窗口。5
: 表示每5个元素组成一个窗口。
即每当有5个元素进入数据流,就触发一次窗口计算。
特点
- 无边界: 没有明确的开始和结束时间或事件数量限制。
- 所有数据: 包含了数据流中的所有元素。
- 触发计算: 通常需要自定义触发器来指定何时触发窗口计算。
应用场景
- 特定事件触发: 当需要在某个特定的事件发生时触发计算,全局窗口非常适合。
- 聚合所有数据: 如果需要对整个数据流进行一次性聚合计算,全局窗口也是一个不错的选择。
- 自定义窗口逻辑: 全局窗口提供了最大的灵活性,可以自定义触发器和计算逻辑,实现各种复杂的窗口操作。
局限性
- 状态存储: 由于全局窗口包含了所有数据,因此需要更多的状态存储空间。
- 性能影响: 对整个数据流进行计算可能会影响性能,尤其是在数据量非常大的情况下。
- 复杂性: 全局窗口的配置和使用相对复杂,需要仔细考虑触发条件和计算逻辑。
区别
- 滚动窗口、滑动窗口、会话窗口:这些窗口都有明确的边界,要么是基于时间,要么是基于事件数量。
- 全局窗口:没有明确的边界,需要自定义触发器来控制窗口的计算。
示例
代码语言:java复制DataStream<Integer> dataStream = ...;
SingleOutputStreamOperator<Integer> result = dataStream
.countWindowAll(5)
.trigger(Trigger.count(5)) // 自定义触发器,每5个元素触发一次
.apply(new WindowFunction<Integer, Integer, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
int sum = 0;
for (Integer value : values) {
sum = value;
}
out.collect( sum);
}
});
总结
全局窗口在Flink中是一个强大的工具,它提供了最大的灵活性,可以满足各种复杂的窗口计算需求。但是,由于其特点,在使用时需要谨慎考虑状态存储、性能和复杂性等因素。
何时使用全局窗口?
- 当你希望对整个数据流进行一次性聚合计算时。
- 当你需要根据特定的事件来触发计算时。
- 当其他窗口类型无法满足你的需求时。
需要注意的是:
- 全局窗口的性能可能不如其他窗口类型。
- 全局窗口需要仔细配置触发器,以避免无限循环或资源耗尽。
5 窗口函数
定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是窗口函数的职责,窗口函数用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元的窗函数可以是一个ReduceFunction,AggregateFunction,FoldFunction或ProcessWindowFunction。前两个可以更有效地执行,因为Flink可以在每个窗口到达时递增地聚合它们的数据元.
ProcessWindowFunction获取Iterable窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。
具有ProcessWindowFunction的窗口转换不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有数据元。这可以通过组合来减轻ProcessWindowFunction与ReduceFunction,AggregateFunction或FoldFunction以获得两个窗口元件的增量聚合并且该附加元数据窗口 ProcessWindowFunction接收。我们将查看每个变体的示例。
7.1 ReduceFunction
指定如何组合输入中的两个数据元以生成相同类型的输出数据元.
Flink使用ReduceFunction来递增地聚合窗口的数据元.
定义和使用
代码语言:java复制DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 v2.f1);
}
});
原来传递进来的数据是字符串,此处我们就使用数值类型,通过数值类型来演示增量的效果。
这里不是等待窗口所有的数据进行一次性处理,而是数据两两处理
代码语言:java复制public class JavaWindowsReduceApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
System.out.println("value1 = [" value1 "], value2 = [" value2 "]");
return new Tuple2<>(value1.f0, value1.f1 value2.f1);
}
})
.print()
.setParallelism(1);
env.execute("JavaWindowsReduceApp");
}
}
输入:
代码语言:bash复制javaedge@JavaEdgedeMac-mini ~ % nc -lk 9999
a,a,a,b,b,c
1,2,3,4,5
增量输出:
7.2 聚合函数
AggregateFunction是一个通用版本,ReduceFunction它有三种类型:
- 输入类型(IN)
- 累加器类型(ACC)
- 输出类型(OUT)
输入类型是输入流中数据元的类型,且AggregateFunction具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于OUT从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。
与之相同ReduceFunction,Flink将在窗口到达时递增地聚合窗口的输入数据元。
一个AggregateFunction可以被定义并这样使用:
代码语言:java复制/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 value.f1, accumulator.f1 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 b.f0, a.f1 b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
7.3 ProcessWindowFunction
ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。
代码语言:java复制public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}
}
该key参数是通过KeySelector为keyBy()调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是Tuple,你必须手动将其转换为正确大小的元组以提取键字段。
A ProcessWindowFunction可以像这样定义和使用:
代码语言:java复制DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count ;
}
out.collect("Window: " context.window() "count: " count);
}
}
该示例显示了ProcessWindowFunction对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。
使用ProcessWindowFunction简单的聚合(例如count)非常低效。
代码语言:java复制public class JavaWindowsProcessApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
}
}
}
}).keyBy(0)
.timeWindow(Time.seconds(5))
.process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {
long count = 0;
for (Tuple2<Integer, Integer> in : elements) {
count ;
}
out.collect("Window: " context.window() "count: " count);
}
})
.print()
.setParallelism(1);
env.execute("JavaWindowsReduceApp");
}
}
4 案例
假设我们有一个数据流表示用户点击事件,每个事件包含时间戳和用户ID。我们可以使用一个滚动时间窗口(每5分钟一个窗口)来统计每个窗口内每个用户的点击次数。
代码语言:java复制DataStream<Event> clicks = ...;
DataStream<WindowedStream<Event, Tuple, TimeWindow>> windowedStream = clicks
.keyBy("userId") // 按用户ID分组
.window(TumblingEventTimeWindows.of(Time.minutes(5))); // 定义5分钟的滚动窗口
DataStream<WindowedStream<Event, Tuple, TimeWindow>> resultStream = windowedStream
.reduce(new ReduceFunction<Event>() {
@Override
public Event reduce(Event value1, Event value2) throws Exception {
// 计算点击次数
}
});