Flink 1.10 升级 Flink 1.12 预期收益评估

2022-06-23 14:53:19 浏览数 (1)

前言

Flink 1.12 版本在 20 年 12 月已经正式 Release,目前我们的 Flink SQL 作业的 Flink 引擎版本还是 1.10,本文主要用以评估 Flink 1.10 升级到 1.12 整体所能带来的预期收益,同时结合所需投入的成本,决定是否需要升级 Flink SQL 引擎版本到 1.12。本次升级所评估的收益包含 1.11 和 1.12 版本所带来的收益,如有理解错误,欢迎指出,一起交流。

一、 Flink SQL 语法更加简洁,提升实时作业开发效率

1.1 收益:

FLIP-122 提出了新的 Connector 属性 key, 具体参考 FLIP-122: New Connector Property Keys for New Factory 。FLIP-122 在 Flink 1.11 Released,Flink 1.11 SQL 语法会更加简洁,这能够提升实时用户开发作业的效率。

新的代码结构(Kafka Source 举例):

代码语言:javascript复制
CREATE TABLE kafka_table (
 ...
) WITH (
 'connector' = 'kafka-0.10',
 'topic' = 'test-topic',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'hello_world',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false'
);

可以看到,新的 Flink SQL 语法,整体对于用户来说,更为简洁和直观,用户开发时,也会更为的方便。

二、Flink SQL 支持 Kafka Upsert Connector

2.1 背景

FLIP-149 云邪提出了 upsert-kafka Connector,具体链接:https://cwiki.apache.org/confluence/display/FLINK/FLIP-149: Introduce the upsert-kafka Connector。首先要理解 upsert 的含义:一条记录(有 主键),如果不存在,则插入,有则更新,全称:insert / update。Upsert-kafka connector 产生一个changelog 流,changelog 流中的数据记录可以理解为 UPSERT 流,也就是INSERT/UPDATE,因为具有相同键的任何现有行都会被覆盖。同样,空值可以用一种特殊的方式理解:带有空值的记录表示“删除”。

Upsert-kafka Connector 对于我们来说,解决最常用的场景是:从 Kafka Topic 按某类 Key 取最新数据,然后下游聚合,最后写入到外部存储。这种通用的实时开发流程一般是:上游为 mysql binglog -> Kafka 的数据同步任务,然后下游需要按照某类key 取最新数据进行聚合等等。

下面是 Flink 1.10 按照 a 取最新值,然后下游进行聚合的 SQL 代码,主要使用到了 last_value:

代码语言:javascript复制
create table hello_world
(
    a                     varchar
  , b                 bigint
  , c                   bigint
  , d                     bigint
) with (
     xxx
       );

create view temp_hello as
select
    a
  , last_value( b                 )  as  b
  , last_value( c                   )  as  c
  , last_value( d              )  as  d
from
    hello_world
group by
    a;
create view temp_world as 
select 
    sum(b) as sum_b
    ,sum(c) as sum_c
    ,sum(d) as sum_d
from temp_hello;

在 Flink 1.10 中,当前这类任务开发对于用户来说,还是不够友好,需要很多代码,同时也会造成 Flink SQL 冗长。Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,这也是我们公司内部业务方对实时平台提出的需求。

2.2 收益

Flink 1.12 支持了 Flink SQL Kafka upsert connector ,下面是使用 Flink 1.12 代码改写上述逻辑:

代码语言:javascript复制
CREATE TABLE temp_hello (
    a                     varchar
  , b                  bigint
  , c                   bigint
  , d                     bigint
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  xx
);

create view temp_world as 
select 
    sum(b) as sum_b
    ,sum(c) as sum_c
    ,sum(d) as sum_d
from temp_hello;

收益:便利用户有这种需要从 kafka 取最新记录操作的实时任务开发,比如这种 binlog -> kafka,然后用户聚合操作,这种场景还是非常多的,这能提升实时作业开发效率,同时 1.12 做了优化,性能会比单纯的 last_value 性能要好

三、Flink Yarn 作业 On k8s 的生产级别能力

3.1 背景:

