Apache Flink中的各个窗口时间的概念区分

2020-04-14 14:44:46 浏览数 (1)

Apache Flink中提供了基于时间的窗口计算,例如计算五分钟内的用户数量或每一分钟计算之前五分钟的服务器异常日志占比等。因此Apache Flink在流处理中提供了不同时间的支持。

处理时间(Processing Time)

处理时间是执行相应的操作时的系统时间。一般来说就是Apache Flink在执行某条数据的计算的时刻的系统时间。

处理时间是最简单的时间概念,基于处理时间能够实现最佳的性能与延迟,例如计算五分钟的用户数量,无需设置其他相关的项目直接可以通过系统的当前时间进行计算即可。但是也会有某些影响,例如基于网络或者其他原因造成某些数据无法按照预计的时间到到,或者说在Apache Flink任务重启时都会造成计算结果与预期的结果不符的情况出现。

摄取时间(Ingestion Time)

摄取时间是指Apache Flink读取某条数据的时间,摄取时间是基于事件时间与处理时间之间的,因为摄取时间会在数据到来的时候给予一次时间戳,基于时间的计算需要按照时间戳去进行。所以在操作时会把数据分配到不同的不同的窗口进行计算。但是相对于事件时间来说,它更加简单一些,不需要设置Watermarks。

事件时间(Event Time)

事件时间是比较好理解的一个时间,就是类似于上面展示的log4j输出到日志中的时间,在大部分的场景中我们在进行计算时都会利用这个时间。例如计算五分钟内的日志错误占比等。

Apache Flink能够支持基于事件的时间设置,事件时间是最接近于事实需求的时间。我们通常的数据处理大部分是基于事件时间的处理。 那么在流式计算中做事件时间的处理基于某些原因可能就会存在问题,流处理在事件产生过程中,通过消息队列,到Flink的Source获取、再到Operator。中间的过程都会产生时间消耗。还有一些其他的情况,例如网络抖动造成的数据延迟等就会存在数据乱序。

但是对于数据乱序我们又不能无限期的等待事件到来,(谁知道它还来不来)。那么Apache Flink就有一个Watermark用来解决该问题,Watermark就是保证在一个特定的时间后进行触发window计算的机制。

代码语言:javascript复制
def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置引擎的执行为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost",9999)
    //设置时间戳与Watermark
    val eventText = text.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
      val maxOutOfOrderTime = 10000L  //设置10s的时间,意思是超过10s到达的数据将不会被处理
      var currentTimestamp:Long = _    // 从数据上获取到的当前时间
      override def getCurrentWatermark: Watermark = {
        //根据可容忍的最大延迟时间获取watermark
        new Watermark(currentTimestamp-maxOutOfOrderTime)
      }
      //从String中提取出事件时间
      override def extractTimestamp(str: String, l: Long): Long = {
        val sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,S")
        //获取到数据的事件时间
        currentTimestamp = sdf.parse(str.split("\|")(0)).getTime
        currentTimestamp
      }
    })

    val count = eventText.map(res=>{
      val ress  = res.split("\|")
      (ress(1),1)
    }).keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    //输出结果
    count.print()
    env.execute("Apache Flink Event Time Watermark")
  }

以上代码发布于 https://github.com/CainGao/flink_learn

0 人点赞