Flink 版本:1.11
本文将解释如何在 Flink 的 Table API 和 SQL 中为基于时间的操作定义时间属性。
1. 时间属性介绍
基于时间的操作,例如,Table API 和 SQL 查询中的窗口,需要知道时间相关的信息。因此,表需要提供逻辑时间属性以指明时间以及提供访问相应的时间戳。时间属性可以作为表 schema 的一部分,可以在用 CREATE TABLE DDL 语句创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。一旦时间属性定义好,就可以像普通列一样使用,也可以在时间相关的操作中使用。
只要时间属性没有被修改,只是从查询的一部分转发到另一部分,那么仍然是一个有效的时间属性。时间属性的行为类似于常规时间戳,并可用于计算。当在计算中使用时,时间属性被物化为一个标准时间戳。但是,不能使用普通时间戳来代替时间属性,也不能将其转换为时间属性。
2. 如何定义时间属性
Flink 可以根据如下两种时间概念来处理数据:
- 处理时间是指机器执行相应操作的系统时间(也称为纪元时间,例如 Java 的 System.currentTimeMillis())。
- 事件时间是指根据每一行中的时间戳来处理数据流。
因此,时间属性可以是基于处理时间的,也可以基于事件时间。此外,时间属性可以作为表 schema 的一部分,可以在用 CREATE TABLE DDL 语句创建表的时候指定、也可以在 DataStream 中指定、也可以在定义 TableSource 时指定。
2.1 处理时间
处理时间是基于机器的本地时间来处理数据,是最简单的一种时间概念,但是它不能提供确定性的结果。不同于事件时间,既不需要从数据里获取时间戳,也不需要生成 watermark。
2.1.1 在 DDL 中定义
处理时间属性可以在用 CREATE TABLE DDL 语句创建表时用计算列的方式定义。可以使用 PROCTIME() 函数定义处理时间,函数的返回类型是 TIMESTAMP_LTZ 类型。
代码语言:javascript复制CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外列作为处理时间属性
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
计算列是一个虚拟列,使用 column_name AS computed_column_expression 语法生成,例如,cost AS price * quanitity,其中 price 和 quanitity 是表中的两个实际物理列。
2.1.2 在 DataStream 到 Table 转换时定义
在 DataStream 转换 Table 时,处理时间属性是在 schema 定义时使用 .proctime 属性定义。时间属性只能通过一个额外的逻辑字段来扩展物理 schema。因此,只能在 schema 定义的末尾进行定义。
代码语言:javascript复制DataStream<Tuple2<String, String>> stream = ...;
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
WindowedTable windowedTable = table.window(
Tumble.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow")
);
2.1.3 在 TableSource 中定义
处理时间属性可以在实现了 DefinedProctimeAttribute 的 TableSource 中定义。逻辑时间属性会放在 TableSource 已有物理字段的最后。
代码语言:javascript复制// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"user_name" , "data"};
TypeInformation[] types = new TypeInformation[] { Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
DataStream<Row> stream = ;
return stream;
}
@Override
public String getProctimeAttribute() {
// 这个名字的列会被追加到最后,作为第三列
return "user_action_time";
}
}
// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
2.2 事件时间
事件时间允许 Table 程序根据每条记录中的时间戳生成结果,即使出现乱序或延迟事件也能获得一致的结果。此外,事件时间可以为在批处理和流环境中的 Table 程序提供统一的语法。流环境中的时间属性可以是批处理环境中一行的常规列。
为了处理乱序事件并区分流中的 on-time 和 late 事件,Flink 需要知道每一行的时间戳,并且还需要知道到目前为止处理进展(通过 Watermark)。
2.2.1 在 DDL 中定义
事件时间属性可以用 CREATE TABLE DDL 语句创建表时用 WATERMARK 语句定义。WATERMARK 语句在现有事件时间字段上定义 WATERMARK 生成表达式,将事件时间字段标记为事件时间属性。Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间属性。如果 Source 中的时间戳数据为年-月-日-时-分-秒这种格式,一般是没有时区信息的字符串值,例如,2020-04-15 20:13:40.564,建议将事件时间属性定义为 TIMESTAMP 列:
代码语言:javascript复制CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
如果 Source 数据中的时间戳数据是一个纪元 (epoch) 时间,一般是一个 Long 值,例如,1618989564564,建议将事件时间属性定义在 TIMESTAMP_LTZ 列上:
代码语言:javascript复制CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
2.2.2 在 DataStream 到 Table 转换时定义
在 DataStream 转换 Table 时,事件时间属性是在 schema 定义时使用 .rowtime 属性定义。在转换之前,时间戳和 watermark 在 DataStream 必须先设置好。在转换过程中,由于 DataStream 没有时区概念,因此 Flink 总是将 rowtime 属性解析成 TIMESTAMP WITHOUT TIME ZONE 类型,并且将所有事件时间的值都视为 UTC 时区的值。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于指定的 .rowtime 字段名称是否已经存在于 DataStream 的 schema 中,事件时间字段可以是:
- 在 schema 结尾追加一个新的字段
- 替换一个已经存在的字段。
不管在哪种情况下,事件时间戳字段都会保存 DataStream 事件的时间戳。
代码语言:javascript复制// (1) 追加一个新的字段
// 提取时间戳并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外逻辑字段作为事件时间属性
// 在 schema 的末尾使用 user_action_time.rowtime 定义事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// (2) 替换一个已经存在的字段
// 从第一个字段提取时间戳并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用来提取时间戳,因此不在必须,可以直接使用事件时间属性替换这个字段
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
2.2.3 在 TableSource 中定义
事件时间属性可以在实现了 DefinedRowTimeAttributes 的 TableSource 中定义。getRowtimeAttributeDescriptors() 方法返回一个 RowtimeAttributeDescriptor 列表,包含了事件时间属性名字、用来计算属性值的时间戳提取器以及 watermark 生成策略等信息。
需要确保 getDataStream() 方法返回的 DataStream 与定义的时间属性对齐。只有在定义了 StreamRecordTimestamp 时间戳分配器的时候,才认为 DataStream 有时间戳(由 TimestampAssigner 分配的时间戳)。只有定义了 PreserveWatermarks watermark 生成策略,DataStream 的 watermark 才会被保留。否则,只有时间字段的值是生效的。
代码语言:javascript复制// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"user_name", "data", "user_action_time"};
TypeInformation[] types =
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 构造 DataStream
// ...
// 基于 "user_action_time" 定义 watermark
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// 标记 "user_action_time" 字段是事件时间字段
// 给 "user_action_time" 构造一个时间属性描述符
RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"user_action_time",
new ExistingField("user_action_time"),
new AscendingTimestamps()
);
List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
return listRowtimeAttrDescr;
}
}
// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));
原文:Time Attributes