ProcessFunction:Flink最底层API使用案例详解

2020-02-17 17:07:32 浏览数 (1)

之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。

目前,这个系列函数主要包括KeyedProcessFunctionProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessJoinFunctionProcessWindowFunction等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:

  • 状态:我们可以在这些函数中访问和更新Keyed State 。
  • 定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。

状态的介绍可以参考我的文章:Flink状态管理详解,这里我们重点讲解一下的使用ProcessFunction其他几个特色功能。本文所有代码都上传到了我的github:https://github.com/luweizheng/flink-tutorials

Timer的使用方法

我们可以把Timer理解成一个闹钟,使用前先在Timer中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。这里以KeyedProcessFunction为例,来介绍Timer的注册和使用。

ProcessFunction有两个重要的接口processElementonTimer,其中processElement函数在源码中的Java签名如下:

代码语言:javascript复制
// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElement方法处理数据流中的一条元素,并通过Collector<O>输出出来。Context是它的区别于FlatMapFunction等普通函数的特色,开发者可以通过Context来获取时间戳,访问TimerService,设置Timer。

另外一个接口是onTimer

代码语言:javascript复制
// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

这是一个回调函数,当到了“闹钟”时间,Flink会调用onTimer,并执行一些业务逻辑。这里也有一个参数OnTimerContext,它实际上是继承了前面的Context,与Context几乎相同。

使用Timer的方法主要逻辑为:

  1. processElement方法中通过Context注册一个未来的时间戳t。这个时间戳的语义可以是Processing Time,也可以是Event Time,根据业务需求来选择。
  2. onTimer方法中实现一些逻辑,到达t时刻,onTimer方法被自动调用。

Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口。我们可以通过Context.timerService.registerProcessingTimeTimer`Context.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。我们可以通过Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer来删除之前注册的Timer。此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark。从函数名看出,这里都是两两出现的函数,两个方法分别对应两种时间语义。

注意,我们只能在KeyedStream上注册Timer。每个Key下可以使用不同的时间戳注册不同的Timer,但是每个Key的每个时间戳只能注册一个Timer。如果想在一个DataStream上应用Timer,可以将所有数据映射到一个伪造的Key上,但这样所有数据会流入一个算子子任务。

我们再次以股票股票交易场景来解释如何使用Timer。一次股票交易包括:股票代号、时间戳、股票价格、成交量。我们现在想看一支股票10秒内是否一直连续上涨,如果一直上涨,则发送出一个提示。

代码语言:javascript复制
case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)
class IncreaseAlertFunction(intervalMills: Long)
extends KeyedProcessFunction[String, StockPrice, String] {
  // 状态:保存某支股票上次交易价格
  lazy val lastPrice: ValueState[Double] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("lastPrice", Types.of[Double])
  )
  // 状态:保存某支股票的定时器时间戳
  lazy val currentTimer: ValueState[Long] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("timer", Types.of[Long])
  )
  override def processElement(stock: StockPrice,
                              context: KeyedProcessFunction[String, StockPrice, String]#Context,
                              out: Collector[String]): Unit = {
    // 获取lastPrice状态中的数据,第一次使用时会被初始化为0
    val prevPrice = lastPrice.value()
    // 更新lastPrice
    lastPrice.update(stock.price)
    val curTimerTimestamp = currentTimer.value()
    if (prevPrice == 0.0) {
      // 第一次使用,不做任何处理
    } else if (stock.price < prevPrice) {
      // 如果新流入的股票价格降低,删除Timer,否则该Timer一直保留
      context.timerService().deleteEventTimeTimer(curTimerTimestamp)
      currentTimer.clear()
    } else if (stock.price >= prevPrice && curTimerTimestamp == 0) {
      // 如果新流入的股票价格升高
      // curTimerTimestamp为0表示currentTimer状态中是空的,还没有对应的Timer
      // 新Timer = 当前时间   interval
      val timerTs = context.timestamp()   intervalMills
      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      context.timerService().registerEventTimeTimer(timerTs)
      // 更新currentTimer状态,后续数据会读取currentTimer,做相关判断
      currentTimer.update(timerTs)
    }
  }
  override def onTimer(ts: Long,
                       ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    out.collect("time: "   formatter.format(ts)   ", symbol: '"   ctx.getCurrentKey  
                " monotonically increased for "   intervalMills   " millisecond.")
    // 清空currentTimer状态
    currentTimer.clear()
  }
}

在主逻辑里,通过下面的process算子调用KeyedProcessFunction

代码语言:javascript复制
val inputStream: DataStream[StockPrice] = ...
val warnings = inputStream
      .keyBy(stock => stock.symbol)
      // 调用process函数
      .process(new IncreaseAlertFunction(10000))

Checkpoint时,Timer也会随其他状态数据一起保存起来。如果使用Processing Time语义设置一些Timer,重启时这个时间戳已经过期,那些回调函数会立刻被调用执行。

侧输出SideOutput

ProcessFunction的另一大特色功能是可以将一部分数据发送到另外一个流中,而且输出到的两个流数据类型可以不一样,我们通过OutputTag[T]来标记另外一个数据流。在ProcessFunction中这样将某类数据过滤出来:

代码语言:javascript复制
class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] {
  override def processElement(stock: Stock,
                              context: KeyedProcessFunction[String, Stock, String]#Context,
                              out: Collector[String]): Unit = {
    // 其他业务逻辑...
    // 定义一个OutputTag,Stock为这个SideOutput流的数据类型
    val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade")
    if (stock.volume > 1000) {
      // 将Stock筛选出来发送到该OutputTag下
      context.output(highVolumeOutput, stock)
    }
  }
}

在主逻辑中,通过下面的方法获取侧输出:

代码语言:javascript复制
// 收集SideOutput
val outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")
val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)

从这个例子中可以看到,KeyedProcessFunction的输出类型是String,而SideOutput的输出类型是Stock,两者可以不同。

使用ProcessFunction实现Join

如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunctionKeyedCoProcessFunction。这两个函数都有processElement1processElement2方法,分别对第一个数据流和第二个数据流的每个元素进行处理。两个数据流的数据类型以及输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现Join:

  • 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
  • processElement1方法处理第一个数据流,更新状态a。
  • processElement2方法处理第二个数据流,根据状态a中的数据,生成相应的输出。

我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,这个数据流包含了对该支股票的正负评价。两支数据流一起流入KeyedCoProcessFunctionprocessElement2方法处理流入的媒体数据,将媒体评价更新到状态mediaState上,processElement1方法处理流入的股票交易数据,获取mediaState`状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。

