开源共建 | 中国移动冯江涛:ChunJun(原FlinkX)在数据入湖中的应用

2022-11-23 10:27:06 浏览数 (2)

ChunJun(原 FlinkX)是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具。2018 年 4 月,秉承着开源共享的理念,数栈技术团队在 github 上开源了 FlinkX,承蒙各位开发者的合作共建,FlinkX 得到了快速发展。

两年后的 2022 年 4 月,技术团队决定对 FlinkX 进行整体升级,并更名为 ChunJun,希望继续和各位优秀开发者合作,进一步推动数据集成 / 同步的技术发展。

因该文创作于于 FlinkX 更名为 ChunJun 之前,因此文中仍用 FlinkX 来进行分享,重要的事情说三遍:

FlinkX 即是 ChunJun

FlinkX 即是 ChunJun

FlinkX 即是 ChunJun

进入正文分享⬇️⬇️⬇️

分享嘉宾:冯江涛 中国移动云能力中心

编辑整理:陈凯翔 亚厦股份

出品平台:DataFunTalk

导读:

随着本地数据迁移上云、云上数据交换等多源异构数据源数据同步需求日益增多,传统通过编写脚本进行数据同步的方式投入高、效率低、运维管理困难;在公司内部,多款移动云数据库和大数据类产品根据客户需求迫切希望集成数据同步能力,但缺少易用的框架,从 0 开始研发投入研发成本高。

针对上述问题,基于 FlinkX 多源异构数据同步框架,实现了用户自建和移动云上消息中间件、数据库、对象存储等多种异构数据源双向读写,基于社区版实现了 On k8s 改造,需简单配置即可满足用户数据快速上云及云上数据高效交换需求,降低开发运维投入,该成果已在移动云至少 3 款产品中应用。

本文的主要内容包括:

FlinkX 简介

功能及原理

云上入湖改造

展望

一、FlinkX 简介

1. 背景介绍

现在市面上有很多种数据库产品,包括传统的 RDB 和大数据相关的 NoSQL,一般企业稍微大一点规模都会同时有各种各样的数据库。为什么会有这么多数据源?是因为不同的数据源适应不同的场景,但这么多数据源会给开发带来困难。

传统的企业业务库多数还是 MySQL,Oracle 这种传统 RDB,如果进行简单的增删改查是没有问题的,但遇到大批量的数据计算这些 RDB 就无法支持了,所以就需要大数据的存储。但是业务数据又在传统数据库中,所以传统数据库和大数据之间需要一个同步迁移的工具。

FlinkX 这个工具相对比较小众,是袋鼠云开源的项目。更熟悉的工具可能是 Sqoop 和阿里开源的 DataX,上图是一个简单的对比,我们开始选型的时候也做过调研,包括它的运行模式,插件丰富度,是否支持断点续传等功能,特别是我们是做数据湖的,需要对数据湖插件的支持,还有考虑新增插件开发的难度。综合调研下来,我们最终选择了 FlinkX。多数传统的企业使用 Sqoop 比较多,因为他们只会在 RDB 和大数据之间做迁移,但是 Sqoop 已经在今年 6 月份被移除了 Apache 顶级项目,上一次更新是在 2019 年 1 月份,已经 2 年多没有任何的开发更新了,所以这个项目已经没有新功能开发,这也不满足我们的需求。之前我们也在移动云上基于 Sqoop 做了一个插件,但是发现 Sqoop 在开发、运行上不太符合我们的场景。最终我们选定了 FlinkX 这个工具。

2. Flink 简介

什么是 FlinkX 呢?FlinkX 是一个基于 Flink 流计算框架实现的数据同步插件,它可以实现多种数据源高效的数据同步,基本功能和 DataX 和 Sqoop 差不多。

批同步方面支持的数据源跟 DataX 相当,但是在流式同步方面比较有优势,因为它是基于 Flink 开发的,所以在流式数据方面支持的数据源比较全,比如 Kafka,Pulsar 这种消息队列,还有数据库的 Binlog 这种增量更新的数据同步,功能非常强大。基于开源社区 1.11 版本我们自己又开发了一些插件:对 S3 的写入、Hudi 数据湖的写入、对 Pulsar 的写入。Pulsar 部门已经开源提交到社区了,S3 和 Hudi 暂时还没有提交。

二、功能及原理

接下来看一下 FlinkX 的功能和简单的原理。

1. 断点续传

