Flink零基础实战教程:股票价格数据流处理

2019-12-26 14:41:43 浏览数 (1)

之前的《万字长文深度解析WordCount程序》使用WordCount展示了Flink程序的基本结构,本文将以股票价格案例来演示如何使用Flink的DataStream API。通过本文,你可以学到:

  1. 定义相关数据结构。
  2. Flink流处理程序的骨架。
  3. Flink的执行环境概念。
  4. 自定义Source、设置时间戳和Watermark。

数据结构

Flink能处理任何可被序列化的数据结构:

  • 基础数据类型,包括 String、Integer、Boolean、Array
  • 复杂数据结构,包括 Scala case class和 Java POJO

此外,Flink也支持Kryo序列化工具。

本例使用Scala case class来定义一个股票类,该对象包括三个字段:股票代号、时间戳和价格。真实的股票交易数据比这个更为复杂,这里只是一个简化的模型。

代码语言:javascript复制
case class StockPrice(symbol: String, timestamp: Long, price: Double)

当然,如果使用Java,也可以定义一个POJO(Plain Old Java Object),该类中各个字段或者具有public属性,或者有一个对应的getter和setter方法,且该类有一个无参数的构造函数。

代码语言:javascript复制
public class StockPrice {
  public String symbol;
  public Long timestamp;
  public Double price;
  
  public StockPrice() {};
  public StockPrice(String symbol, Long timestamp, Double price){
    ...
  };
}

相比而言,Scala的类定义更为简洁,因为Scala的编译器在编译阶段帮忙生成了不少代码,Java的代码风格有些臃肿。

Flink对数据类型有以上要求,主要因为在分布式计算过程中,需要将内存中的对象序列化成可多节点传输的数据,并且能够在对应节点被反序列化成对象。

Flink流处理程序的骨架结构

基于上面的数据结构,我们开始开发程序。下面的代码清单使用Flink对股票数据流分析程序,该程序能够统计数据源中每支股票5秒时间窗口内的最大值。

代码语言:javascript复制
object StockPriceDemo {
  def main(args: Array[String]) {
    // 设置执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 每5秒生成一个Watermark
    env.getConfig.setAutoWatermarkInterval(5000L)
    // 股票价格数据流
    val stockPriceRawStream: DataStream[StockPrice] = env
      // 该数据流由StockPriceSource类随机生成
      .addSource(new StockPriceSource)
      // 设置 Timestamp 和 Watermark
      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)
    val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
      .keyBy(_.symbol)
      // 设置5秒的时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 取5秒内某一只股票的最大值
      .max("price")
    // 打印结果
    stockPriceStream.print()
    // 执行程序
    env.execute("Compute max stock price")
  }
}

Java或Scala的程序入口一般是一个静态(static)的main函数。而在Scala中,object下的变量和方法都是静态的。在main函数中,还需要定义下面几个步骤:

  1. 设置运行环境。
  2. 读取一到多个数据源。
  3. 根据业务逻辑对数据流进行Transformation操作。
  4. 将结果输出。
  5. 调用作业执行函数 execute。

接下来我们对这五个步骤拆解分析。

设置执行环境

代码语言:javascript复制
val env = StreamExecutionEnvironment.getExecutionEnvironment

这行代码可以获取一个Flink流处理执行环境。Flink一般运行在一个集群上,执行环境是Flink程序运行的上下文,它提供了一系列作业与集群交互的方法,比如作业如何与外部世界交互。当调用getExecutionEnvironment方法时,假如我们是在一个集群上提交作业,则返回集群的上下文,假如我们是在本地执行,则返回本地的上下文。本例中我们是进行流处理,在批处理场景则要获取DataSet API中批处理执行环境。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)这行代码告知执行环境使用Event-time时间语义来进行后续时间上的计算。Event-time语义下需要依赖Watermark机制,即收到一个Watermark后,开始对这个窗口进行计算,比Watermark更晚到达的事件都被视为延迟数据。env.getConfig.setAutoWatermarkInterval(5000L)设置每5秒生成一个Watermark,默认情况下每200毫秒生成一个Watermark。

此外,我们还可以设置作业的并行度、配置Checkpoint等操作。可见,执行环境是我们与Flink交互的入口。

读取数据源

接着我们需要使用执行环境提供的方法读取数据源,读取数据源的部分统称为Source。数据源一般是消息队列或文件,我们也可以根据业务需求重写数据源,比如定时爬取网络中某处的数据。在本例中,我们使用val stockPriceRawStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)来读取数据源,其中StockPriceSource随机生成了一些股票价格数据。最终生成的stockPriceRawStream是一个由StockPrice组成的DataStream数据流。

下面的代码清单展示了StockPriceSource类继承RichSourceFunction,对run方法重写,不断随机生成股票价格,生成的数据最终写入SourceContext中。

