GIAC(GLOBAL INTERNET ARCHITECTURE CONFERENCE)是长期关注互联网技术与架构的高可用架构技术社区和msup推出的,面向架构师、技术负责人及高端技术从业人员的年度技术架构大会,是中国地区规模最大的技术会议之一。
今年的第六届GIAC大会上,在大数据架构专题,腾讯数据平台部实时计算负责人施晓罡发表了《基于Flink的高可靠实时ETL系统》的主题演讲。以下为嘉宾演讲实录:
施晓罡毕业于北京大学,获得博士学位,是Apache Flink项目Committer。在SIGMOD, TODS和IPDPS等国际顶级会议和期刊上发表过多篇论文,并担任KDD,DASFAA等国际顶级会议的程序委员会委员。
实时计算平台Oceanus
近年来,实时计算在腾讯得到了越来越广泛的应用。为了提高用户流计算任务持续集成和持续发布的效率,腾讯大数据团队从2017年开始围绕Flink打造了Oceanus,一个集开发、测试、部署和运维于一体的一站式可视化实时计算平台。
Oceanus提供了三种不同的应用开发方式,包括画布,SQL和Jar,来满足不同用户的开发需求。通过这三种方式,不同应用场景的用户不需要了解底层框架的技术细节,可以很快的进行实时计算任务的开发,降低了用户开发的门槛。
在完成作业开发之后,用户可以通过Oceanus对作业进行测试、配置和部署。Oceanus为用户程序提供了一系列的工具来协助作业测试。用户既可以使用Oceanus提供的一键生成功能产生测试数据,也可以自己向Oceanus上传自己的的测试数据,通过对比预期结果和实际结果来验证应用逻辑的正确性。Oceanus依托腾讯内部的资源调度系统Gaia来进行资源管理和作业部署。用户可以通过Oceanus配置作业所需要的CPU和内存资源,并指定作业需要部署的集群。当用户完成配置之后,Oceanus会向Gaia申请对应的资源并将作业提交到Gaia上运行。
Oceanus对Flink作业运行时的多个运行指标进行采集,包括Task Manger的内存,I/O和GC等。通过这些丰富的运行指标,用户能够很好的了解应用运行的情况,并在出现异常时能协助用户及时的定位问题。运维人员则可以通过这些采集到的指标,设置报警策略并实现精细化的运营。
而在Oceanus之上,腾讯大数据还对ETL,监控告警和在线学习等常见的实时计算任务提供了场景化的支持。例如Oceanus-ML提供端到端的在线机器学习,涵盖数据接入,数据处理,特征工程,算法训练,模型评估,模型部署整个机器学习流程。通过Oceanus-ML,用户可以方便地利用完备的数据处理函数,丰富的在线学习算法来构建自己的在线学习任务,轻松地完成模型训练和评估,进行一键部署模型。
而对ETL场景,Oceanus也提供了Oceanus-ETL产品来帮助用户将应用和产品中采集的数据实时地导入到数据仓库中。目前腾讯大数据团队为腾讯内部包括微信、QQ音乐、腾讯游戏在内的多个业务提供了数据接入服务,每天处理的消息数超过了40万亿条,每秒接入的峰值超过了4亿条。
实时数据接入平台Oceanus-ETL
腾讯大数据早在2012年起就开始了进行数据接入的工作,并基于Storm构建了第一代的腾讯数据银行(TDBank),成为了腾讯大数据平台的第一线,提供了文件、消息和数据库等多种接入方式,统一了数据接入入口,提供了高效实时的分布式数据分发。
而在2017年,腾讯大数据基于Flink在易用性、可靠性和性能上的优势,通过Flink对TDBank的数据接入进行了重构。相比于Storm,Flink对state提供了更多的支持。一方面Flink将程序的状态保存在本地的内存或者RocksDB中,用户不需要通过网络远程访问状态数据,因此可以获得较好的作业性能。而另一方面,Flink通过Chandy-Lamport算法提供了高效和轻量的检查点机制,可以保证在发生故障时仍能实现Exactly Once和At-Least Once的数据处理语义。
而随着腾讯业务规模的不断增加,对数据接入也提出了更高的要求,需要能够
- 保证端到端的“有且仅有一次”和“强一致”的语义
- 保证ACID事务和读写分离,避免下游出现脏读等错误
- 支持对数据进行修正和格式变更
为了能够满足上述要求,我们今年引入了Iceberg,通过Iceberg提供的ACID事务机制和增量更新能力提供更可靠和更强大的数据接入服务。
基于Flink实现端到端Exactly Once传输
Flink通过检查点(Checkpoint)机制来进行任务状态的备份和恢复。在任务发生故障时,任务可以从上次备份的状态恢复,而不必从头开始重新执行。通过检查点机制,Flink可以保证在发生故障时,仍然可以实现Exactly Once的数据传输。
但在整个数据接入的链路中,除了Flink之外还包括了上游的中间件和下游的数据仓库等多个组件。仅仅依靠Flink的检查点机制只能够保证在Flink作业内部的Exactly Once的数据传输,而并不能保证在整个数据接入链路中端到端的Exactly Once的传输语义。如果我们将Flink收到的数据直接写到下游的存储系统,那么当Flink发生故障并从故障中恢复时,从上次检查点之后被写到下游存储系统中的数据将被重复,导致后续数据分析发生误差。
而为了保证端到端的Exactly Once数据传输,TDBank利用了Flink的检查点机制实现了一个两阶段提交的协议,并会对数据接入各个环节产生的指标进行聚合和对账,确保端到端数据传输的可靠性。
为了保证数据链路的Exactly Once,我们将Flink收到的数据会先写入到一个临时目录中,并将写出的文件列表保存起来。执行checkpoint的时候,我们会将这些文件列表保存到checkpoint中并记录下来。而当checkpoint完成时,Flink会通知所有的节点。此时这些节点就会将checkpoint中保存的文件移动到正式目录中。
在这种实现方式中,Flink利用已有的checkpoint机制实现了一个两阶段提交的机制。所有节点在执行checkpoint时执行了预提交的操作,将所有数据都先写入到一个可靠的分布式存储中。当checkpoint在JobManager上完成时,即认为这个事务被提交了。所有节点在收到checkpoint成功的消息后会完成最后的事务提交操作。
如果有节点在执行最后文件移动的时候出现故障,那么Flink作业将从上次完成的checkpoint中恢复,并从上次完成的checkpoint中获得完整的文件列表。Flink作业会检查这个文件列表中的文件,并将所有还未移动的文件移动到最终的目录中。
而为了确保数据在整个接入过程在不会发生数据丢失和重复,我们会对整个数据链路中的每个组件发送和接收到的数据数目进行了采集和对账。由于一般的指标系统并不能保证指标的时效性和正确性,因此我们也基于Flink实现了高可靠和强一致性的指标聚合。
类似于数据链路,我们也采用Flink的checkpoint机制来保证指标数据的一致性。我们通过Flink将采集到的指标按照分钟粒度进行聚合,并在执行checkpoint时将这些聚合指标保存到外部存储中。在保存聚合指标时,除了一般的标签之外,我们还会带上写出这些指标时的checkpoint编号。而当checkpoint完成时,每个节点还会将完成的checkpoint编号也记录到外部存储中。当我们需要查询指标时,我们只需要将已完成的checkpoint编号和聚合指标进行连接就可以获得一致性的指标结果。
通过Flink的checkpoint机制,我们可以保证数据链路和指标链路中数据传输和指标聚合的一致性,确保在整个数据接入链路实现端到端的Exactly Once数据传输。
基于Iceberg实现ACID的实时数据接入
Apache Iceberg是一个通用的表格式(数据组织格式),它可以适配Presto,Spark等引擎提供高性能的读写和元数据管理功能。Iceberg的定位是在计算引擎之下存储之上。它是一种数据存储格式,Iceberg称其为"table format"。准确的说,它是介于计算引擎和数据存储格式之间的数据组织格式 - 通过特定的方式将数据和元数据组织起来,因此称之为数据组织格式更为合理。
Iceberg通过锁机制实现了ACID的能力。在每次元数据更新时它会从metastore中获取锁并进行更新。同时Iceberg保证了线性一致性(Serializable isolation),确保表的修改操作是原子性的,读操作永远不会读到部分或是没有commit的数据。Iceberg提供了乐观锁的机制降低锁的影响,并且使用冲突回退和重试机制来解决并发写所造成的冲突问题。
基于ACID的能力,Iceberg提供了类似于MVCC的读写分离能力。首先,每次写操作都会产生一个新的快照(snapshot),快照始终是往后线性递增,确保了线性一致性。而读操作只会读取已经存在了的快照,对于正在生成的快照读操作是不可见的。每一个快照拥有表在那一时刻所有的数据和元数据,因此提供了用户回溯(time travel)表数据的能力。利用Iceberg的time travel能力,用户可以读取那一时刻的数据,同时也提供了用户快照回滚和数据重放的能力。
相比于Hudi,Delta Lake,Iceberg提供了更为完整的表格式的能力、类型的定义和操作的抽象,并与上层数据处理引擎和底层数据存储格式的解耦。此外,Iceberg在设计之初并没有绑定某种特定的存储引擎,同时避免了与上层引擎之间的相互调用,使得Iceberg可以非常容易地扩展到对于不同引擎的支持。
而在数据接入中,通过Iceberg可以保证ACID事务和强一致性,实现“有且仅有一次”的写入;读写分离使交互式查询引擎(如Hive和Presto等)可以第一时间读到正确的数据;Row-level update和delete支持通过计算引擎进行数据修正;增量消费使得已落地的数据可以进一步的返回流式处理引擎,并只处理和向后传递变化的部分;Iceberg高效的查询能力也能省去导入MySQL或ClickHouse等环节,直接被报表和BI系统消费。
为了能够使用Iceberg,腾讯大数据实现了支持Iceberg的Flink连接器,允许Flink将数据写入到Iceberg中。Flink的Iceberg Sink由两部分组成,一个称为Writer,而另一个是Committer。Writer负责将收到的数据写到外部的存储中,形成一系列的DataFile。目前为了简化适配并最大限度利用已有逻辑,腾讯内部使用Avro作为数据的中间格式。后续社区将引入一个Flink内建类型的转换器,使用Iceberg内建的数据类型作为输入。当Writer执行checkpoint时,Writer会关闭自己的文件,将构建的DataFile发送给下游的Committer。
Committer在一个Flink作业中是全局唯一的。在收到上游所有Writer发送的DataFile后,Committer会将这些DataFile写到一个ManifestFile中,并将ManifestFile保存到checkpoint中。当checkpoint完成之后,Committer会将ManifestFile通过merge append提交给Iceberg。Iceberg内部会通过一系列操作完成commit操作,最终让新加入的数据对下游的数据仓库可见。
腾讯对Iceberg进行了大量的改进和优化。除了支持了Flink的读写之外,腾讯还完成了行级的删除和更新操作,极大的节约了数据批改和删除所带来的开销。同时,腾讯还对Spark 3.0中的Data Source V2进行了适配,利用Spark 3.0中的SQL和DataFrame可以无缝的对接Iceberg。
而在后面的工作中,腾讯会继续增强Iceberg的核心能力,主要包括:
- 为Flink sink增加update和delete的语义,使延迟到达的数据可以得到正确的处理,以支持CDC的场景;
- 增加对Hive的支持;
- 增加以Merge-On-Read方式进行row-level update和delete操作等。
后台回复关键词【GIAC】可获取嘉宾分享PPT。