在某些场景中,例如读取 compacted topic 或者输出(更新)聚合结果的时候,需要将 Kafka 消息记录的 key 当成主键处理,用来确定一条数据是应该作为插入、删除还是更新记录来处理。为了实现该功能,社区为 Kafka 专门新增了一个 upsert connector(upsert-kafka),该 connector 扩展自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作为 source 使用,也可以作为 sink 使用,并且提供了与现有的 kafka connector 相同的基本功能和持久性保证,因为两者之间复用了大部分代码。
要使用 upsert-kafka connector,必须在创建表时定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格式。
一、Upsert Kafka Connector是什么?
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
如果是更新,则同一个key会存储多条数据,但在读取该表数据时,只保留最后一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 消息写入(key被打上墓碑标记,表示对应 key 的消息被删除)。
Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
upsert-kafka connector相关参数
connector
必选。指定要使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'。
topic 必选。用于读取和写入的 Kafka topic 名称。
properties.bootstrap.servers 必选。以逗号分隔的 Kafka brokers 列表。
key.format 必选。用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv'、'json'、'avro'。
value.format 必选。用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv'、'json'、'avro'。
properties 可选。该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会自动移除 选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。但是,某些选项,例如'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
value.fields-include 可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
key.fields-prefix 可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY。
二、使用步骤
1.引入库
代码语言:javascript复制 <!-- Flink kafka connector: kafka版本大于1.0.0可以直接使用通用的连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
<scope>provided</scope>
</dependency>
2.SQL计算
示例:实时地统计网页PV和UV的总量
代码语言:javascript复制-- 创建kafka数据源表(json格式)
-- 'format.type' = 'json', -- required: specify the format type
-- 'format.fail-on-missing-field' = 'true', -- optional: flag whether to fail if a field is missing or not,'false' by default
-- 'format.ignore-parse-errors' = 'true', -- optional: skip fields and rows with parse errors instead of failing;
CREATE TABLE source_ods_fact_user_ippv (
user_id STRING,
client_ip STRING,
client_info STRING,
pagecode STRING,
access_time TIMESTAMP,
dt STRING,
WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND -- 定义watermark
) WITH (
'connector' = 'kafka',
'topic' = 'user_ippv',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'group1',
'properties.bootstrap.servers' = 'xxx:9092',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
-- 创建kafka upsert结果表且指定组合主键为:do_date,do_min
CREATE TABLE result_total_pvuv_min (
do_date STRING, -- 统计日期
do_min STRING, -- 统计分钟
pv BIGINT, -- 点击量
uv BIGINT, -- 一天内同个访客多次访问仅计算一个UV
currenttime TIMESTAMP, -- 当前时间
PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'result_total_pvuv_min',
'properties.bootstrap.servers' = 'xxx:9092',
'key.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'key.format' = 'json',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
-- 创建视图
CREATE VIEW view_total_pvuv_min AS
SELECT
dt AS do_date, -- 时间分区
count (client_ip) AS pv, -- 客户端的IP
count (DISTINCT client_ip) AS uv, -- 客户端去重
max(access_time) AS access_time -- 请求的时间
FROM
source_ods_fact_user_ippv
GROUP BY dt;
-- 将每分钟的pv/uv统计结果写入kafka upsert表
INSERT INTO result_total_pvuv_min
SELECT
do_date,
cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的时间
pv,
uv,
CURRENT_TIMESTAMP AS currenttime
from
view_total_pvuv_min;
该处使用示例数据和验证结果如下:
代码语言:javascript复制kafak 数据源:
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:32:24","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-23 11:32:55","dt":"2021-01-08"}
{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031", "access_time":"2021-01-23 11:32:59","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-23 11:33:24","dt":"2021-01-08"}
{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001", "access_time":"2021-01-23 11:33:30","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:34:24","dt":"2021-01-08"}
实时统计的结果表(TOPIC:result_total_pvuv_min):
{"do_date":"2021-01-08","do_min":"11:32","pv":1,"uv":1,"currenttime":"2021-01-23 08:22:06.431"}
{"do_date":"2021-01-08","do_min":"11:32","pv":2,"uv":1,"currenttime":"2021-01-23 08:22:06.526"}
{"do_date":"2021-01-08","do_min":"11:32","pv":3,"uv":2,"currenttime":"2021-01-23 08:22:06.527"}
{"do_date":"2021-01-08","do_min":"11:33","pv":4,"uv":2,"currenttime":"2021-01-23 08:22:06.527"}
{"do_date":"2021-01-08","do_min":"11:33","pv":5,"uv":3,"currenttime":"2021-01-23 08:22:06.528"}
{"do_date":"2021-01-08","do_min":"11:34","pv":6,"uv":3,"currenttime":"2021-01-23 08:22:06.529"}
----------------分割线--------------------
重测试输入如下示例数据:
{"user_id":"10","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-22 10:10:24","dt":"2021-01-22"}
{"user_id":"11","client_ip":"192.168.12.2","client_info":"phone","pagecode":"1002","access_time":"2021-01-22 11:10:24","dt":"2021-01-22"}
{"user_id":"10","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-22 10:11:24","dt":"2021-01-22"}
{"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","pagecode":"1002","access_time":"2021-01-22 11:12:14","dt":"2021-01-22"}
打印待更新结果:
---- -------------------------------- -------------------------------- ---------------------- ---------------------- -----------------------
| op | do_date | do_min | pv | uv | currenttime |
---- -------------------------------- -------------------------------- ---------------------- ---------------------- -----------------------
| I | 2021-01-22 | 10:10 | 1 | 1 | 2021-01-23T08:33:2... |
| -U | 2021-01-22 | 10:10 | 1 | 1 | 2021-01-23T08:33:2... |
| U | 2021-01-22 | 11:10 | 2 | 2 | 2021-01-23T08:33:2... |
| -U | 2021-01-22 | 11:10 | 2 | 2 | 2021-01-23T08:33:2... |
| U | 2021-01-22 | 11:10 | 3 | 2 | 2021-01-23T08:33:2... |
| -U | 2021-01-22 | 11:10 | 3 | 2 | 2021-01-23T08:33:3... |
| U | 2021-01-22 | 11:12 | 4 | 3 | 2021-01-23T08:33:3... |
3. Kafka -> FLINK -> TIDB
Flink on TIDB 在当前已经有小红书、贝壳金服等在使用,作为一个支持upsert的实时数据同步方案具备一定的可行性。
代码语言:javascript复制select version(); -- 5.7.25-TiDB-v4.0.8
drop table if exists result_user_behavior;
CREATE TABLE `result_user_behavior` (
`user_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`client_ip` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
`client_info` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
`page_code` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
`access_time` TIMESTAMP COLLATE utf8mb4_general_ci DEFAULT NULL,
`dt`varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
代码语言:javascript复制// 支持upsert的一种可行数据同步方案
tenv.executeSql("CREATE TABLE source_kafka_user_behavior (n"
" user_id INT,n"
" client_ip STRING, n"
" client_info STRING, n"
" page_code STRING, n"
" access_time TIMESTAMP, n"
" dt STRING, n"
" WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND n"
") WITH (n"
" 'connector' = 'kafka',n"
" 'topic' = 'user_ippv',n"
" 'scan.startup.mode' = 'latest-offset',n"
" 'properties.group.id' = 'test-group1',n"
" 'properties.bootstrap.servers' = 'xx:9092', n"
" 'format' = 'json', n"
" 'json.fail-on-missing-field' = 'false',n"
" 'json.ignore-parse-errors' = 'true'n"
")").print();
tenv.executeSql("CREATE TABLE sink_upsert_tidb (n"
" user_id INT,n"
" client_ip STRING, n"
" client_info STRING, n"
" page_code STRING, n"
" access_time TIMESTAMP, n"
" dt STRING, n"
" PRIMARY KEY (user_id) NOT ENFORCED"
") WITH (n"
" 'connector' = 'jdbc',n"
" 'url' = 'jdbc:mysql://xxx:4000/bi',n"
" 'username' = 'bi_rw',n"
" 'password' = 'xxx',n"
" 'table-name' = 'result_user_behavior'n"
")");
tenv.executeSql("insert into sink_upsert_tidb"
" select "
" user_id ,n"
" client_ip , n"
" client_info , n"
" page_code , n"
" access_time , n"
" dt n"
"from source_kafka_user_behavior").print();
测试输入:
代码语言:javascript复制测试数据:
{"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","page_code":"1002","access_time":"2021-01-25 11:12:14","dt":"2021-01-25"}
{"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","page_code":"1003","access_time":"2021-01-25 11:12:14","dt":"2021-01-25"}
{"user_id":"11"} -- 值全部置空
{"user_id":"11","client_ip":"192.168.12.4","client_info":"phone","page_code":"10","access_time":"2021-01-25 11:35:14","dt":"2021-01-25"}
{"user_id":"12","client_ip":"192.168.12.5","client_info":"phone","page_code":"10","access_time":"2021-01-25 11:35:14","dt":"2021-01-25"}
Tidb查询结果示例:
总结
这里演示了使用kaka作为source和sink的使用示例,其中我们把从kafka source中消费的数据进行视图查询的时候则显示以上更新结果,每一条以统计日期和统计分钟作为联合主键的数据插入都会被解析为 I(插入)-U(标记待删除值) U (更新新值),这样在最新的result_total_pvuv_min 的kafka upsert 结果表中就是最新的数据。
当前kafka-upsert connector 适用于Flink-1.12的版本,作为一个数据聚合的中转对于很多业务场景有一定的普适性,比如kafka upsert结果表还可以作为维表join, 或者通过flink sink 到HDFS, iceberg table等进行离线分析。
如果想真正实时,Flink Tidb就是一个很好的解决方案。虽然Tidb存储和计算不分离,但是能使用加机器解决的问题,性能都不是事,况且Tidb完全兼容MySQL语法,非常适合MySQL平迁,而且支持事务,和使用MySQL没有什么特别大的区别,
官方已出TiSpark查询引擎,虽还未实测性能,但想必会比MySQL 引擎查询的效率要高。我司也开始着手Tidb的使用,目前的实时的任务是基于微批的形式处理,还不能算是完全的实时,后面随着对其的了解原来越完善,完全实时化则指日可待。