之前我们内部 Flink Jar 作业已经全部 K8s 化,Flink SQL 作业由于是推广初期,还是在 Yarn 上面进行运行,为了将实时计算 Flink 全部 K8s 化(去 Yarn),所以我们 Flink SQL 作业也需要迁移到 K8s,目前 Flink 1.12 已经满足生产级别的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社区的 On k8s 能力。

3.2 风险:

虽然和社区的人沟通,Flink 1.12 on k8s 没有什么问题,但是具体功能还是需要先 POC 验证一下,同时可能社区 Flink on k8s 的能力,可能会限制我们这边一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,这里可能需要改底层源码来进行快速支持(社区有相关 JIRA 要做)。

3.3 收益:

(Flink 去 Yarn)Flink 1.12 on k8s 对于我们最主要的两个点:

  1. JobManager 的高可用,能够依赖 Zookeeper 或者 k8s ConfigMap
  2. Flink 云原生能力,ResourceManager 能够和 K8s API Server 交互,自动申请所需资源。
  3. 社区 Flink on K8s 方面的一些优化

最终实时集群 Flink 作业去 Yarn 化,同时为离线提供更多能够弹性扩缩资源,更好的降低成本。

四、Flink On Hive 能力(生产级别)

4.1 背景:

目前在有赞已经开始有部分实时业务方希望 Flink 能够支持 Hive,比如 Flink-Hive 近实时的数仓中间层【小时表可更快产出】,以及 Flink 实时任务和离线数据对比功能。而在 Flink 1.12 中,已经支持生产级别 Flink On Hive 任务运行(社区 Commiter 说),所以基于这次 Flink SQL 引擎版本升级,能够支持 Flink on hive 生产功能。

4.2 收益:

解决部分实时业务方, Flink On Hive 的业务需求,下面是 Flink 1.12 具体 Hive 相关功能:

  1. 支持 Sort-Merge Shuffle (FLIP-148)
  2. 在 FileSystem/Hive connector 的流式写入中支持小文件合并 (FLINK-19345),在 Flink 1.12 中,File Sink 增加了小文件合并功能,从而使得即使作业 checkpoint 间隔比较小时,也不会产生大量的文件。要开启小文件合并,可以按照文档[11]中的说明在 FileSystem connector 中设置 auto-compaction = true 属性。

五、Flink 基于 Savepoint 跨集群迁移能力

5.1 背景:

当前我们使用 Flink 版本是 1.10,而在Flink 1.11 以下,在任务 Savepoint Meta 文件里面,存储的这次 Savepoint 引用的状态文件路径都是 HDFS 全路径,所以在跨集群迁移时,Savepoint 是不能够进行复用的,所以一旦有集群迁移,Flink SQL 作业状态会丢失,如果有状态强相关的实时作业,可能会有故障风险。该问题已经在 1.11 已经修复,具体可以参考:FLINK-5763:Make savepoints self-contained and relocatable

5.2 收益:

(稳定性)Flink 作业有基于 Savepoint 跨集群不丢状态的恢复和迁移能力。

六、其他对我们有用收益

6.1 Flink Web UI
  1. [FLIP-103] 改善 Web UI 上 JM/TM 日志的展示
  2. [FLIP-99] 允许展示更多的历史 Failover 异常
  3. [Flink-14816] 允许用户直接在页面上进行 Thread Dump
6.2 Flink Connector/ Source Sinks
  1. Kafka Connector 支持 Watermark 下推 (FLINK-20041)
  2. Flink 1.11 introduces new table source and sink interfaces with changelog mode (see New TableSource and TableSink Interfaces) and support for the Debezium and Canal formats (FLIP-105).
  3. 利用 Multi-input 算子进行 Join 优化 (FLINK-19621),Shuffling 是一个 Flink 作业中最耗时的操作之一。为了消除不必要的序列化反序列化开销、数据 spilling 开销,提升 Table API / SQL 上批作业和流作业的性能, planner 当前会利用上一个版本中已经引入的N元算子(FLIP-92),将由 forward 边所连接的多个算子合并到一个 Task 里执行。

其他一些 Flink Bug Fix。

0 人点赞