一篇文章让深入理解Flink SQL 时间特性

2021-02-04 14:13:00 浏览数 (1)

前言

基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。

时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。

时间属性的行为类似于常规时间戳,可以访问,并且进行计算。

一、处理时间(Processing Time)

处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark。

定义处理时间属性有三种方法:在 DataStream 转化时直接指定;在定义 Table Schema时指定;在创建表的 DDL 中指定。

1.1 DataStream 转化成 Table 时指定

由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期间,可以111用.proctime,定义处理时间字段。

注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义它。

代码如下

代码语言:javascript复制
package Processing

import java.sql.Timestamp

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

/**
 * @Package
 * @author 大数据老哥
 * @date 2020/12/21 15:02
 * @version V1.0
 */
object FlinkSqlProcessingTimeDataStream {
  def main(args: Array[String]): Unit = {
    //构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置并行度为1方便后面进行测试
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    //构建表运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    // 读取数据
    val inputStream = env.readTextFile("./data/sensor.txt")
    // 解析数据
    val dataStream = inputStream.map(data => {
      val dataArray = data.split(",")
      (dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    })

    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
    // 打印出元数据信息
    sensorTable.printSchema()
    // 打印输出
    sensorTable.toAppendStream[(String, Long, Double, Timestamp)].print()
    env.execute("FlinkSqlProcessingTime")
  }
}

1.2 定义 Table Schema 时指定

这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成 proctime 就可以了。

代码如下

代码语言:javascript复制
package Processing

import java.nio.file.FileSystems
import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
* @Package Processing
* @File :FlinkSqlProcessingTimeTable.java
* @author 大数据老哥
* @date 2020/12/21 15:32
* @version V1.0
*/
object FlinkSqlProcessingTimeTable {
 def main(args: Array[String]): Unit = {
   //构建运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1) // 设置并行度为1方便后面进行测试
   //构建表运行环境
   val tableEnv = StreamTableEnvironment.create(env)
   tableEnv.connect(new FileSystem().path("./data/sensor.txt"))
     .withFormat(new Csv())
     .withSchema(new Schema()
       .field("id", DataTypes.STRING())
       .field("temperature", DataTypes.BIGINT())
       .field("timestamp", DataTypes.DOUBLE())
      // .field("pt", DataTypes.TIMESTAMP(3)).proctime()
     ).createTemporaryTable("inputTable")
   val sensorTable = tableEnv.sqlQuery("select * from inputTable")
   //打印出元数据信息
   sensorTable.printSchema()
   // 打印输出
   //sensorTable.toAppendStream[(String, Long, Double,Timestamp)].print()
   env.execute("FlinkSqlProcessingTimeTable")
 }
}

注意:使用Schema方式首先要看看它是否支持 csv 格式是不是支持的因为他底层没事进行实现。

使用Kafka的时候可以使用这种方式它底层实现了这个方法

1.3 创建表的 DDL 中指定

在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段。

代码如下

代码语言:javascript复制
package Processing

import java.sql.Timestamp

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

/**
* @Package Processing
* @File :FlinkSqlProsessingTableDDL.java
* @author 大数据老哥
* @date 2020/12/21 16:28
* @version V1.0
*/
object FlinkSqlProsessingTimeTableDDL {
 def main(args: Array[String]): Unit = {
   //构建运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1) // 设置并行度为1方便后面进行测试
   //构建表运行环境
   val settings = EnvironmentSettings.newInstance()
     .useBlinkPlanner()
     .inStreamingMode().build()
   val tableEnv = StreamTableEnvironment.create(env, settings)
   var sqlDDL =
     """
       |create table dataTable (
       | id varchar(20) not null,
       | ts bigint,
       | temperature double,
       | pt AS PROCTIME()
       |) with (
       | 'connector.type' = 'filesystem',
       | 'connector.path' = './data/sensor.txt',
       | 'format.type' = 'csv'
       |)
       |""".stripMargin
   tableEnv.sqlUpdate(sqlDDL)
   val table = tableEnv.sqlQuery("select * from dataTable ")
   table.printSchema()
   // 打印输出
   table.toAppendStream[(String, Long, Double,Timestamp)].print()
 env.execute("FlinkSqlProsessingTimeTableDDL")
 }
}

注意:运行这段 DDL,必须使用 Blink Planner。负责会报错

二、事件时间(Event Time)

事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。 为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。

2.1 DataStream 转化成 Table 时指定

在 DataStream 转换成 Table,schema 的定义期间,使用.rowtime可以定义事件时间属性。注意,必须在转换的数据流中分配时间戳和 watermark。

在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以:

