附代码|Flink实时计算TopN

2022-03-10 10:36:51 浏览数 (1)

这一章从实际的需求TopN来学习Flink中的窗口知识。

在上一章代码中使用了timeWindow,使得我们可以操作Flink流中的一个时间段内的数据,这就引出了Flink中的"窗口"概念:在大多数场景下,数据流都是"无限的",因引我们无法等待数据流终止后才进行一些统计计算,而通常的需求是对一段时间或是一定范围内的数据进行分析。

Flink提供了两种窗口:Time Window和Count Window,而本章涉及到Time Window的部分概念和用法。

代码语言:javascript复制
package all.in.one.c03

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

object Chapter03 extends App {

  // 使用createLocalEnvironmentWithWebUI可以在本地查看WebUI,在集群提交任务无需此方法
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
  // Flink的输入为Source,这里我们构建一个定义Source:C03Source
  val sourceDataStream = env.addSource(new C03Source())
  // 接下来以品类做为key,计算每个品类的总价格
  // 同样keyingBy来自org.apache.flink.streaming.api.scala.extensions._包,这里使用keyBy也可以
  // keyBy操作后会返回一个KeyedStream,保存了key信息
  sourceDataStream
    .keyingBy(_._1)
    // 与Chapter 02不同,这里我们调用window来设置窗口
    // 以下代码说明参见README
    .window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
    // 计算交易额的总和
    .sum(1)
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
    .process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
      override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
        val top3 = elements.toSeq
          .sortBy(-_._2)
          .take(3)
          .zipWithIndex
          .map { case ((item, price), idx) => s"   ${idx   1}. $item: $price" }
          .mkString("n")
        out.collect(("-" * 16)   "n"   top3)
      }
    })
    .print()
  env.execute("Chapter 03")

  /**
    * 每100ms产生一条"交易"数据,最终输出品类 价格(随机产生)
    */
  class C03Source extends SourceFunction[(String, Long)] {
    private val items = Array(
      // 男装
      "卫衣",
      "T恤",
      "牛仔裤",
      "西服",
      "风衣",
      // 女装
      "连衣裙",
      "卫衣",
      "衬衫",
      "针织衫",
      "休闲裤",
      // 手机数码
      "手机",
      "手机配件",
      "摄影摄像",
      "影音娱乐",
      "数码配件",
      "智能设备",
      "电子教育",
      // 电脑办公
      "电脑整机",
      "电脑组件",
      "外设",
      "网络产品",
      "办公设备",
      "文具耗材",
      // 家用电器
      "电视",
      "空调",
      "洗衣机",
      "冰箱",
      "厨卫",
      "生活电器",
      // 户外运动
      "运动鞋包",
      "运行服饰",
      "户外鞋服",
      "户外装备",
      "骑行",
      "健身",
      // 家具家装
      "厨房卫浴",
      "灯饰照明",
      "五金工具",
      "客厅家具",
      "餐厅家具",
      // 图书文娱
      "少儿读物",
      "文学",
      "动漫",
      "专业"
    )
    var running = true

    /**
      * Flink会调用run来收集数据
      */
    override def run(sourceContext: SourceFunction.SourceContext[(String, Long)]): Unit = {
      val random = new Random()
      do {
        val item = items(random.nextInt(items.length))
        val price = random.nextInt(3333)   33
        // context.collect通知Flink新元素进入系统
        sourceContext.collect(item -> price.toLong)
        Thread.sleep(1000)
      } while (running)
    }

    override def cancel(): Unit = running = false

  }

}

TopN需求

假设电商网站有这样一个榜单:展示1分钟内当前用户购买品类交易额的Top3,并且榜单要每10秒刷新一次。而我们现在可以拿到一个交易流,里面记录了交易品类和交易额,要如何实现呢?先看代码效果,启动all.in.one.c03.Chapter03后会看到输出如:

代码语言:javascript复制
9> ----------------
   1. 厨卫: 3956
   2. 文具耗材: 3174
   3. 摄影摄像: 2738
