作者:龙逸尘,腾讯 CSIG 高级工程师
腾讯云原生实时数仓建设实践
实时数仓面临的挑战
实时数仓被广泛应用于腾讯各大业务,涉及的平台众多,从统计信息中可以看出,集群规模庞大,数据量极大。
复杂的使用场景和超大的数据量,导致我们在实时数仓的建设与使用过程中遇到许多挑战。
- 时效性 数仓使用者对时效性有非常强烈的诉求:希望查询响应更快,看板更新更及时,指标开发更快完成。因为时效性越高,数据价值也就越高。如何保障数仓的时效性是首要难题。
- 架构复杂度 如何在保障时效性的同时,降低架构复杂度以减少开发和维护成本,是需要重点考虑的问题。
- 数据准确性 保证数仓中数据和指标的准确性,不能存在异常或者错误,是对实时数仓的基础要求。
- 成本 在实际的生产使用中,计算与存储资源并不是免费的,如何优化实时数仓的使用成本也是值得关注的问题。
实时数仓的演进
实时数仓功能需求
总结来看,对于实时数仓,我们最期待的功能是以下四点:
- 保证全流程 T 0 级别时效性
- 统一并简化数仓架构
- 保障数据准确性
- 降低计算与存储成本
为了解决上述挑战,满足实时数仓的功能需求,我们进行了大量探索和实践,也见证了实时数仓架构的演进过程。
离线数仓
数据仓库一般采用分层架构来构建。在数仓发展的早期阶段,流处理引擎未成熟之前,离线数仓处于主流地位。
离线数仓采用 T 1 级别的离线 ETL 导入数据,在 ODS、DWD 和 DWS 层使用 Hive、Spark 等批处理引擎计算前一天的数据。为了提升时效性,在 ADS 层引入了 ClickHouse 提供秒级查询能力。然而总体上还是只能提供 T 1 级别的时效,无法满足我们对时效性的需求。
Lambda 架构
随着业务的发展,演化出了 Lambda 架构。Lambda 架构在原有离线数仓的基础上,增加了实时层。实时层从源头开始做实时化改造,完成数据的实时、增量计算。离线层与实时层并行运行,最终由统一的数据服务层将计算结果合并。
Lambda 架构充分发挥实时计算的优势,提升了数仓的时效性。同时,还可以利用离线批处理的数据重放能力保障数据的准确性,例如实时流处理遇到了故障导致输出的结果不够精确时,离线批处理系统可以把数据重跑一次,用更精确的数据来覆盖实时流结果。
但是 Lambda 架构的缺点也很明显。首先,它存在批处理和流处理两个相互独立的数据处理流程,同一套业务逻辑代码需要适配性的开发两次,开发成本高;而且需要同时维护实时和离线两套引擎,架构复杂,运维成本高。另外,Lambda 架构对存量数据更新不友好,需要重跑整个离线链路,消耗大量资源。
Kappa 架构
为了解决 Lambda 架构带来的诸多问题, Kappa 架构诞生了。
Kappa 架构将流和批融为一体,不再分为两条数据处理链路。数仓各层使用消息队列作为存储,数据经过 Flink 处理后通过消息队列传递,保障了 T 0 级别时效。
Kappa 架构解决了 Lambda 架构中离线层和实时层之间由于引擎不同,导致的开发运维成本高昂的问题,整体架构简洁明了。但 Kappa 架构也有其痛点。
- Kappa 架构基于消息队列的数据回放能力以及流处理引擎提供的 Exactly-Cnce 语义完成历史数据的回溯,然而回溯过程中,流处理引擎的吞吐量是比不上批处理的,这可能导致一些延迟。
- Kappa 架构强依赖支持回放的消息队列作为底层存储,为了保证作业崩溃或逻辑修改后可以随时回溯历史数据,消息需要有很长的保存期,大大增加了存储成本。
- Kappa 架构数仓中间层没有采用可落盘的文件存储,当前无法使用 OLAP 引擎直接分析数仓中间层的数据,通常需要启动一个单独的作业来导出数据才能进行分析,灵活度欠佳。
- Kappa 架构无法复用目前已经非常成熟的基于离线数仓的数据血缘、数据质量管理体系,需要重新实现自己的体系。
经过分析,我们发现 Kappa 架构的主要问题出在它的存储上,因此若想要对其优化,就需要更新其存储架构。那么是否存在一种存储技术,既能够实现分钟级到秒级的数据接入和处理,保障数仓时效性;又能实现数据的流批统一读写,简化数仓架构;还可以支持高效数据回溯和历史数据更新,保证数据准确性呢?
仔细研究后,我们发现基于 Iceberg 的数据湖能很好地满足我们的需求。
Iceberg 的引入
Iceberg 支持流式读写和批量读写,可以统一数仓架构。同时,Iceberg 可以通过小批量的数据增量读写,将数仓整体延迟减小到分钟级甚至是秒级。对于流作业崩溃等情形,可以借助 Iceberg 高效的历史数据回溯能力,从特定的快照开始重新消费数据。
Iceberg 还支持对大规模数据集进行更新删除,因此数据回溯时无需全量重新计算,降低了数据更新延迟。此外,它还支持超长的数据保存期,不必担心数据保存期过短,历史数据被清理而难以回溯等传统数据管道会遇到的难题。
而对于需要分析中间层的数据等需求,Iceberg 支持 parquet 等列式存储格式,因此可以使用常见的批处理分析工具来直接分析 Iceberg 数据文件,结合谓词下推等 OLAP 优化策略,非常高效、便捷。
Iceberg 可以在一定程度上代替 Kafka 等传统流式数据管道作为数仓存储。它的引入,弥补了 Kappa 架构的许多不足。
基于数据湖 Iceberg 的架构
基于数据湖 Iceberg 的实时数仓架构可以很好地满足我们对实时数仓的需求:
- 支持流式写入增量拉取,可以实现全流程 T 0 级的时效
- 实现存储层的流批统一,简化架构
- 完美支持数据高效回溯,保障数据准确性
- 相比于消息队列基于日志的存储架构,支持列式存储,大大降低存储成本
引入 Iceberg 后,实时数仓的发展已经进入了新的阶段,但当前方案也并非毫无缺陷。
云原生的价值
当前 Iceberg 底层存储仍然以 HDFS 为主,这种基于 Hadoop 生态与 HDFS 存储的的传统实时数仓体系,面临着存算耦合的问题,导致弹性能力不足、资源利用率低,同时存储与计算资源的错配使得成本居高不下。
而随着云原生技术的普及和落地,云原生架构的核心优势可以很好地解决传统实时数仓的痛点。
云原生架构,本质上就是存算分离的。底层是对象存储等分布式共享存储,上层是无状态的分布式共享计算池,借助 K8s 等资源编排引擎,实现计算资源的弹性伸缩。
借助云原生架构提供的弹性计算能力,我们能够削峰填谷地利用计算资源,业务繁忙时迅速增加资源,业务空闲时及时释放多余资源,可以极大提升资源利用率,节省大量成本。
同时,云原生带来的规模经济,使得存储等基础设施变得廉价,为用户提供了更多选择。拿对象存储来举例,相比于 HDFS,对象存储不存在 NameNode 单点问题,可以提供更大的容量和更低的成本,避免了自建存储集群的维护代价,因此受到越来越多用户的青睐。
可以说云原生架构相比于传统 Hadoop 生态架构,最具优势的点在于存算分离特性、资源弹性调度能力以及对象存储等新型基础设施提供的成本优势。
云原生实时数仓架构
在原数据湖数仓架构的基础上,基于云原生的实时数仓应运而生。我们充分发挥云原生的优势,引入 K8s 作为资源调度引擎,提供弹性计算能力,并将底层存储替换为对象存储,提供更大容量与更低成本。
云原生实时数仓充分采用存算分离思想,整个生态体系由计算层,存储层与公共服务组成。
- 存储层使用对象存储作为底层的数据存储,支持 Parquet、Avro 等存储格式,并基于 Iceberg 更有效率地组织数据。
- 计算层利用 Alluxio 缓存加速数据的读取,提供数据本地化能力,K8S 提供资源弹性调度的能力,支持上层的 Flink 与 ClickHouse 等计算引擎。
- 公共服务包括租户管理、权限管理、元数据管理、数据血缘、数据质量等。
云原生实时数仓可以很好地满足我们对实时数仓的功能需求,还提供了更强的弹性计算能力与更低的成本。
云原生实时数仓建设实践
云原生实时数仓建设实践核心是以下三个部分,弹性计算实践 Flink on k8s,存算分离实践 ClickHouse 以及弹性存储实践 Iceberg。
Flink on Kubernetes 实践
Flink on Kubernetes 面临的挑战
Flink 已经成为实时计算的事实标准,为了使其与 K8s 更好地结合,发挥 K8s 弹性调度的优势,社区已经有了很多实践,然而当前仍然存在着一些挑战。
- Serverless 当前大多数 K8s 集群仍然是部署在物理机或者云虚拟机上,因此弹性能力受到集群机器的容量限制,无法真正做到 Serverless。
- 调度性能 对于云原生实时数仓场景,原生的 K8s 调度器无法提供所需的功能,比如保证多租户资源隔离、资源公平调度、资源优先级感知等,急需提升调度能力。
- 调优诊断困难 在 K8s 环境下,一旦 Flink 作业出现故障,需要面对的是成千上万个运行中的容器和复杂的网络环境。而且随着 pod 的退出,故障现场很可能丢失。这也导致 Flink on K8s 作业很难进行诊断与调优。
- 扩缩容速度 Flink 作业进行弹性扩缩容操作时,需要对作业进行重启,导致数据断流。用户希望能提升扩缩容速度,更快地启动作业,减少停流时间。
这些问题推动我们对 Flink on K8s 进行了进一步的优化。
Flink on Kubernetes 整体方案
首先看下 Flink on K8s 的整体方案,腾讯内部采用的是 Flink Application Mode on Native K8s 的架构。
Application Mode 允许用户作业的 JobGraph 在 Flink Master 中编译,在 JobManager 中运行 main 方法。因此无需将所有依赖下载到 Client 端,节省大量带宽的同时,将 client 端负载均匀分散到集群的每个节点上,使得 Client 更轻量化并且具有可扩展能力,能更好地适应 Native K8s 环境。
Flink on Kubernetes 实践 - Serverless
之前提到,当前大多数 k8s 集群仍然是部署在物理机或者云虚拟机上,需要提前购买并维护机器,导致弹性能力受到很大限制。另外机器存在闲置时间,降低资源利用率,增加了成本。
因此我们引入了弹性容器服务 EKS,用户无须管理任何计算节点,而是从整个腾讯的资源池中申请资源,这些资源是以 pod 形式交付的,按需申请,实时交付,能完美适配原生 K8s 的接口请求。
这种无服务器形态带来了灵活的弹性伸缩能力,真正做到 Serverless,为用户实现更高的资源利用率和更低的成本。
Flink on Kubernetes 实践 - 自定义调度器
针对 K8s 默认调度器的能力缺陷,我们实现了自定义调度器。
自定义调度器内部与 yarn 队列类似,可以对多租户进行资源隔离。管理员可以为各租户配置保障资源与最大资源,并且预留弹性资源以保证一定的弹性能力。多个租户可能同时发起作业调度请求,此时调度器会根据租户资源队列使用状况、租户优先级策略等信息,为多租户的作业进行有序调度。除此之外自定义调度器还实现了反亲和策略,会将 jobmanager 和 taskmanager 尽可能调度到不同的 k8s 节点上。
自定义调度器补足了 k8s 默认调度器的能力缺陷,满足了云原生实时数仓作业调度的需求。
Flink on Kubernetes 实践 - AutoPilot 自动诊断调优
为了增强 Flink on k8s 作业的诊断与调优能力,我们实现了 AutoPilot 机制。整体的架构如下图所示。
首先为了快速感知作业是否异常,我们设计了作业状态感知方案,包括以下要点:
- 扩展 Flink 内核,增加实时事件推送能力
- 除了采集 Flink 的各种指标,还采集 k8s 的 deployment 和 pod 事件
- 引入 LogListener 和 DiagnosisDelivery 采集作业现场
- 打造事件中心,事件中心采取主动拉取与被动接收推送相结合的方式,将多源的事件汇集成一个综合事件
通过作业状态感知,我们现在可以采集到作业几乎所有的运行状态与事件。但如果基于事后的观察来进行作业指标调优的话,得到的结果往往是滞后的,我们希望根据历史指标对作业未来的运行状态进行预测。因此我们会保存一段时间内的历史指标,根据历史指标与预测模型生成预测指标,并结合当前的实时指标,综合起来提供给 AutoPilot,AutoPilot 学习开发人员输入的经验与规则,对指标进行分析。它判断作业是否发生了异常,异常原因是什么;作业是否需要参数调优,要调整到什么配置;作业是否需要扩缩容,扩缩容的大小等。比如,发现作业发生重启,直接原因是 Checkpoint 超时,根本原因是作业中数据倾斜,修复方案就是提高作业并行度,随后进行行动,增加作业 TaskManager 数量。
开发人员通过制定各类规则,将他们的工作经验和知识沉淀到 AutoPilot中。AutoPilot 可以极大减少人工的投入,将开发运维人员从繁琐的故障定位、作业配置调优等工作中解放出来。
Flink on Kubernetes 实践 - 加速作业扩缩容
引入 Auto pilot 之后,在作业的运行过程中,随着数据量和数据特征的改变,作业可能会发生多次自动扩缩容。用户希望能提升扩缩容速度,更快地启动作业,减少停流时间。
分析整个作业的调度流程,发现所有 TaskManager 的 Slot 分配并注册完成后,作业才能启动,而根据木桶效应,耗时最长的 TaskManager 就会成为短板,影响作业的整体启动时间。
TaskManager 注册阶段的耗时主要集中在 Pod 启动以及 Slot 注册两个阶段。
Pod 启动时需要下载作业镜像和依赖,整个过程主要受带宽与作业镜像的大小影响。首先通过弹性网卡(Elastic Network Interface, ENI)直接通信,减少带宽损耗;并且定制化 Flink 镜像,按需裁剪,获得最简单的镜像,减少下载内容的大小;另外将用户依赖与 Flink 镜像分离,采取多线程方式同时下载;此外还可以预加载作业依赖与镜像,延迟绑定 Pod。这些措施使得 Pod 启动耗时减少了 50% 左右。
而在 Slot 注册阶段,如果 JobManager 繁忙无法及时回应,可能导致注册失败。此时 TaskManager 会采取指数退避的方式,不断尝试重新注册 Slot。在此过程中 DNS 反解析的耗时较长,造成 JobManager 处理线程拥塞,导致注册失败。但是此处的反解析只是为了友好化日志打印,并不是一定要在注册 Slot 时进行。因此提供选项直接跳过,后续用到时按需加载。
除此之外我们还会针对慢节点申请冗余 TaskManager ,以提升 TaskManager 启动速度,已注册的 TaskManager 充足后及时释放冗余的节点,减少资源占用。
通过以上优化,我们成功将 99% 50CU 内的作业的启动时间降低到 1 分钟以内。
ClickHouse 存算分离实践
ClickHouse 面临的挑战
ClickHouse 采用 Shared-Nothing 架构,存储计算一体化。每个节点都有自己的本地存储,每个节点的计算资源只关注处理本节点存储的数据。正是因为 ClickHouse 的架构简单而纯粹,为其带来了强大的性能。
然而这种存储与计算资源耦合的设计也造成了使用的局限性。ClickHouse 集群新增节点后数据无法自动均衡,也没有办法简单地卸载掉多余的计算资源,最终导致 ClickHouse 不具备弹性计算的能力。
ClickHouse 存算分离方案设计
为了支持弹性伸缩,我们对 ClickHouse 进行了关键改造,将 ClickHouse 架构分为三层。
- 元数据服务层,主要存储 ClickHouse 集群的关键元数据,包括表的 Schema 数据 以及数据分布的映射关系。并且通过心跳机制排除失效节点,保证数据重分布。
- 计算层每个节点都是无状态的存在。每个计算节点都能完整执行 SQL,且具备本地缓存,以及运行所需的索引数据等。具备秒级弹性能力。
- 存储层使用云原生共享存储服务作为底层存储,例如对象存储 COS 等,提供全局一致的数据视角。
ClickHouse 每个实例都可以成为一个分片(Shard),ClickHouse 集群由若干个 Shard 构成,我们将 Shard 内部的数据切分成了更小的 Bucket。Bucket 和 Shard 的映射关系由元数据服务统一管理。弹性伸缩时, Bucket 与节点所属关系会重新计算。
左图集群缩容时 Shard3 被剔除,原先处于 Shard3 上的 Bucket6/7 将被重新分配到 Shard2 上;而右图集群扩容 新增 Shard4 时,将原先位于 Shard2 上的 Bucktet3 重新分配到新节点 Shard4 上。
Bucket 与节点的所属关系通过心跳告知现有节点,如果发现自己的数据分布关系有变化,则会直接去元数据服务层取出对应 Bucekt 的元数据并加载。
存储层基于云原生的共享存储 COS,提供全局一致的数据视角。由于 Bucket 的真实数据都保存在 COS 中,因此扩缩容只需要加载 Bucket 的元数据,并不需要对数据进行复制与同步,实现了数据的 Zero-Copy。基于 COS 的存储层也带来了可观的成本降低。
上述改造使得 Clickhouse 具有秒级弹性伸缩能力,同时降低了成本,可以在云原生实时数仓的 OLAP 分析中发挥重要作用。
Flink ClickHouse Connector 设计
为了使得 ClickHouse 可以更好地与 Flink 结合使用,我们对 Flink ClickHouse Connector 也进行了改进与优化。
首先基于 FLIP-27 构造了 ClickHouse Source,将 ClickHouse 的读取任务也使用 Flink 来完成,可以更优雅地完成 ClickHouse 集群间的数据复制与同步工作,也便于在实时开发平台管理任务。
写入 ClickHouse 时,既可以写分布式表,也可以直接写本地表。为了减少数据延迟、节省网络带宽,我们事先获取各节点的连接地址,通过写本地表的方式直接写入各个分片,支持随机、轮询、散列等多种写分片方式。
另外我们注意到,流式数据通常会包含大量的更新和删除操作。为了支持频繁变更的数据,可以将 Flink 的 Retract Stream(回撤流)、Upsert Stream(更新-插入流)等含有状态标记的数据流,写入到 ClickHouse 的 CollapsingMergeTree 引擎表中,实现数据更新的语义。
考虑到 ClickHouse 擅长大批量写入的特点,还需要对 Flink ClickHouse Sink 增加攒批写入的支持,避免频繁写入造成的性能下降问题;此外还有故障重试策略、Flink 与 ClickHouse 之间的 SQL 类型映射等需要关注的点。
这些优化使得 Flink ClickHouse Connector 的性能和稳定性得到提升。
Iceberg 结合对象存储的实践
Iceberg 面临的挑战
基于对象存储的 Iceberg 面临的挑战可以总结为以下几点:
- 底层存储局限:对于海量数据存储的场景,HDFS 和对象存储都存在局限性。
- 小文件问题:Iceberg 采用实时方式写入会导致大量小文件的生成,大量小文件会影响存储性能。
- 查询性能不足:对象存储对于海量文件的操作能力有限,导致查询能力不足。如何加速 Iceberg 的查询也是值得探讨的问题。
针对以上的挑战,结合实时数仓数据存储的特征,腾讯大数据团队在 Iceberg 结合对象存储方案的稳定性和性能方面做了大量的优化。
Iceberg 结合对象存储的优势
首先我们进行了底层存储的替换,将 HDFS 替换为对象存储。相较于传统的 HDFS 存储,对象存储不存在 Namenode 的单点问题,可以支持海量文件的存储,同时允许存储容量和计算引擎各自独立地进行扩展,并且采用按量计费方式,成本可控。
然而对象存储也存在一定的局限。例如没有原生的 Rename 语义、List 操作性能弱、失去数据本地化优势导致查询缓慢等。
Iceberg 在数据组织方式上充分考虑了对象存储的特性,补齐了对象存储能力的不足。支持多版本无需进行 Rename 操作;自身存储分区文件列表避免耗时的 Listing 操作;引入稀疏索引,结合谓词下推,减少文件扫描量,加速查询。
这些设计使得 Iceberg 在与对象存储的适配上更有优势。
Iceberg 实践 - 数据优化服务
为了应对 Flink 实时、大量并发写入 Iceberg 导致的小文件问题,我们提供了数据优化服务,包括实时小文件合并、过期快照清理、遗留文件清理三种服务。
通过小文件合并服务,将文件数控制在一个比较稳定的范围内。同时通过过期快照清理、遗留文件清理保证数据的有效性。
数据优化服务可以增强 Iceberg 的可用性,提升用户体验。
Iceberg 实践查询三级加速
为了提升基于对象存储的 Iceberg 的查询性能,我们针对实时数仓的数据 IO 特点做了进一步性能优化,采取三级加速,分别在计算端、可用区端、存储端提供了性能加速能力。
在数据 Scan 阶段,可以用很少的缓存来撬动很好的加速效果。因此在计算端引入了缓存,与计算节点混合部署,利用计算集群本地的内存或磁盘资源缓存热数据,提供 Data Localization 能力,利用高速缓存功能解决存储性能问题。
可用区端引入数据加速器,这是在各可用区单独部署的高速缓存集群,采用全 SSD 存储介质,提供超大带宽与超低时延,加速同可用区的数据处理与查询,减少数据复制等场景的延迟。
而在数据端,在对象存储之上,我们构建了可扩展的元数据服务,为上层计算业务提供兼容 HDFS ⽂件系统语义的元数据操作能⼒。有了元数据加速能力的加持,就可以直接将对象存储当做 HDFS 用,用文件系统语义来访问对象存储服务。一方面,这一能力极大地提升了 List 等大数据文件系统操作的性能;另一方面,也提供了 Rename、Truncate 等典型的文件系统操作指令,提供了大数据生态兼容支持。
三级加速位于 Iceberg 和对象存储 COS 之间,将数据从对象存储移动到距离数据应用更近的位置,使数据更容易被访问到。这种层次化的加速架构,使得基于对象存储的 Iceberg 的查询性能比原生方案具有显著提升。
云原生实时数仓收益总结
云原生实时数仓建设的收益可以总结为以下几点:
- 云原生实时数仓实现存算分离,将存储与计算资源解耦,提升了架构可拓展性与灵活性
- 基于 K8s 提供弹性计算能力,按需使用存储与计算资源,实现自动、弹性扩缩容
- 存储与计算按使用量付费,并且使用对象存储代替传统 hdfs 存储,大大降低了使用成本。
云原生实时数仓展望
当前腾讯云原生实时数仓建设取得了一些成果,也将在未来进行进一步的升级优化
我们希望对实时数仓的流批一体能力进行持续演进,提升各个数仓组件的内核能力,适配 AI、物联网等更多场景。
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 产品在实时数仓领域长期深耕,也将批流融合数仓也作为重点发展方向。在不久的将来,流计算 Oceanus 会提供全套实时数仓构建的解决方案,助力企业数据价值最大化,加速企业实时化数字化的建设进程。
流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓
点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~
腾讯云大数据
长按二维码 关注我们