在主逻辑中,我们将两个数据流connect,然后按照股票代号进行keyBy,进而使用process算子:

代码语言:javascript复制
val stockPriceRawStream: DataStream[StockPrice] = ...
val mediaStatusStream: DataStream[Media] = ...
val warnings = stockStream.connect(mediaStream)
      .keyBy(0, 0)
      // 调用process函数
      .process(new AlertProcessFunction())

KeyedCoProcessFunction的具体实现:

代码语言:javascript复制
class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] {
  // mediaState
  private var mediaState: ValueState[String] = _
  override def open(parameters: Configuration): Unit = {
    // 从RuntimeContext中获取状态
    mediaState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("mediaStatusState", classOf[String]))
  }
  override def processElement1(stock: StockPrice,
                               context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,
                               collector: Collector[StockPrice]): Unit = {
    val mediaStatus = mediaState.value()
    if (null != mediaStatus) {
      val newStock = stock.copy(mediaStatus = mediaStatus)
      collector.collect(newStock)
    }
  }
  override def processElement2(media: Media,
                               context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,
                               collector: Collector[StockPrice]): Unit = {
    // 第二个流更新mediaState
    mediaState.update(media.status)
  }
}

这个例子比较简单,没有使用Timer,实际的业务场景中状态一般用到Timer将过期的状态清除。很多互联网APP的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在APP上的行为是交互后产生的,两者属于两个不同的数据流,可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用Timer将过期的状态清除。

注意,使用Event Time时,两个数据流必须都设置好Watermark,只设置一个流的Event Time和Watermark,无法在CoProcessFunctionKeyedCoProcessFunction中使用Timer功能,因为process算子无法确定自己应该以怎样的时间来处理数据。

0 人点赞