首先一个很棒的功能是断点续传,当然这个断点续传不是针对消息队列来说的,因为消息队列天然支持断点续传。FlinkX 依赖 Flink 的 checkpoint 机制,对 RDB 扩展了断点续传的功能。但是它有一个前提,首先是关系型数据库需要包含升序的字段,然后是需要支持数据的过滤,最后是需要支持事务。比如使用 MySQL 时如果需要断点续传,配置了这个功能后它会根据字段做一个取模,然后在数据搬运的过程中当前节点的数据会在 checkpoint 里面保存,当需要重新运行任务的时候它会取上一次失败的节点开始的那个点,因为它还需要根据保存失败的 id 的数据做一下过滤,最后再从那个节点重新开始运行任务,这样数据量比较大的时候会稍微节约一点时间。

2. 指标监控

监控方面它会依赖 Flink 本身的监控功能,Flink 内部有一些 Accumulators 和 metric 统计指标,如果把它运行在 Flink 上的话就可以通过 Flink 的 DashBoard 来查看 Job 的状态。

或者是把一些指标数据收集到 Prometheus 里面,例如基本的条数,统计的数据量和错误的数据量都可以通过 Prometheus 收集之后再通过 Grafana 这样的一些工具做展示。目前线上的这个功能还在开发中。

3. 错误统计和数据限制

它还有一个比较好的功能是速率限制。当我们读取数据写入的时候,很多用户首先担心的问题是它会影响到生产库,因为多数企业的库可能没有主从策略,生产库是单实例运行的。如果这种搬运数据的任务影响到生产库的话用户基本上是不能接受的。所以做速率限制的功能对传统用户就非常友好。它的速率限制是基于 Guava 的 RateLimit,根据令牌工厂生产令牌的方式做的速率限制,跟另外的漏斗算法稍微有一点差别。缺点是峰值还是会很高,因为它保证的是平均速率限制在某一个范围之内。

4. 插件式开发

FlinkX 的插件式开发模式,与 Sqoop 和 DataX 类似,不同的数据源都抽象成一个 Reader 插件和一个 Writer 插件,然后整个数据同步的任务和公有的逻辑就被抽象在一个统一的模块中。一个模块再根据同步任务的配置加载相应的 Reader,Writer 最后组装成 Flink 任务,并提交到 Flink 集群去执行。

我们可以简单看一下任务配置,都是基于 JSON 的方式配置基础的 Reader,Writer,然后是一些综合的错误条数限制和速率限制,这边的代码就会根据配置文件通过 Reader 生成一个 Flink Source,再通过 Writer 生成 Sink,熟悉 Flink 代码的小伙伴对这块应该比较熟悉,其实就是 Flink 从 Source 端读数据然后往 Sink 端写数据,相对来说比较简单。

三、云上入湖改造

云上入湖这里我们做了一些改造。

1. K8s

首先是 K8S 的改造,因为社区的 1.11 版本支持的是 Local,Standalone,YARN Session,YARN Perjob 的模式,对云原生方式的开发不是太友好。并且 Flink 原生的 1.12 版本已经支持 K8S 调度运行了,所以我们把基于 FlinkX 的 1.11 版本 Flink 升级到了 1.12,让它原生就可以支持 K8S 运行,这样的话对我们任务的弹性扩缩容就更加友好,对入湖的任务资源隔离也比较友好,相互之间没有影响。这里也是基于 Flink 1.12 把里面的 ApplicationClusterDeployer 这部分代码做了一些简单的改造,来适配我们的一些系统。基本上是把 K8S 的一些配置组装一下,然后把 FlinkX 的一些 Jar 包的路径写进去,最后提交任务到我们的 K8S 集群。

我们的 JobManager 会通过 Quartz 来做 FlinkX 任务的调度,然后通过 Flink 的客户端调用 K8S 的客户端,最终把任务提交到 K8S 上去执行。

2. Hudi 写入

我们扩展了一个 Hudi 的插件,因为 FlinkX 里面插件非常多,我们这边会考虑到写 HBase 和写 Hive 之类的情况,开发过程中遇到了很多 Jar 包冲突的问题,所以我们给 Hudi 社区版 0.09 版本打了非常多的 shade Jar,保证我们的线上运行没有冲突,主要是 avro 的版本依赖问题。我们这边 HBase 和 Hive 依赖的 avro 版本跟 Hudi 的版本会不一致,版本兼容性之间有一些问题。

这里看一下 Hudi 插件预览的样子,参考了 Hudi 源码里面加了 Client 的 Example,也就是先加载 Hudi 配置,初始化表和 Hive 的配置,最后通过 Kafka 做实时数据写入。目前只提供 Upsert 的支持,后期考虑 MySQL Binlog 支持的话会增加 Delete 功能的支持。

3. 日志

