DataStream API 开发
1、Time 与 Window
1.1 Time
在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示:
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中, 每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
Ingestion Time:是数据进入 Flink 的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。
例如,一条日志进入 Flink 的时间为 2019-08-12 10:00:00.123,到达 Window 的系统时间为:
2019-08-12 10:00:01.234,
日志的内容如下:
2019-08-02 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计 1min 内的故障日志个数,哪个时间是最有意义的?—— eventTime, 因为我们要根据日志的生成时间进行统计。
1.2 Window
1.2.1 Window 概述
Streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
1.2.2 Window 类型
Window 可以分成两类:
1) CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
2) TimeWindow:按照时间生成 Window。
对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、 滑动窗口(Sliding Window)和会话窗口(Session Window)。
- 滚动窗口(Tumbling Windows)
将数据依据固定的窗口长度对数据进行切片。
特点:时间对齐,窗口长度固定,没有重叠。
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示:
适用场景:适合做 BI 统计等(做每个时间段的聚合计算)
- 滑动窗口(Sliding Windows)
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
特点:时间对齐,窗口长度固定,有重叠。
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。 例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据。
如下图所示:
适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)
- 会话窗口(Session Windows)
由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session, 也就是一段时间没有接收到新数据就会生成新的窗口。
特点:时间无对齐。
session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔 定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。
1.3 Window API
下面介绍一些流数据处理中常用的一些Window API。
1.3.1 CountWindow
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
步骤:
1.获取执行环境
2.创建 SocketSource
3.对 stream 进行处理并按 key 聚合
4.countWindow 操作
5.执行聚合操作
6.将聚合数据输出
7.执行程序
- 参考代码
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
/*
* @Author: Alice菌
* @Date: 2020/7/10 09:22
* @Description:
*/
object StreamCountWindow {
def main(args: Array[String]): Unit = {
// 1、创建执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2、 构建数据源 , 创建 SocketSource
val socketSource: DataStream[String] = senv.socketTextStream("node01",9999)
// 3、 对 stream 进行处理并按 key 聚合
import org.apache.flink.api.scala._
val keyByStream: KeyedStream[(String, Int), Tuple] = socketSource.flatMap(x=>x.split(" ")).map((_, 1)).keyBy(0)
// 4、 引入 countWindow 操作
// 这里的 5 指的是 5 个相同的 key 的元素计算一次
val streamWindow: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyByStream.countWindow(5)
// 执行聚合操作
val reduceStream: DataStream[(String, Int)] = streamWindow.reduce((v1,v2) => (v1._1,v1._2 v2._2))
// 将聚合数据输出
reduceStream.print(this.getClass.getSimpleName)
// 执行程序
senv.execute("StreamCountWindow")
}
}
- 演示效果
我们打开node01节点上的9999通信端口。
nc -lk 9999
然后制造一些信息。
此时观察控制台,可以发现将key的个数等于5的结果展示了出来。
1.3.2 TimeWindow
TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个 window 里面的所有数据进行计算。
Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据 根据进入 Flink 的时间 划分到不同的窗口中。
步骤:
1.获取执行环境
2.创建你 socket 链接获取数据
3.进行数据转换处理并按 key 聚合
4.引入 timeWindow
5.执行聚合操作
6.输出打印数据
7.执行程序
- 参考代码
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/*
* @Author: Alice菌
* @Date: 2020/8/10 23:53
* @Description:
*/
object StreamTimeWindow {
def main(args: Array[String]): Unit = {
//1.获取执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建socket链接获取数据
val socketSource: DataStream[String] = senv.socketTextStream("node01", 9999)
//3.进行数据转换处理并按 key 聚合
val keyByStream: KeyedStream[(String, Int), Tuple] = socketSource.flatMap(x => x.split(" ")).map((_, 1)).keyBy(0)
//4.引入滚动窗口
val timeWindowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyByStream.timeWindow(Time.seconds(5))
//5.执行聚合操作
val reduceStream: DataStream[(String, Int)] = timeWindowStream.reduce(
(item1, item2) => (item1._1, item1._2 item2._2)
)
//6.输出打印数据
reduceStream.print()
//7.执行程序
senv.execute("StreamTimeWindow")
}
}
- 演示效果
观察程序的控制台,发现每达到5秒,就会计算一个窗口内的数据。
1.3.3 Window Reduce
这意味着 WindowedStream → DataStream:给 window 赋一个 reduce 功能的函数,并返回一个聚合的结果。
- 参考代码
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/*
* @Author: Alice菌
* @Date: 2020/8/11 09:38
* @Description:
*/
object StreamReduceWindow {
def main(args: Array[String]): Unit = {
// 1、 获取执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream: DataStream[String] = senv.socketTextStream("node01",9999)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.flatMap(x => x.split("0")).map(item => (item,1)).keyBy(0)
// 引入时间窗口
val streamWindow: WindowedStream[(String, Int), Tuple, TimeWindow] = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamReduce: DataStream[(String, Int)] = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
senv.execute("StreamReduceWindow")
}
}
因为效果和上边介绍的TimeWindow
是一样的,所以这里就不做演示了。
1.3.4 Window Apply
apply 方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。
用法
- 实现一个 WindowFunction 类
- 指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy 中使用分组字段的类型, 窗 口类型]
示例
使用 apply 方法来实现单词统计
步骤
1) 获取流处理运行环境
2) 构建 socket 流数据源,并指定 IP 地址和端口号
3) 对接收到的数据转换成单词元组
4) 使用 keyBy 进行分流(分组)
5) 使用 timeWinodw 指定窗口的长度(每 3 秒计算一次)
6) 实现一个 WindowFunction 匿名内部类
■ apply 方法中实现聚合计算
■ 使用 Collector.collect 收集数据
7) 打印输出
8) 启动执行
9) 在 Linux 中,使用 nc -lk 端口号 监听端口,并发送单词
参考代码
代码语言:javascript复制import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/*
* @Author: Alice菌
* @Date: 2020/8/11 09:58
* @Description:
使用 apply 实现单词统计
apply 方法可以进行一些自定义处理,通过匿名内部类的方法来实现。
当有一些复杂计算时使用。
*/
object StreamApplyWindow {
def main(args: Array[String]): Unit = {
// 1、获取流处理运行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2、构建 socket 流数据源,并指定 IP 地址和端口
val textDataStream: DataStream[String] = senv.socketTextStream("node01",9999).flatMap(_.split(" "))
// 3、对接收到的数据转换成单词元组
val wordDataStream: DataStream[(String, Int)] = textDataStream.map((_,1))
// 4、使用 keyBy 进行分流(分组)
val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
// 5、使用 timeWindow 指定窗口的长度(每3秒计算一次)
val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
// 6、实现一个 WindowFunction 匿名内部类
/*
@tparam IN The type of the input value. 输入值的类型
* @tparam OUT The type of the output value. 输出值的类型
* @tparam KEY The type of the key. key值的类型
* @tparam W The type of Window that this window function can be applied on. 可以应用此窗口功能的窗口类型
*/
val reduceDataStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
// 自定义操作,在apply 方法中实现数据的聚合
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
val tuple: (String, Int) = input.reduce((t1, t2) => {
(t1._1, t1._2 t2._2)
})
// 将要返回的数据收集起来,发送回去
out.collect(tuple)
}
})
// 打印结果
reduceDataStream.print()
// 执行程序
senv.execute("StreamApplyWindow")
}
}
同上,效果和上边介绍的TimeWindow
是一样的,所以这里就不做演示了。
1.3.5 Window Fold
WindowedStream → DataStream:给窗口赋一个 fold 功能的函数,并返回一个 fold 后的结果。
参考代码
代码语言:javascript复制import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/*
* @Author: Alice菌
* @Date: 2020/8/11 10:31
* @Description:
*/
object StreamFoldWindow {
def main(args: Array[String]): Unit = {
// 1、获取执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val stream: DataStream[String] = senv.socketTextStream("node01",9999)
// 对 stream 进行处理并按 key 聚合
val streamKeyBy: KeyedStream[(String, Int), Tuple] = stream.flatMap(x => x.split(" ")).map((_,1)).keyBy(0)
// 引入滚动窗口
val streamWindow: WindowedStream[(String, Int), Tuple, TimeWindow] = streamKeyBy.timeWindow(Time.seconds(3))
// 执行 fold 操作
val streamFold: DataStream[Int] = streamWindow.fold(100) {
(begin, item) => begin item._2
}
// 将聚合数据写入文件
streamFold.print()
// 执行程序
senv.execute("StreamFoldWindow")
}
}
演示效果
客户端
程序控制台
1.3.6 Aggregation on Window
WindowedStream → DataStream:对一个 window 内的所有元素做聚合操作。min 和 minBy 的区别是 min 返回的是最小值,而 minBy 返回的是包含最小值字段的元素(同样的原理适 用于 max 和 maxBy)。
参考代码
代码语言:javascript复制import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/*
* @Author: Alice菌
* @Date: 2020/8/11 16:21
* @Description:
*/
object StreamAggregationWindow {
def main(args: Array[String]): Unit = {
// 获取执行环境
val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource
val socketStream: DataStream[String] = senv.socketTextStream("node01",9999)
// 对 stream 进行处理并按 key 聚合
val keyByStream: KeyedStream[(String, String), Tuple] = socketStream.map(item => (item.split(" ")(0),item.split(" ")(1))).keyBy(0)
// 引入滚动窗口
val streamWindow: WindowedStream[(String, String), Tuple, TimeWindow] = keyByStream.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamMax: DataStream[(String, String)] = streamWindow.max(1)
// 将聚合数据输出
streamMax.print()
// 执行程序
senv.execute("StreamAggregationWindow")
}
}
演示效果
客户端
程序控制台
小结
本篇博客主要为大家介绍了Flink流处理DataStreamAPI 开发中,关于 【Time与Window】方面的知识内容,下一篇博客将为大家介绍同系列 【EventTime 与 Window】,敬请期待?
如果以上过程中出现了任何的纰漏错误,烦请大佬们指正?
受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?
希望我们都能在学习的道路上越走越远?