大家好,我是一哥,今天分享一下,Flink在郑州本地银行的实践。
在构建实时场景的过程中,如何快速、正确的实时同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Apache Flink和数据湖两种技术,来解决业务数据实时入湖的相关问题。两者的结合能良好的支持实时数据落地存储,借助Apache Flink出色的流批一体能力,可以为用户构建一个准实时数仓,满足用户准实时业务探索。
一、Apache Flink和数据湖介绍
1.1 Apache Flink CDC原理
CDC全称是Change Data Capture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为CDC。该功能被广泛应用于数据同步、更新缓存、微服务间同步数据等场景,本文主要介绍基于Flink CDC在数据实时同步场景下的应用。
Flink在1.11版本开始引入了Flink CDC功能,并且同时支持Table & SQL两种形式。Flink SQL CDC是以SQL的形式编写实时任务,并对CDC数据进行实时解析同步。相比于传统的数据同步方案,该方案在实时性、易用性等方面有了极大的改善。下图是基于Flink SQL CDC的数据同步方案的示意图。
Oracle的变更日志的采集有多种方案,如上图所示,这里采用的Debezium实时同步工具作为示例,该工具能够解析Oracle的change log数据,并实时发送数据到下游Kafka。Flink SQL通过创建Kafka映射表并指定 format格式为debezium-json,然后通过Flink进行解析后直接插入到其它外部数据存储系统。
下面详细解析一下数据同步过程。首先了解一下Debezium抽取的Oracle的change log的格式,以update为例,变更日志上记录了更新之前的数据和更新以后的数据,在Kafka下游的Flink接收到这样的数据以后,一条update操作记录就转变为了先delete、后insert两条记录。日志格式如下所示,该update操作的内容的name字段从tom更新为了jerry。
代码语言:javascript复制 {
"before": { --更新之前的数据
"id": 001,
"name": "tom"
},
"after": { --更新之后的数据
"id": 001,
"name": "jerry"
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904
}
其次再来看一下Flink SQL内部是如何处理update记录的。Flink在1.11版本支持了完整的change log机制,对于每条数据本身只要是携带了相应增、删、改的标志,Flink就能识别这些数据,并对结果表做出相应的增、删、改的动作,如下图所示change log数据流经过Flink解析,同步到下游Sink Database。
通过以上分析,基于Flink SQL CDC的数据同步有如下优点:
a)业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。
b)性能消耗:业务数据库性能消耗小,数据同步延迟低。
c)同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。
d)数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。
1.2 数据湖介绍
数据湖(Data Lake)当前没有统一的定义,通常认为数据湖是一种支持存储多种原始数据格式、多种计算引擎、高效的元数据统一管理和海量统一数据存储。其中以Apache Hudi和Apache Iceberg为代表的表格式和Flink计算引擎组成的数据湖解决方案尤为亮眼。如图所示数据湖生态架构示意图。
提到数据湖就不得不说一下传统数据仓库,两者相比之下传统数仓的缺点有:
- 不支持ACID 不支持Upsert场景,不支持Row-level delete,数据修改成本高
- 时效性差 数据难以做到准实时可见,无法支持分钟级延迟的数据分析场景
- 只能存储结构化数据 传统数仓不支持存储非结构化和半结构化数据
传统数仓有这些缺点,那么就可以使用数据湖代替数仓吗?答案是否定的,数据仓库、数据湖是数据技术不断发展的结果,是传承不是取代。理想中的数据湖或者湖仓一体,对于用户来说不需要清晰的区别湖和仓,数据有着打通的元数据的格式,它们之间可以自由的流动,也可以对接上层多样化的计算生态。
数据仓库:是一个针对OLAP优化的数据库,用于分析来自事务系统和业务线应用程序的关系型数据,因此数据仓库存储的都是经过了清洗、转换的结构化数据。数据仓库对数据提供高效地存储,便于用户通过报表、看板和分析工具来获取查询结果,从数据中获得洞察力、决策指导。
数据湖:可以存储来自业务线应用程序的关系型数据,也可以存储来自移动应用程序的日志、图片视频等非关系型数据。当不清楚某些数据存在的价值时,将数据以原生格式天然沉积在数据湖,为后续用户需要提供更好的分析探索。
二、实时数据入湖实践
当前使用Flink最新版本1.12,支持CDC功能和更好的流批一体。Apache Iceberg最新版本0.12和Apache Hudi 0.9两者均已经适配Flink CDC功能。其中比较重点的是数据湖的更新删除功能,先来了解一下什么是Row-Level Delete。
Row-Level Delete功能是指根据从一个数据集里面删除指定行。那么为什么这个功能那么重要呢?众所周知,大数据中的行级删除不同于传统数据库的更新和删除功能,在基于HDFS架构的文件系统上数据存储只支持数据的追加,为了在该构架下支持更新删除功能,删除操作演变成了一种标记删除,更新操作则是转变为先标记删除、后插入一条新数据。具体实现方式可以分为Copy on Write(COW)模式和Merge on Read(MOR)模式,其中Copy on Write模式可以保证下游的数据读具有最大的性能,而Merge on Read模式保证上游数据插入、更新、和删除的性能,减少传统Copy on Write模式下写放大问题。
Flink SQL CDC和数据湖的架构设计和整合如何巧妙,不能局限于纸上谈兵,下面就实际操作一下,体验其功能的强大和带来的便捷。
2.1 数据入湖环境准备
以Flink SQL CDC方式将实时数据导入数据湖的环境准备非常简单直观,因为Flink支持流批一体功能,所以实时导入数据湖的数据,也可以使用Flink SQL离线或实时进行查询。如下测试是使用Flink SQL完成实时同步的示意步骤,以Iceberg为例,其中有一部分开发和适配工作。
step1:新建Kafka映射表,用于实时接收Topic中的change log数据
代码语言:javascript复制CREATE TABLE KafkaTable (
id STRING,
name STRING
) WITH (
'connector' = 'kafka',
'xxx' = 'xxx'
);
step2:新建iceberg结果表,用于存储实时入湖的数据
代码语言:javascript复制CREATE TABLE IcebergTable (
id STRING,
name STRING
)WITH (
'connector'='iceberg',
'xxx' = 'xxx'
);
step3:启动实时入湖的Flink SQL任务
代码语言:javascript复制INSERT INTO IcebergTable
SELECT * FROM KafkaTable;
step4:离线或者实时查询统计IcebergTable表中的数据行数,在Flink的sql-client中执行
代码语言:javascript复制----/ *a.离线方式* /----
SET execution.type=batch;
SELECT COUNT(*) FROM IcebergTable;
----/ *b.实时方式* /----
SET execution.type=streaming;
SELECT COUNT(*) FROM IcebergTable;
2.2 数据入湖速度测试
数据入湖速度测试会根据环境配置、参数配置、数据格式等不同有所不同,下面是列出主要配置和测试出的数据作为参考。
- 资源配置情况
TaskManager | 内存4G,slot:1 |
---|---|
Checkpoint | 1分钟 |
测试数据列数 | 10列 |
测试数据行数 | 1000万 |
iceberg存储格式 | parquet |
- 测试数据情况
数据入湖分为append和upsert两种方式。append方式只能新增数据,不能对结果数据进行更新操作;upsert方式即能够对结果数据更新。
append方式使用场景是导入数据之前已经明确该表数据不需要更新,如离线数据导入数据湖的场景,append方式下导入数据速度如下:
SQL | INSERT INTO IcebergTable SELECT * FROM KafkaTable; |
---|---|
并行度1 | 12.2万/秒 |
并行度2 | 19.6万/秒 |
并行度4 | 28.3万/秒 |
update方式使用场景是既有插入的数据又有对之前插入数据的更新的场景,如数据库实时同步,upsert方式下导入数据速度,该方式需要指定在更新时以那个字段查找,类似于update语句中的where条件,一般设置为表的主键即可,如下:
SQL | INSERT INTO IcebergTable /* OPTIONS('equality-field-columns'='id')*/SELECT * FROM KafkaTable; | |
---|---|---|
导入的数据 | 只有数据插入 | 只有数据更新 |
并行度1 | 3.2万/秒 | 2.9万/秒 |
并行度2 | 4.9万/秒 | 4.2万/秒 |
并行度4 | 6.1万/秒 | 5.1万/秒 |
- 结论
(1)append方式导入速度远大于upsert导入数据速度。在使用的时候,如没有更新数据的场景时,则不需要upsert方式导入数据;
(2)导入速度随着并行度的增加而增加;
(3)upsert方式数据的插入和更新速度相差不大,主要得益于MOR原因;
三、实时数据入湖经验
3.1 实时性
数据湖的实时性是什么级别的呢?
如图所示,实时计算或者流计算处理的是时延要求比较高的场景,可以实现端到端秒级实时分析,但是查询端的能力欠缺,无法长时间存储历史数据。批处理的数仓能力丰富但是数据时延比较大,用户可以实现小时级别的数据注入 HDFS/OSS,并且不支持更新和删除操作。然而在秒级到小时级时的分钟级场景还存在大量的用例,通常称之为准实时或者近实时(NEAR-REAL-TIME),数据湖的出现,恰巧解决了准实时场景的用例。如下图所示实时、准实时、批量处理时延。
实时数据发送到数据湖采用的是mini-batch增量写入方案,实时数据周期内可见,一般根据业务需求和数据量的大小设置为分钟级别。该方案既能节省计算成本,又能提高数据时延,实时的数据写入可供实时OLAP分析场景等。
3.2 一致性
当程序BUG或者任务重启等导致数据传输中断,如何保证数据的一致性呢?
数据一致性保证通过两个方面实现
a)借助Flink实现的exactly once语义和故障恢复能力,实现数据严格一致性。在运行过程中,checkpoint周期内任务异常重启时,会从上一个checkpoint点恢复,重新消费数据写入下游的数据湖。
b)借助Hudi/Iceberg ACID能力来隔离写入对分析任务的不利影响。在checkpoint周期内写入的数据,下游数据湖对这部分未commit的数据是不可见的,从而隔离这部分未提交数据对分析任务的影响,周期性的commit也是数据湖分钟级延迟的主要原因。
3.3 顺序性
数据入湖否可保证全局顺序性插入和更新?
这个问题类似于Kafka是否可以保证全局顺序性,答案是否定的,也就是不可以全局保证数据生产和数据消费的顺序性,但是可以保证同一条数据的插入和更新的顺序性。首先数据抽取的时候是单线程的,然后分发到Kafka的各个partition中,此时同一个key的变更数据打入到同一个Kafka的分区里面,Flink读取的时候也能保证顺序性消费每个分区中的数据,进而保证同一个key的插入和更新的顺序性。
3.4 初始化
数据湖中的历史数据如何初始化,才能够和实时增量的数据无缝对接呢?
为了构建实时同步链路,首先需要通过各种方式,将历史数据从数仓或者源库等导入到数据湖中,离线批量同步数据这块就不再阐述啦。然后将实时增量数据对接到历史数据上,先使用同步工具把数据的变更写到Kafka消息队列,然后通过Flink消费Kafka的数据进行实时的分析计算,最后将结果数据实时的写到数据湖中,在数据湖中完成历史数据和实时数据的无缝对接。如何将历史数据和实时数据正好对接上呢?主要有以下几种情况。
a)数据有主键,也就是数据写入到下游能够保证幂等
首先实时同步工具把变更数据写入Kafka,Kafka默认保存7天数据。
然后开始同步历史数据,保证历史数据和Kafka中的数据有交集。
最后启动Flink任务实时写入数据湖,且从Kafka中指定消费时间要早于批量同步的数据,因为存在主键,数据库提供upsert的能力,对相同主键的数据进行更新覆盖。
b)数据有时间字段
可以通过业务属性中的时间戳区别离线数据和实时数据,比如导入离线数据为某日凌晨之前,实时数据则是该日凌晨之后的数据,这就能保证数据无缝对接。
c)如果不满足a、b两种情况
这种情况无法完全保证数据正好对接上,退化为数据是否允许少量的重复或者丢失,实际的业务中这种表也几乎不存在的。
四、未来规划
高新的技术最终是要落地才能发挥其内在价值的,针对行内纷繁复杂的数据,结合流计算Flink和数据湖技术,在未来的落地规划集中在两个方面,一是数据湖集成到本行的实时计算平台中,解决易用性的问题;二是构建行内准实时数仓,提供近实时的场景探索。
4.1 整合数据湖数据源
中原银行实时计算平台是一个基于SQL的高性能实时大数据处理平台,该平台彻底规避繁重的底层流计算处理逻辑、繁琐的提交过程等,为用户打造一个只需关注实时计算逻辑的平台,助力企业向实时化、智能化大数据转型。
实时计算平台未来将会整合Apache Hudi和Apache Iceberg数据源,用户可以在界面配置Flink SQL任务,该任务既可以以upsert方式实时解析change log并导入到数据湖中,也可以分析数据湖中的实时或历史数据。并增加小文件监控、定时任务压缩小文件、清理过期数据等功能。
4.2 准实时数仓探索
本文对数据实时入湖原理做了比较多的阐述,入湖后的数据有哪些场景的使用呢?下一个目标当然是入湖的数据分析实时化。比较多的讨论是关于准实时数据湖的探索,结合行内数据特点探索适合落地的实时数据分析场景成为当务之急。
随着数据量的持续增大,和业务对时效性的严苛要求,基于Apache Flink和Apache Hudi/Iceberg构建准实时数仓愈发重要和迫切,作为实时数仓的两大核心组件,可以缩短数据导入、方便数据行级变更、支持数据流式读取等。并且有助于流批一体相关的探索,最终达到数据同源,存储一体,流批计算一致等终极诉求。
END