还有一个改造可能不属于 FlinkX,就是我们的日志功能,基于 K8S Fluentd 的一个小工具,EFK 这套系统去收集日志。整个过程对我们的业务是没有入侵,没有感知的,最终我们的日志解析收集到 ES 中。Fluentd 跟 K8S 结合的比较好的地方就是它可以采集到 NameSpace,PodName, NodeSelector 等数据,为后面查询错误日志提供了方便。

上图就是使用 Fluentd 收集到的一些 Pod 的日志,左侧这边看到有很多 K8S 的元数据信息,例如 ContainerName,镜像,NodeSelector,PodId 等等这些数据。当然这个 Kibana 是我们留给后端开发用来排错的,目前给前端用户展示的还是把原始日志数据做了汇总之后,通过页面对应到任务上去查看。

四、展望

最后一部分是我们对于 FlinkX 的一些展望,先来看一下 FlinkX V1.12 的一些新特性:

与 FlinkStreamSQL 融合;

增加了 transformer 算子,支持 SQL 的转换;

插件向 Flink 社区看齐,不再区分 Reader、Writer,统一命名成 Connector,遵循 Flink 社区的规范,这样统一以后 FlinkX 就可以和社区保持兼容。理论上在使用 FlinkX 时可以使用 Flink 的原生 Connector。Flink 也可以调用 FlinkX 的 Connector,这样的话 FlinkX 就可以做成插件放到 Flink 的集群里面,后面对于做湖仓一体或者 Server 开发就会非常的方便。

数据结构优化

支持二阶段提交、数据湖 Iceberg 和提交 kubernetes

对于数据入湖来说,目前的 FlinkX 有一个缺点,就是只支持结构化数据的传输,还不能原生支持二进制文件的同步。如果数据要入湖,会有很多媒体文件,Excel、Word、图片、视频等等,这一块后期可能会自己去开发一些插件支持。

升级到 1.12 后对 FlinkSQL 的支持会更加友好,这样传统的 Lambda 升级到 Kappa 架构,对于习惯写 SQL 做数据抽取转换的用户就非常友好,基本上可以靠一条 SQL 去实现流批一体化的任务,进一步降低开发维护的难度。我们可以从 Kafka 读取一条数据,中间做一些简单的转换后写到 MySQL。我们后面数据库肯定要支持越来越多的实时数据写入,所以后期用 SQL 的方式开发这些任务就会更加便捷。

五、问答环节

Q:一般情况下 FlinkX 作业分配多少 CPU 和内存资源?

A:我们这边一般定义一个 Slot 是一核 2g,普通的一个 MySQL 到 MySQL 这样的一个任务默认三个并发,用户更多的是担心我们的速度太快影响生产库,目前自定义还没有开放,后面具体的并发度会开放可以让用户自定义,目前 Slot 是固定的一核 2g。

Q:现在流批一体的应用范围广吗?

A:我认为是挺广的,对于移动集团的一些项目,其实我们在适配他们的一些场景,主要还是基于消息队列和 MySQL 的 Binlog。我们之前遇到的用户他在阿里云上订购了结构化数据,现在他想上移动云,但是他的生产库又不能断,他想做这样的一个迁移,这就是需要流批一体的场景。他需要先做一个批的任务,把他历史的数据搬运过来,再基于他的 Binlog 增量订阅,实时同步更新他的增量数据,这就是一个很典型的传统用户的场景。再一个就是有一些大批量数据走 Kafka,原始数据还是需要落一份到 HDFS,但是需要实时的做一些汇总,这也算是一个比较典型的场景,会做流批一体的任务,我目前主要是针对这两种场景做一些开发。

Q:FlinkX 相较于 FlinkCDC 优势在哪里?

A:单说 FlinkCDC 他只是支持结构化数据增量更新,FlinkX 如果是 1.12 版本它跟 FlinkCDC 之间的插件一些是共用的,然后他相较于原生的 FlinkCDC 做了一些扩展,特别是它会支持很多国产的数据库,比如达梦,FlinkCDC 目前还不支持。任务配置方式的话,FlinkX 是基于 JSON 的,对于写 Flink 代码的的普通用户更加友好。总结一句话就是扩展了更多插件。

Q:流批一体真的会减少机器的预算吗?计算资源减少了还是存储资源减少了?

A:存储会减少一点,计算可能不会减少,因为流批一体的话,是在用同一套代码维护批任务和流任务,中间的数据如果没有必要的话是不用落地的,这块肯定是节省存储资源的。计算资源跟原来分开跑的话可能是相当的,不会有明显的减少,主要是节省了存储资源。

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=sztxkfz

项目地址:https://github.com/DTStack/Taier

0 人点赞