  • 作为新字段追加到 schema
  • 替换现有字段

在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。

代码如下:

代码语言:javascript复制
package EventTime

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @Package EventTime
 * @File :FlinkSqlEvnetTime.java
 * @author 大数据老哥
 * @date 2020/12/21 16:57
 * @version V1.0
 */
object FlinkSqlEventTime {
  def main(args: Array[String]): Unit = {
    //构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置并行度为1方便后面进行测试
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
    //构建表运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    // 读取数据
    val inputStream = env.readTextFile("./data/sensor.txt")
    // 解析数据
    val dataStream = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    }).assignAscendingTimestamps(_.temperature * 1000L)
    // 方式一
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature.rowtime, 'timestamp)
    // 方式二
    val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'rt.rowtime)
    sensorTable.printSchema()
    sensorTable.toAppendStream[Row].print("方式一")
    sensorTable2.toAppendStream[Row].print("方式二")
    env.execute("FlinkSqlEventTime")
  }
  case class SensorReading(id: String, temperature: Long, timestamp: Double)
}

2.2 定义 Table Schema 时指定

这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了。

代码如下

代码语言:javascript复制
package EventTime

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Rowtime, Schema}
import org.apache.flink.types.Row

/**
 * @Package EventTime
 * @File :FlinkSqlEventTimeschema.java
 * @author 大数据老哥
 * @date 2020/12/21 22:30
 * @version V1.0
 */
object FlinkSqlEventTimeSchema {
  def main(args: Array[String]): Unit = {
    //构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置并行度为1方便后面进行测试
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
    //构建表运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    // 读取数据
   tableEnv.connect(new FileSystem().path("./data/sensor.txt"))
     .withFormat(new Csv())
     .withSchema(new Schema()
       .field("id",DataTypes.STRING())
       .field("temperature",DataTypes.BIGINT())
       .rowtime(
         new Rowtime()
           .timestampsFromField("temperature")  // 从字段使用
           .watermarksPeriodicBounded(1000)  //  watermark 延迟 1 秒
       )
       .field("timestamp",DataTypes.DOUBLE())
       
     ).createTemporaryTable("inputTable") // 创建个临时表
    val table = tableEnv.sqlQuery("select  * from inputTable")
    table.printSchema()
    table.toAppendStream[Row].print()

    env.execute("FlinkSqlEventTimeSchema")
  }
}

2.3 为创建表的 DDL 中指定

事件时间属性,是使用 CREATE TABLE DDL 中的 WARDMARK 语句定义的。watermark 语句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。

代码如下

代码语言:javascript复制
package EventTime

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @Package EventTime
 * @File :FlinkSqlEventTimeDDL.java
 * @author 大数据老哥
 * @date 2020/12/21 22:54
 * @version V1.0
 */
object FlinkSqlEventTimeDDL {
  def main(args: Array[String]): Unit = {
    //构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // 设置并行度为1方便后面进行测试
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //构建表运行环境
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env,settings)
    var sqlDDL =
      """
        |create table dataTable(
        |id varchar(20),
        |ts bigint,
        |temperature double,
        | rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
        | watermark for rt as rt - interval '1' second
        |) with(
        |'connector.type'='filesystem',
        |'connector.path'='file:///D:d12FlinkFlinkSQLDemodatasensor.txt',
        |'format.type'='csv'
        |)
        |""".stripMargin

    tableEnv.sqlUpdate(sqlDDL) // 执行 DDL
    val table = tableEnv.sqlQuery("select * from dataTable")
    table.printSchema()
    table.toAppendStream[Row].print()
    env.execute("FlinkSqlEventTimeDDL")
  }
}

这里 FROM_UNIXTIME 是系统内置的时间函数,用来将一个整数(秒数)转换成“YYYY-MM-DD hh:mm:ss”格式(默认,也可以作为第二个 String 参数传入)的日期时间字符串(date time string);然后再用 TO_TIMESTAMP 将其转换成 Timestamp。

小结

上述分享了处理时间和事件时间 分别用是三种方式给大家进行实现了,上述有什么明白的可以在评论区留言或者关注我的公众号【大数据老哥】。后期会继续分享关于Flink 的知识,喜欢的朋友点个关注我们下期见~~~。

0 人点赞