这一章从实际的需求TopN来学习Flink中的窗口知识。
在上一章代码中使用了timeWindow
,使得我们可以操作Flink流中的一个时间段内的数据,这就引出了Flink中的"窗口"概念:在大多数场景下,数据流都是"无限的",因引我们无法等待数据流终止后才进行一些统计计算,而通常的需求是对一段时间或是一定范围内的数据进行分析。
Flink提供了两种窗口:Time Window和Count Window,而本章涉及到Time Window的部分概念和用法。
代码语言:javascript复制package all.in.one.c03
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
object Chapter03 extends App {
// 使用createLocalEnvironmentWithWebUI可以在本地查看WebUI,在集群提交任务无需此方法
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
// Flink的输入为Source,这里我们构建一个定义Source:C03Source
val sourceDataStream = env.addSource(new C03Source())
// 接下来以品类做为key,计算每个品类的总价格
// 同样keyingBy来自org.apache.flink.streaming.api.scala.extensions._包,这里使用keyBy也可以
// keyBy操作后会返回一个KeyedStream,保存了key信息
sourceDataStream
.keyingBy(_._1)
// 与Chapter 02不同,这里我们调用window来设置窗口
// 以下代码说明参见README
.window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
// 计算交易额的总和
.sum(1)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
.process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
val top3 = elements.toSeq
.sortBy(-_._2)
.take(3)
.zipWithIndex
.map { case ((item, price), idx) => s" ${idx 1}. $item: $price" }
.mkString("n")
out.collect(("-" * 16) "n" top3)
}
})
.print()
env.execute("Chapter 03")
/**
* 每100ms产生一条"交易"数据,最终输出品类 价格(随机产生)
*/
class C03Source extends SourceFunction[(String, Long)] {
private val items = Array(
// 男装
"卫衣",
"T恤",
"牛仔裤",
"西服",
"风衣",
// 女装
"连衣裙",
"卫衣",
"衬衫",
"针织衫",
"休闲裤",
// 手机数码
"手机",
"手机配件",
"摄影摄像",
"影音娱乐",
"数码配件",
"智能设备",
"电子教育",
// 电脑办公
"电脑整机",
"电脑组件",
"外设",
"网络产品",
"办公设备",
"文具耗材",
// 家用电器
"电视",
"空调",
"洗衣机",
"冰箱",
"厨卫",
"生活电器",
// 户外运动
"运动鞋包",
"运行服饰",
"户外鞋服",
"户外装备",
"骑行",
"健身",
// 家具家装
"厨房卫浴",
"灯饰照明",
"五金工具",
"客厅家具",
"餐厅家具",
// 图书文娱
"少儿读物",
"文学",
"动漫",
"专业"
)
var running = true
/**
* Flink会调用run来收集数据
*/
override def run(sourceContext: SourceFunction.SourceContext[(String, Long)]): Unit = {
val random = new Random()
do {
val item = items(random.nextInt(items.length))
val price = random.nextInt(3333) 33
// context.collect通知Flink新元素进入系统
sourceContext.collect(item -> price.toLong)
Thread.sleep(1000)
} while (running)
}
override def cancel(): Unit = running = false
}
}
TopN需求
假设电商网站有这样一个榜单:展示1分钟内当前用户购买品类交易额的Top3,并且榜单要每10秒刷新一次。而我们现在可以拿到一个交易流,里面记录了交易品类和交易额,要如何实现呢?先看代码效果,启动all.in.one.c03.Chapter03
后会看到输出如:
9> ----------------
1. 厨卫: 3956
2. 文具耗材: 3174
3. 摄影摄像: 2738
10> ----------------
1. 厨卫: 3956
2. 文具耗材: 3174
3. 智能设备: 3108
11> ----------------
1. 影音娱乐: 4304
2. 风衣: 4286
3. 厨卫: 3956
12> ----------------
1. 牛仔裤: 5261
2. 衬衫: 5155
3. 厨卫: 4629
...
...
输入
之前的章节中,我们的输入是监听一个Socket地址读取数据(socketTextStream
),这些都是Flink内置简单的输入方式,而本质上Flink Stream的输入就是实现相应的接口来接收数据:SourceFunction
,它包括run
(Flink调用run方法收集数据)和cancel
(任务停止时调用),如socketTextStream
就是创建了一个org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction
。
在代码中,我们实现了一个C03Source,它会约每100ms随机输出品类和价格数据。然后使用env.addSource(new C03Source())
来得到相应的数据流DataStream。
窗口操作
根据需求,我们要计算过去60秒内的交易额,所以很容易想到:将时间窗口的时长设置为60秒,然后计算这段时间内每个品类的交易额的和,最后计算Top3就可以了。假设使用上一章的方法timeWindow(Time.seconds(60))
,计算的结果是没有问题的,但是你会发现它是每60秒计算一次,无法满足需求每10秒更新一次榜单。此时会引出时间窗口的两个类型(这一章只介绍这两种):滚动(Tumbling)与滑动(Sliding)。
见上图,在定义窗口时指定它的大小,同时再指定触发窗口的间隔或者说滑动距离,这样创建的窗口就是滑动窗口。(timeWindow(Time.seconds(60))
的方法实现就是创建一个滚动窗口)
在代码中,我们使用window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
创建窗口然后计算sum,此时得到了每一个品类过去一分钟内的总交易额。
在这之后,代码中使用windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
方法指定了大小为10秒的滚动窗口。那么windowAll
和window
的区别是什么呢?我们不能忘记一件事:Flink是分布式处理引擎,所以计算是同时发生在各个节点的,当使用windowAll
时,数据会汇集一个节点去执行我们指定的计算。
windowAll
方法返回的是AllWindowedStream
类型的对象,使用process
方法指定对数据进行何种操作。在process
中,我们创建了ProcessAllWindowFunction
的匿名子类对象,并将所有元素的Top3拼为字符串并交给Flink。
思考
- 计算TopN时我们用到了WindowAll,实际上它就是全局并发为1的操作,那么它的计算受单台机器的限制,且在实际的业务中业务的复杂和量级都可能会出现数据热点,这时要怎么解决呢?
- 观察创建时间窗口的类名称:
SlidingProcessingTimeWindows
和TumblingProcessingTimeWindows
,时间窗口的“时间”是什么时间?假如某些数据有延迟很晚才出现在数据流中,如果你来设计Flink会怎么做?
以上问题会在后续的章节中找到答案。
WebUI
可以看到,本章我们使用以下代码创建了ENV:
代码语言:javascript复制val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
主要目的是为了在本地启动时可以看到WebUI(在集群提交任务无需此方法)。在启动后,日志中会输出类似以下内容:
代码语言:javascript复制[Chapter 03 - main] 17:07:13.338 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:139) - Starting rest endpoint.
[Chapter 03 - main] 17:07:13.862 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:242) - Rest endpoint listening at localhost:8081
[Chapter 03 - main] 17:07:13.865 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:702) - Web frontend listening at http://localhost:8081.
[Chapter 03 - mini-cluster-io-thread-1] 17:07:13.866 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:758) - http://localhost:8081 was granted leadership with leaderSessionID=30a5533d-fbf0-4b70-95d3-cd8813bb6492
说明WebUI启动成功,并且监听本地的8081端口,此时在浏览器中打开http://localhost:8081,在RunningJob选择刚刚启动的Job,可以看到类似以下页面:
可以先在页面上熟悉Flink WebUI提供的模块和可获取信息,后续会根据相应功能介绍页面的使用。