代码语言:javascript复制
class StockPriceSource extends RichSourceFunction[StockPrice]{
  var isRunning: Boolean = true
  val rand = new Random()
  // 初始化股票价格
  var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
  var stockId = 0
  var curPrice = 0.0d
  override def run(srcCtx: SourceContext[StockPrice]): Unit = {
    while (isRunning) {
      // 每次从列表中随机选择一只股票
      stockId = rand.nextInt(priceList.size)
      val curPrice =  priceList(stockId)   rand.nextGaussian() * 0.05
      priceList = priceList.updated(stockId, curPrice)
      val curTime = Calendar.getInstance.getTimeInMillis
      // 将数据源收集写入SourceContext
      srcCtx.collect(StockPrice("symbol_"   stockId.toString, curTime, curPrice))
      Thread.sleep(rand.nextInt(10))
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

尽管StockPrice的数据结构中有时间戳的字段,但是Flink并不知道哪个字段是时间戳,所以还需要手动设置。assignTimestampsAndWatermarks(new StockPriceTimeAssigner)方法允许我们设置时间戳和Watermark,这样Flink就可以知道本程序的时间戳。Flink Watermark相关的内容将在后续文章中介绍。

下面的代码清单抽取数据源中StockPricetimestamp字段作为该事件的时间戳。

代码语言:javascript复制
class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
    override def extractTimestamp(t: StockPrice): Long = t.timestamp
}

Transformation

此时,我们已经获取了一个股票价格数据流,接下来我们就可以在数据流上进行有状态的计算了。我们一般使用Flink提供的各类算子,使用链式调用的方式,对一个数据流进行操作。经过各Transformation算子的处理,DataStream可能被转换为KeyedStreamJoinedStream等不同的数据流结构。相比Spark RDD的数据结构,Flink的数据流结构确实更加复杂。

本例中,我们按照股票代号对数据进行分组,并开启一个5秒的时间窗口,统计该窗口下某支股票的5秒内的最大值。

代码语言:javascript复制
val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
      .keyBy(_.symbol)
      // 设置5秒的时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 取5秒内某一只股票的最大值
      .max("price")

keyBy算子将数据流按照股票的symbol分组,相同symbol的股票数据会被归结到一起;window算子开启了一个5秒的滚动窗口;max算子统计这个5秒窗口内的最大值。最终我们能够得到每支股票5秒内的最大值。

输出结果

然后我们需要将前面的计算结果输出到外部系统,可能是一个消息队列、文件系统或数据库,也可以自定义输出方式,输出结果的部分统称为Sink。

本例中,5秒窗口内每支股票的最大值是计算结果,是一个DataStream[StockPrice]结构的数据流。我们调用print函数将这个数据流打印到标准输出(standard output)。

执行

当定义好程序的Source、Transformation和Sink的业务逻辑后,程序并不会立即执行这些算子对应的任何计算,还需要调用执行环境execute()方法来执行前面的业务逻辑。Flink是延迟执行(lazy evaluation)的,即当程序明确调用execute()方法时,Flink会将数据流图转化为一个JobGraph,提交给JobManager,JobManager根据当前的执行环境来执行这个作业。

总结

一个Flink程序的核心业务逻辑主要包括:Source、Transformation和Sink三部分。程序的开始前要设置执行环境,最后要调用execute()方法。

整个程序的完整代码如下所示,完整程序和更多案例参见我的GitHub:https://github.com/luweizheng/flink-tutorials

代码语言:javascript复制

package com.flink.tutorials.demos.stock
import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import scala.util.Random
object StockPriceDemo {
  /**
    * Case Class StockPrice
    * symbol 股票代号
    * timestamp 时间戳
    * price 价格
    */
  case class StockPrice(symbol: String, timestamp: Long, price: Double)
  def main(args: Array[String]) {
    // 设置执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 每5秒生成一个Watermark
    env.getConfig.setAutoWatermarkInterval(5000L)
    // 股票价格数据流
    val stockPriceRawStream: DataStream[StockPrice] = env
      // 该数据流由StockPriceSource类随机生成
      .addSource(new StockPriceSource)
      // 设置 Timestamp 和 Watermark
      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)
    val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
      .keyBy(_.symbol)
      // 设置5秒的时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 取5秒内某一只股票的最大值
      .max("price")
    // 打印结果
    stockPriceStream.print()
    // 执行程序
    env.execute("Compute max stock price")
  }
  class StockPriceSource extends RichSourceFunction[StockPrice]{
    var isRunning: Boolean = true
    val rand = new Random()
    // 初始化股票价格
    var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
    var stockId = 0
    var curPrice = 0.0d
    override def run(srcCtx: SourceContext[StockPrice]): Unit = {
      while (isRunning) {
        // 每次从列表中随机选择一只股票
        stockId = rand.nextInt(priceList.size)
        val curPrice =  priceList(stockId)   rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis
        // 将数据源收集写入SourceContext
        srcCtx.collect(StockPrice("symbol_"   stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(10))
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
  class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
    override def extractTimestamp(t: StockPrice): Long = t.timestamp
  }
}

0 人点赞