一、Table API 和 Flink SQL 是什么?
• Flink 对批处理和流处理,提供了统一的上层 API
• Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
• Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite
二、基本程序结构
代码语言:javascript复制// 创建表的执行环境
val tableEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable")
// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable")
// 通过 Table API 查询算子,得到一张结果表
val result = tableEnv.from("inputTable").select(...)
// 通过 SQL查询语句,得到一张结果表
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
// 将结果表写入输出表中
result.insertInto("outputTable")
三、 创建 TableEnvironment
创建表的执行环境,需要将 flink 流处理的执行环境传入
代码语言:javascript复制 val tableEnv = StreamTableEnvironment.create(env)
TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有对表的操作都基于 TableEnvironment
1. 注册 Catalog
2. 在 Catalog 中注册表
3. 执行 SQL 查询
4. 注册用户自定义函数(UDF)
3.1 配置老版本 planner 的流式查询TableEnvironment
代码语言:javascript复制val settings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(env, settings)
3.2 配置老版本 planner 的批式查询TableEnvironment
代码语言:javascript复制val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)
3.3 配置 blink planner 的流式查询TableEnvironment
代码语言:javascript复制val bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
3.4 配置 blink planner 的批式查询TableEnvironment
代码语言:javascript复制val bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val bbTableEnv = TableEnvironment.create(bbSettings)
四、表(Table)
TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表;
表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名;
表可以是常规的,也可以是虚拟的(视图,View);
常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来;
视图(View)可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果集;
4.1 创建表
TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表
代码语言:javascript复制tableEnv
.connect(...) // 定义表的数据来源,和外部系统建立连接
.withFormat(...) // 定义数据格式化方法
.withSchema(...) // 定义表结构
.createTemporaryTable("MyTable") // 创建临时表
4.1.1 连接到外部文件系统(CSV)
代码语言:javascript复制val filePath = "***********"
tableEnvironment.connect(new FileSystem().path(filePath))
// .withFormat(new OldCsv()) //定义读取数据之后的格式化方法
// 新版CSV文件系统需要引入依赖
// <dependency>
// <groupId>org.apache.flink</groupId>
// <artifactId>flink-csv</artifactId>
// <version>1.10.0</version>
// </dependency>
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("CSVTable")
//转换成流打印输出
val table = tableEnvironment.from("CSVTable")
table.toAppendStream[(String,Long,Double)].print()
4.1.2 连接到Kafka
代码语言:javascript复制tableEnvironment.connect(
new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id",DataTypes.STRING())
.field("timestamp",DataTypes.BIGINT())
.field("temperature",DataTypes.DOUBLE())
)
.createTemporaryTable("KafkaTable")
//转换成流打印输出
val KafkaTable = tableEnvironment.from("KafkaTable")
KafkaTable.toAppendStream[(String,Long,Double)].print()
4.2 表的查询
Table API 是集成在 Scala 和 Java 语言内的查询 API;
Table API 基于代表“表”的 Table 类,并提供一整套操作处理的方法 API,这些方法会返回 一个新的 Table 对象,表示对输入表应用转换操作的结果
有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构
代码语言:javascript复制// 定义样例类,传感器id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
val sensorTable: Table = tableEnv.from("sensorTable")
val resultTable: Table = sensorTable
.select("id, temperature")
.filter("id = 'sensor_1'")
代码语言:javascript复制 //简单聚合查询
val resultTable: Table = sensorTable
.groupBy("id")
.select('id,'timestamp.max as 'max_timestamp)
4.3 表的查询-SQL
Flink 的 SQL 集成,基于实现 了SQL 标准的 Apache Calcite
在 Flink 中,用常规字符串来定义 SQL 查询语句
SQL 查询的结果,也是一个新的 Table
代码语言:javascript复制val resultSqlTable: Table = tableEnv
.sqlQuery("select id, temperature from sensorTable where id ='sensor_1'")
代码语言:javascript复制val resultSqlTable: Table = tableEnvironment.sqlQuery(
"""
|select
| id,max(timestamp) timestamp
|from
| CSVTable
|group by
| id
|""".stripMargin)
4.4 将 DataStream 转换成表
对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API 做转换操作
代码语言:javascript复制val inputStream:DataStream[String] = environment.readTextFile("")
val datastream:DataStream[SensorReading] = inputStream.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}
)
val sensorTable: Table = tableEnv.fromDataStream(dataStream)
默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来
代码语言:javascript复制val inputStream:DataStream[String] = environment.readTextFile("")
val datastream:DataStream[SensorReading] = inputStream.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}
)
val sensorTable = tableEnv.fromDataStream(dataStream,'id, 'timestamp, 'temperature)
DataStream 中的数据类型,与表的 Schema 之间的对应关系,可以有两种:基于字段名称,或者基于字段位置
基于名称(name-based)
代码语言:javascript复制val sensorTable = tableEnv.fromDataStream(dataStream,
'timestamp as 'ts, 'id as 'myId, 'temperature)
基于位置(position-based)
代码语言:javascript复制val sensorTable = tableEnv.fromDataStream(dataStream, 'myId, 'ts)
4.5 创建临时视图(Temporary View)
4.5.1 基于 DataStream 创建临时视图
代码语言:javascript复制tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView",
dataStream, 'id, 'temperature, 'timestamp as 'ts)
4.5.2 基于 Table 创建临时视图
代码语言:javascript复制tableEnv.createTemporaryView("sensorView", sensorTable)
4.6 输出表
表的输出,是通过将数据写入 TableSink 来实现的
TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列
输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中
更新模式
对于流式查询,需要声明如何在表和外部连接器之间执行转换与外部系统交换的消息类型,由更新模式(Update Mode)指定
追加(Append)模式
表只做插入操作,和外部连接器只交换插入(Insert)消息
撤回(Retract)模式
表和外部连接器交换添加(Add)和撤回(Retract)消息
插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update)编码为上一条的 Retract 和下一条的 Add 消息
更新插入(Upsert)模式
更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息
代码语言:javascript复制tableEnv.connect(...)
.createTemporaryTable("outputTable")
val resultSqlTable: Table = ...
resultTable.insertInto("outputTable")
4.6.1 输出到文件
代码语言:javascript复制tableEnv.connect(new FileSystem().path("output.txt")) // 定义到文件系统的连接
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING()) .
field("temp", DataTypes.Double())
)
.createTemporaryTable("outputTable") // 创建临时表
resultTable.insertInto("outputTable") // 输出表
4.6.2 输出到 Kafka
代码语言:javascript复制tableEnv.connect(
new Kafka()
.version("0.11")
.topic("sinkTest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat( new Csv() )
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable")
resultTable.insertInto("kafkaOutputTable")
4.6.3 输出到ES
代码语言:javascript复制tableEnv.connect(
new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp")
)
.inUpsertMode() //更新模式调整为Upsert
.withFormat(new Json())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable")
aggResultTable.insertInto("esOutputTable")
4.6.4 输出到 MySql
代码语言:javascript复制val sinkDDL: String =
"""
|create table jdbcOutputTable (
| id varchar(20) not null,
| cnt bigint not null
|) with (
| 'connector.type' = 'jdbc',
| 'connector.url' = 'jdbc:mysql://localhost:3306/test',
| 'connector.table' = 'sensor_count',
| 'connector.driver' = 'com.mysql.jdbc.Driver',
| 'connector.username' = 'root',
| 'connector.password' = '123456'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL) // 执行 DDL创建表
aggResultSqlTable.insertInto("jdbcOutputTable")
4.7 将 Table 转换成 DataStream
表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了;
将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型;
表作为流式查询的结果,是动态更新的;
转换有两种转换模式:追加(Appende)模式和撤回(Retract)模式;
追加模式(Append Mode)
用于表只会被插入(Insert)操作更改的场景
代码语言:javascript复制val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
撤回模式(Retract Mode)
用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。
得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(Delete)
代码语言:javascript复制val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv
.toRetractStream[(String, Long)](aggResultTable)
五、查看执行计划
Table API 提供了一种机制来解释计算表的逻辑和优化查询计划
查看执行计划,可以通过 TableEnvironment.explain(table) 方法或 TableEnvironment.explain() 方法完成,返回一个字符串,描述三个计划:
- 优化的逻辑查询计划
- 优化后的逻辑查询计划
- 实际执行计划
val explaination: String = tableEnv.explain(resultTable)
println(explaination)
六、流处理和关系代数的区别
七、动态表(Dynamic Tables)
动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念
与表示批处理数据的静态表不同,动态表是随时间变化的
持续查询(Continuous Query)
- 动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)
- 连续查询永远不会终止,并会生成另一个动态表
- 查询会不断更新其动态结果表,以反映其动态输入表上的更改
流式表查询的处理过程:
- 流被转换为动态表
- 对动态表计算连续查询,生成新的动态表
- 生成的动态表被转换回流
- 为了处理带有关系查询的流,必须先将其转换为表
- 从概念上讲,流的每个数据记录,都被解释为对结果表的插入(Insert)修改操作
- 持续查询会在动态表上做计算处理,并作为结果生成新的动态表
动态表转成 DataStream
与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改
将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码
仅追加(Append-only)流
仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流
撤回(Retract)流
撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息
Upsert(更新插入)流
Upsert 流也包含两种类型的消息:Upsert 消息和删除(Delete)消息。
八、时间特性
基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息
Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳
时间属性,可以是每个表schema的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用
时间属性的行为类似于常规时间戳,可以访问,并且进行计算
8.1 定义处理时间(Processing Time)
处理时间语义下,允许表处理程序根据机器的本地时间生成结果。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成 watermark;
由 DataStream 转换成表时指定;
在定义Schema期间,可以使用.proctime,指定字段名定义处理时间字段;
这个proctime属性只能通过附加逻辑字段,来扩展物理schema。因此,只能在schema定义的末尾定义它;
由 DataStream 转换成表时指定
代码语言:javascript复制val sensorTable = tableEnv.fromDataStream(dataStream,
'id, 'temperature, 'timestamp, 'pt.proctime)
定义 Table Schema 时指定
代码语言:javascript复制.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
.field("pt", DataTypes.TIMESTAMP(3))
.proctime()
)
在创建表的 DDL 中定义
代码语言:javascript复制val sinkDDL: String =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| pt AS PROCTIME()
|) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = '/sensor.txt',
| 'format.type' = 'csv'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL)
8.2 定义事件时间(Event Time)
事件时间语义,允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时,也可以获得正确的结果;
为了处理无序事件,并区分流中的准时和迟到事件;Flink 需要从事件数据中,提取时间戳,并用来推进事件时间的进展;
定义事件时间,同样有三种方法:
由 DataStream 转换成表时指定
定义 Table Schema 时指定
在创建表的 DDL 中定义
由 DataStream 转换成表时指定
在 DataStream 转换成 Table,使用 .rowtime 可以定义事件时间属性
代码语言:javascript复制// 将 DataStream转换为 Table,并指定时间字段
val sensorTable = tableEnv.fromDataStream(dataStream,
'id, 'timestamp.rowtime, 'temperature)
// 或者,直接追加时间字段
val sensorTable = tableEnv.fromDataStream(dataStream,
'id, 'temperature, 'timestamp, 'rt.rowtime)
定义 Table Schema 时指定
代码语言:javascript复制.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.rowtime(
new Rowtime()
.timestampsFromField("timestamp") // 从字段中提取时间戳
.watermarksPeriodicBounded(1000) // watermark延迟1秒
)
.field("temperature", DataTypes.DOUBLE())
)
在创建表的 DDL 中定义
代码语言:javascript复制val sinkDDL: String =
"""
|create table dataTable (
| id varchar(20) not null,
| ts bigint,
| temperature double,
| rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ),
| watermark for rt as rt - interval '1' second
|) with (
| 'connector.type' = 'filesystem',
| 'connector.path' = '/sensor.txt',
| 'format.type' = 'csv'
|)
""".stripMargin
tableEnv.sqlUpdate(sinkDDL)
九、窗口
时间语义,要配合窗口操作才能发挥作用
在 Table API 和 SQL 中,主要有两种窗口
Group Windows(分组窗口)
根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
Over Windows
针对每个输入行,计算相邻行范围内的聚合
9.1 Group Windows
Group Windows 是使用 window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名;
为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用。
代码语言:javascript复制val table = input
.window([w: GroupWindow] as 'w) // 定义窗口,别名为 w
.groupBy('w, 'a) // 按照字段 a和窗口 w分组
.select('a, 'b.sum) // 聚合
或者,还可以把窗口的相关信息,作为字段添加到结果表中:
代码语言:javascript复制val table = input
.window([w: GroupWindow] as 'w)
.groupBy('w, 'a)
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
Table API提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作。
Table API支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding)和会话(Session)。
9.1.1 滚动窗口
滚动窗口(Tumbling windows)要用Tumble类来定义,另外还有三个方法:
over:定义窗口长度
on:用来分组(按时间间隔)或者排序(按行数)的时间字段
as:别名,必须出现在后面的groupBy中
代码语言:javascript复制// Tumbling Event-time Window(事件时间字段rowtime)
.window(Tumble over 10.minutes on 'rowtime as 'w)
// Tumbling Processing-time Window(处理时间字段proctime)
.window(Tumble over 10.minutes on 'proctime as 'w)
// Tumbling Row-count Window (类似于计数窗口,按处理时间排序,10行一组)
.window(Tumble over 10.rows on 'proctime as 'w)
9.1.2 滑动窗口
滑动窗口(Sliding windows)要用Slide类来定义,另外还有四个方法:
over:定义窗口长度
every:定义滑动步长
on:用来分组(按时间间隔)或者排序(按行数)的时间字段
as:别名,必须出现在后面的groupBy中
代码语言:javascript复制// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
// Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
// Sliding Row-count window
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)
9.1.3 会话窗口
会话窗口(Session windows)要用Session类来定义,另外还有三个方法:
withGap:会话时间间隔
on:用来分组(按时间间隔)或者排序(按行数)的时间字段
as:别名,必须出现在后面的groupBy中
代码如下:
代码语言:javascript复制// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session Processing-time Window
.window(Session withGap 10.minutes on 'proctime as 'w)
9.2 Over Windows
Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。
Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。
Over windows使用.window(w:overwindows*)子句定义,并在select()方法中通过别名来引用。
比如这样:
代码语言:javascript复制val table = input
.window([w: OverWindow] as 'w)
.select('a, 'b.sum over 'w, 'c.min over 'w)
Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。
实际代码应用如下:
1) 无界的 over window
代码语言:javascript复制// 无界的事件时间over window (时间字段 "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
//无界的处理时间over window (时间字段"proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// 无界的事件时间Row-count over window (时间字段 "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
//无界的处理时间Row-count over window (时间字段 "rowtime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
2) 有界的over window
代码语言:javascript复制// 有界的事件时间over window (时间字段 "rowtime",之前1分钟)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// 有界的处理时间over window (时间字段 "rowtime",之前1分钟)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// 有界的事件时间Row-count over window (时间字段 "rowtime",之前10行)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// 有界的处理时间Row-count over window (时间字段 "rowtime",之前10行)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
9.3 SQL中窗口的定义
9.3.1 Group Windows
Group Windows在SQL查询的Group BY子句中定义。与使用常规GROUP BY子句的查询一样,使用GROUP BY子句的查询会计算每个组的单个结果行。
SQL支持以下Group窗口函数:
代码语言:javascript复制 TUMBLE(time_attr, interval)
定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度。
代码语言:javascript复制 HOP(time_attr, interval, interval)
定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。
代码语言:javascript复制 SESSION(time_attr, interval)
定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔(Gap)。
另外还有一些辅助函数,可以用来选择Group Window的开始和结束时间戳,以及时间属性。
这里只写TUMBLE_,滑动和会话窗口是类似的(HOP_,SESSION_*)。
代码语言:javascript复制TUMBLE_START(time_attr, interval)
TUMBLE_END(time_attr, interval)
TUMBLE_ROWTIME(time_attr, interval)
TUMBLE_PROCTIME(time_attr, interval)
9.3.2 Over Windows
由于Over本来就是SQL内置支持的语法,所以这在SQL中属于基本的聚合操作。所有聚合必须在同一窗口上定义,也就是说,必须是相同的分区、排序和范围。目前仅支持在当前行范围之前的窗口(无边界和有边界)。
注意,ORDER BY必须在单一的时间属性上指定。
代码如下:
代码语言:javascript复制SELECT COUNT(amount) OVER (
PARTITION BY use
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
// 也可以做多个聚合
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
PARTITION BY use
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
十、函数
10.1 系统内置函数
Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。SQL中支持的很多函数,Table API和SQL都已经做了实现,其它还在快速开发扩展中。
以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。
比较函数
代码语言:javascript复制SQL:
value1 = value2
value1 > value2
Table API:
ANY1 === ANY2
ANY1 > ANY2
逻辑函数
代码语言:javascript复制SQL:
boolean1 OR boolean2
boolean IS FALSE
NOT boolean
Table API:
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
算术函数
代码语言:javascript复制SQL:
numeric1 numeric2
POWER(numeric1, numeric2)
Table API:
NUMERIC1 NUMERIC2
NUMERIC1.power(NUMERIC2)
字符串函数
代码语言:javascript复制SQL:
string1 || string2
UPPER(string)
CHAR_LENGTH(string)
Table API:
STRING1 STRING2
STRING.upperCase()
STRING.charLength()
时间函数
代码语言:javascript复制SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
INTERVAL string range
Table API:
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
NUMERIC.minutes
聚合函数
代码语言:javascript复制SQL:
COUNT(*)
SUM([ ALL | DISTINCT ] expression)
RANK()
ROW_NUMBER()
Table API:
FIELD.count
FIELD.sum0
10.2 UDF
用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现。
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的Table API注册函数。
函数通过调用registerFunction()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它。
10.2.1 标量函数(Scalar Functions)
用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值(输出单个值)。
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。
在下面的代码中,我们定义自己的HashCode函数,在TableEnvironment中注册它,并在查询中调用它。
代码语言:javascript复制// 自定义一个标量函数
class HashCode( factor: Int ) extends ScalarFunction {
def eval( s: String ): Int = {
s.hashCode * facto
}
}
主函数中调用,计算sensor id的哈希值(前面部分照抄,流环境、表环境、读取source、建表):
代码语言:javascript复制def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings
.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create( env, settings )
// 定义好 DataStream
val inputStream: DataStream[String] = env.readTextFile("..\sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream
.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
// 将 DataStream转换为 Table,并指定时间字段
val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature)
// Table API中使用
val hashCode = new HashCode(10)
val resultTable = sensorTable
.select( 'id, hashCode('id) )
// SQL 中使用
tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("hashCode", hashCode)
val resultSqlTable = tableEnv.sqlQuery("select id, hashCode(id) from sensor")
// 转换成流,打印输出
resultTable.toAppendStream[Row].print("table")
resultSqlTable.toAppendStream[Row].print("sql")
env.execute()
}
10.2.2 表函数(Table Functions)
与用户定义的标量函数类似,用户定义的表函数,可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。
为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval。求值方法的参数类型,决定表函数的所有有效参数。
返回表的类型由TableFunction的泛型类型确定。求值方法使用protected collect(T)方法发出输出行。
在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。
joinLateral算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。
而leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。
在SQL中,则需要使用Lateral Table(),或者带有ON TRUE条件的左连接。
下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。
自定义TableFunction:
代码语言:javascript复制// 自定义TableFunction
class Split(separator: String) extends TableFunction[(String, Int)]{
def eval(str: String): Unit = {
str.split(separator).foreach(
word => collect((word, word.length))
)
}
}
接下来,就是在代码中调用。首先是Table API的方式:
代码语言:javascript复制// Table API中调用,需要用joinLateral
val resultTable = sensorTable
.joinLateral(split('id) as ('word, 'length)) // as对输出行的字段重命名
.select('id, 'word, 'length)
// 或者用leftOuterJoinLateral
val resultTable2 = sensorTable
.leftOuterJoinLateral(split('id) as ('word, 'length))
.select('id, 'word, 'length)
// 转换成流打印输出
resultTable.toAppendStream[Row].print("1")
resultTable2.toAppendStream[Row].print("2")
然后是SQL的方式:
代码语言:javascript复制 tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("split", split)
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, word, length
|from
|sensor, LATERAL TABLE(split(id)) AS newsensor(word, length)
""".stripMargin)
// 或者用左连接的方式
val resultSqlTable2 = tableEnv.sqlQuery(
"""
|SELECT id, word, length
|FROM
|senso
| LEFT JOIN
| LATERAL TABLE(split(id)) AS newsensor(word, length)
| ON TRUE
""".stripMargin
)
// 转换成流打印输出
resultSqlTable.toAppendStream[Row].print("1")
resultSqlTable2.toAppendStream[Row].print("2")
10.2.3 聚合函数(Aggregate Functions)
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。
上图中显示了一个聚合的例子。
假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。
AggregateFunction的工作原理如下:
首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。
随后,对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
代码语言:javascript复制createAccumulator()
accumulate()
getValue()
除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。
代码语言:javascript复制retract()
merge()
resetAccumulator()
接下来我们写一个自定义AggregateFunction,计算一下每个sensor的平均温度值。
代码语言:javascript复制// 定义AggregateFunction的Accumulato
class AvgTempAcc {
var sum: Double = 0.0
var count: Int = 0
}
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
override def getValue(accumulator: AvgTempAcc): Double =
accumulator.sum / accumulator.count
override def createAccumulator(): AvgTempAcc = new AvgTempAcc
def accumulate(accumulator: AvgTempAcc, temp: Double): Unit ={
accumulator.sum = temp
accumulator.count = 1
}
}
接下来就可以在代码中调用了。
代码语言:javascript复制// 创建一个聚合函数实例
val avgTemp = new AvgTemp()
// Table API的调用
val resultTable = sensorTable.groupBy('id)
.aggregate(avgTemp('temperature) as 'avgTemp)
.select('id, 'avgTemp)
// SQL的实现
tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("avgTemp", avgTemp)
val resultSqlTable = tableEnv.sqlQuery(
"""
|SELECT
|id, avgTemp(temperature)
|FROM
|senso
|GROUP BY id
""".stripMargin)
// 转换成流打印输出
resultTable.toRetractStream[(String, Double)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")
10.2.4 表聚合函数(Table Aggregate Functions)
用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。
比如现在我们需要找到表中所有饮料的前2个最高价格,即执行top2()表聚合。我们需要检查5行中的每一行,得到的结果将是一个具有排序后前2个值的表。
用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。
TableAggregateFunction的工作原理如下:
首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。
随后,对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
代码语言:javascript复制createAccumulator()
accumulate()
除了上述方法之外,还有一些可选择实现的方法:
代码语言:javascript复制retract()
merge()
resetAccumulator()
emitValue()
emitUpdateWithRetract()
接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值。
代码语言:javascript复制// 先定义一个 Accumulator
class Top2TempAcc{
var highestTemp: Double = Int.MinValue
var secondHighestTemp: Double = Int.MinValue
}
// 自定义 TableAggregateFunction
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc]{
override def createAccumulator(): Top2TempAcc = new Top2TempAcc
def accumulate(acc: Top2TempAcc, temp: Double): Unit ={
if( temp > acc.highestTemp ){
acc.secondHighestTemp = acc.highestTemp
acc.highestTemp = temp
} else if( temp > acc.secondHighestTemp ){
acc.secondHighestTemp = temp
}
}
def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit ={
out.collect(acc.highestTemp, 1)
out.collect(acc.secondHighestTemp, 2)
}
}
接下来就可以在代码中调用了。
代码语言:javascript复制// 创建一个表聚合函数实例
val top2Temp = new Top2Temp()
// Table API的调用
val resultTable = sensorTable.groupBy('id)
.flatAggregate( top2Temp('temperature) as ('temp, 'rank) )
.select('id, 'temp, 'rank)
// 转换成流打印输出
resultTable.toRetractStream[(String, Double, Int)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")