前言
基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间
数据来源的信息。所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
时间属性,可以是每个表 schema 的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。
时间属性的行为类似于常规时间戳,可以访问,并且进行计算。
一、处理时间(Processing Time)
处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark。
定义处理时间属性有三种方法:在 DataStream 转化时直接指定;在定义 Table Schema时指定;在创建表的 DDL 中指定。
1.1 DataStream 转化成 Table 时指定
由 DataStream 转换成表时,可以在后面指定字段名来定义 Schema。在定义 Schema 期间,可以111用.proctime,定义处理时间字段。
注意,这个 proctime 属性只能通过附加逻辑字段,来扩展物理 schema。因此,只能在 schema 定义的末尾定义它。
代码如下:
代码语言:javascript复制package Processing
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
/**
* @Package
* @author 大数据老哥
* @date 2020/12/21 15:02
* @version V1.0
*/
object FlinkSqlProcessingTimeDataStream {
def main(args: Array[String]): Unit = {
//构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度为1方便后面进行测试
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//构建表运行环境
val tableEnv = StreamTableEnvironment.create(env)
// 读取数据
val inputStream = env.readTextFile("./data/sensor.txt")
// 解析数据
val dataStream = inputStream.map(data => {
val dataArray = data.split(",")
(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
})
val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
// 打印出元数据信息
sensorTable.printSchema()
// 打印输出
sensorTable.toAppendStream[(String, Long, Double, Timestamp)].print()
env.execute("FlinkSqlProcessingTime")
}
}
1.2 定义 Table Schema 时指定
这种方法其实也很简单,只要在定义 Schema 的时候,加上一个新的字段,并指定成 proctime 就可以了。
代码如下
代码语言:javascript复制package Processing
import java.nio.file.FileSystems
import java.sql.Timestamp
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.{StreamTableEnvironment, tableConversions}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
/**
* @Package Processing
* @File :FlinkSqlProcessingTimeTable.java
* @author 大数据老哥
* @date 2020/12/21 15:32
* @version V1.0
*/
object FlinkSqlProcessingTimeTable {
def main(args: Array[String]): Unit = {
//构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度为1方便后面进行测试
//构建表运行环境
val tableEnv = StreamTableEnvironment.create(env)
tableEnv.connect(new FileSystem().path("./data/sensor.txt"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.BIGINT())
.field("timestamp", DataTypes.DOUBLE())
// .field("pt", DataTypes.TIMESTAMP(3)).proctime()
).createTemporaryTable("inputTable")
val sensorTable = tableEnv.sqlQuery("select * from inputTable")
//打印出元数据信息
sensorTable.printSchema()
// 打印输出
//sensorTable.toAppendStream[(String, Long, Double,Timestamp)].print()
env.execute("FlinkSqlProcessingTimeTable")
}
}
注意:使用Schema方式首先要看看它是否支持 csv 格式是不是支持的因为他底层没事进行实现。
使用Kafka的时候可以使用这种方式它底层实现了这个方法
1.3 创建表的 DDL 中指定
在创建表的 DDL 中,增加一个字段并指定成 proctime,也可以指定当前的时间字段。
代码如下
代码语言:javascript复制package Processing
import java.sql.Timestamp
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
/**
* @Package Processing
* @File :FlinkSqlProsessingTableDDL.java
* @author 大数据老哥
* @date 2020/12/21 16:28
* @version V1.0
*/
object FlinkSqlProsessingTimeTableDDL {
def main(args: Array[String]): Unit = {
//构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度为1方便后面进行测试
//构建表运行环境
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
var sqlDDL =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| pt AS PROCTIME()
|) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = './data/sensor.txt',
| 'format.type' = 'csv'
|)
|""".stripMargin
tableEnv.sqlUpdate(sqlDDL)
val table = tableEnv.sqlQuery("select * from dataTable ")
table.printSchema()
// 打印输出
table.toAppendStream[(String, Long, Double,Timestamp)].print()
env.execute("FlinkSqlProsessingTimeTableDDL")
}
}
注意:运行这段 DDL,必须使用 Blink Planner。负责会报错
二、事件时间(Event Time)
事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果。 为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。
2.1 DataStream 转化成 Table 时指定
在 DataStream 转换成 Table,schema 的定义期间,使用.rowtime
可以定义事件时间属性。注意,必须在转换的数据流中分配时间戳和 watermark。
在将数据流转换为表时,有两种定义时间属性的方法。根据指定的.rowtime 字段名是否存在于数据流的架构中,timestamp 字段可以:
- 作为新字段追加到 schema
- 替换现有字段
在这两种情况下,定义的事件时间戳字段,都将保存 DataStream 中事件时间戳的值。
代码如下:
代码语言:javascript复制package EventTime
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
* @Package EventTime
* @File :FlinkSqlEvnetTime.java
* @author 大数据老哥
* @date 2020/12/21 16:57
* @version V1.0
*/
object FlinkSqlEventTime {
def main(args: Array[String]): Unit = {
//构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度为1方便后面进行测试
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
//构建表运行环境
val tableEnv = StreamTableEnvironment.create(env)
// 读取数据
val inputStream = env.readTextFile("./data/sensor.txt")
// 解析数据
val dataStream = inputStream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}).assignAscendingTimestamps(_.temperature * 1000L)
// 方式一
val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature.rowtime, 'timestamp)
// 方式二
val sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'rt.rowtime)
sensorTable.printSchema()
sensorTable.toAppendStream[Row].print("方式一")
sensorTable2.toAppendStream[Row].print("方式二")
env.execute("FlinkSqlEventTime")
}
case class SensorReading(id: String, temperature: Long, timestamp: Double)
}
2.2 定义 Table Schema 时指定
这种方法只要在定义 Schema 的时候,将事件时间字段,并指定成 rowtime 就可以了。
代码如下
代码语言:javascript复制package EventTime
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Rowtime, Schema}
import org.apache.flink.types.Row
/**
* @Package EventTime
* @File :FlinkSqlEventTimeschema.java
* @author 大数据老哥
* @date 2020/12/21 22:30
* @version V1.0
*/
object FlinkSqlEventTimeSchema {
def main(args: Array[String]): Unit = {
//构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度为1方便后面进行测试
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
//构建表运行环境
val tableEnv = StreamTableEnvironment.create(env)
// 读取数据
tableEnv.connect(new FileSystem().path("./data/sensor.txt"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("temperature",DataTypes.BIGINT())
.rowtime(
new Rowtime()
.timestampsFromField("temperature") // 从字段使用
.watermarksPeriodicBounded(1000) // watermark 延迟 1 秒
)
.field("timestamp",DataTypes.DOUBLE())
).createTemporaryTable("inputTable") // 创建个临时表
val table = tableEnv.sqlQuery("select * from inputTable")
table.printSchema()
table.toAppendStream[Row].print()
env.execute("FlinkSqlEventTimeSchema")
}
}
2.3 为创建表的 DDL 中指定
事件时间属性,是使用 CREATE TABLE DDL 中的 WARDMARK 语句定义的。watermark 语句,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。
代码如下
代码语言:javascript复制package EventTime
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
/**
* @Package EventTime
* @File :FlinkSqlEventTimeDDL.java
* @author 大数据老哥
* @date 2020/12/21 22:54
* @version V1.0
*/
object FlinkSqlEventTimeDDL {
def main(args: Array[String]): Unit = {
//构建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度为1方便后面进行测试
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//构建表运行环境
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env,settings)
var sqlDDL =
"""
|create table dataTable(
|id varchar(20),
|ts bigint,
|temperature double,
| rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
| watermark for rt as rt - interval '1' second
|) with(
|'connector.type'='filesystem',
|'connector.path'='file:///D:d12FlinkFlinkSQLDemodatasensor.txt',
|'format.type'='csv'
|)
|""".stripMargin
tableEnv.sqlUpdate(sqlDDL) // 执行 DDL
val table = tableEnv.sqlQuery("select * from dataTable")
table.printSchema()
table.toAppendStream[Row].print()
env.execute("FlinkSqlEventTimeDDL")
}
}
这里 FROM_UNIXTIME 是系统内置的时间函数,用来将一个整数(秒数)转换成“YYYY-MM-DD hh:mm:ss”格式(默认,也可以作为第二个 String 参数传入)的日期时间字符串(date time string);然后再用 TO_TIMESTAMP 将其转换成 Timestamp。
小结
上述分享了处理时间和事件时间 分别用是三种方式给大家进行实现了,上述有什么明白的可以在评论区留言或者关注我的公众号【大数据老哥】。后期会继续分享关于Flink 的知识,喜欢的朋友点个关注我们下期见~~~。