10> ----------------
   1. 厨卫: 3956
   2. 文具耗材: 3174
   3. 智能设备: 3108
11> ----------------
   1. 影音娱乐: 4304
   2. 风衣: 4286
   3. 厨卫: 3956
12> ----------------
   1. 牛仔裤: 5261
   2. 衬衫: 5155
   3. 厨卫: 4629
  ...
  ...

输入

之前的章节中,我们的输入是监听一个Socket地址读取数据(socketTextStream),这些都是Flink内置简单的输入方式,而本质上Flink Stream的输入就是实现相应的接口来接收数据:SourceFunction,它包括run(Flink调用run方法收集数据)和cancel(任务停止时调用),如socketTextStream就是创建了一个org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction

在代码中,我们实现了一个C03Source,它会约每100ms随机输出品类和价格数据。然后使用env.addSource(new C03Source())来得到相应的数据流DataStream。

窗口操作

根据需求,我们要计算过去60秒内的交易额,所以很容易想到:将时间窗口的时长设置为60秒,然后计算这段时间内每个品类的交易额的和,最后计算Top3就可以了。假设使用上一章的方法timeWindow(Time.seconds(60)),计算的结果是没有问题的,但是你会发现它是每60秒计算一次,无法满足需求每10秒更新一次榜单。此时会引出时间窗口的两个类型(这一章只介绍这两种):滚动(Tumbling)与滑动(Sliding)。

见上图,在定义窗口时指定它的大小,同时再指定触发窗口的间隔或者说滑动距离,这样创建的窗口就是滑动窗口。(timeWindow(Time.seconds(60))的方法实现就是创建一个滚动窗口)

在代码中,我们使用window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))创建窗口然后计算sum,此时得到了每一个品类过去一分钟内的总交易额。

在这之后,代码中使用windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))方法指定了大小为10秒的滚动窗口。那么windowAllwindow的区别是什么呢?我们不能忘记一件事:Flink是分布式处理引擎,所以计算是同时发生在各个节点的,当使用windowAll时,数据会汇集一个节点去执行我们指定的计算。

windowAll方法返回的是AllWindowedStream类型的对象,使用process方法指定对数据进行何种操作。在process中,我们创建了ProcessAllWindowFunction的匿名子类对象,并将所有元素的Top3拼为字符串并交给Flink。

思考

  1. 计算TopN时我们用到了WindowAll,实际上它就是全局并发为1的操作,那么它的计算受单台机器的限制,且在实际的业务中业务的复杂和量级都可能会出现数据热点,这时要怎么解决呢?
  2. 观察创建时间窗口的类名称:SlidingProcessingTimeWindowsTumblingProcessingTimeWindows,时间窗口的“时间”是什么时间?假如某些数据有延迟很晚才出现在数据流中,如果你来设计Flink会怎么做?

以上问题会在后续的章节中找到答案。

WebUI

可以看到,本章我们使用以下代码创建了ENV:

代码语言:javascript复制
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

主要目的是为了在本地启动时可以看到WebUI(在集群提交任务无需此方法)。在启动后,日志中会输出类似以下内容:

代码语言:javascript复制
[Chapter 03 - main] 17:07:13.338 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:139) - Starting rest endpoint.
[Chapter 03 - main] 17:07:13.862 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:242) - Rest endpoint listening at localhost:8081
[Chapter 03 - main] 17:07:13.865 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:702) - Web frontend listening at http://localhost:8081.
[Chapter 03 - mini-cluster-io-thread-1] 17:07:13.866 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:758) - http://localhost:8081 was granted leadership with leaderSessionID=30a5533d-fbf0-4b70-95d3-cd8813bb6492

说明WebUI启动成功,并且监听本地的8081端口,此时在浏览器中打开http://localhost:8081,在RunningJob选择刚刚启动的Job,可以看到类似以下页面:

可以先在页面上熟悉Flink WebUI提供的模块和可获取信息,后续会根据相应功能介绍页面的使用。

0 人点赞