在过去三年中,由于用户和内容的增长,Notion 的数据增长了 10 倍,以 6-12 个月的速度翻了一番。要管理这种快速增长,同时满足关键产品和分析用例不断增长的数据需求,尤其是我们最近的 Notion AI 功能,意味着构建和扩展 Notion 的数据湖。以下来介绍我们是如何做到的。
Notion 的数据模型和增长
在 Notion 中看到的所有内容(文本、图像、标题、列表、数据库行、页面等)尽管前端表示和行为不同,但在后端被建模为“块”实体,并存储在具有一致结构、架构和相关元数据的 Postgres 数据库中(了解有关 Notion 数据模型的更多信息)。
在用户活动和内容创作的推动下,所有这些区块数据每 6 到 12 个月翻一番。在 2021 年初,我们在 Postgres 中有超过 200 亿个区块行,此后这个数字已经增长到超过 2000 亿个区块——即使压缩后的数据量也高达数百 TB。为了在增强用户体验的同时管理这种数据增长,我们战略性地将数据库基础设施从一个 Postgres 实例扩展到更复杂的分片架构。我们从 2021 年开始将 Postgres 数据库水平分片为 32 个物理实例,每个实例包含 15 个逻辑分片,并在 2023 年继续将物理实例数量增加到 96 个,每个实例有 5 个逻辑分片。因此,我们总共维护了 480 个逻辑分片,同时确保了长期可扩展的数据管理和检索能力。
到 2021 年,Postgres 构成了我们生产基础设施的核心,处理从在线用户流量到各种离线数据分析和机器学习需求的所有内容。随着对线上和线下数据需求的增加,我们意识到构建一个专用的数据基础设施来处理离线数据而不干扰在线流量至关重要。
2021 年 Notion 的数据仓库架构
2021 年,我们通过一个简单的 ELT(提取、加载和转换)管道启动了这个专用数据基础设施,该管道使用第三方工具 Fivetran 将数据从 Postgres WAL(预写日志)摄取到 Snowflake,并为 480 个分片设置了 480 个每小时运行的连接器,以写入相同数量的原始 Snowflake 表。然后我们将这些表合并为一个大表,用于分析、报告和机器学习用例。
扩展挑战
随着 Postgres 数据的增长,我们遇到了一些扩展挑战。
速度、数据新鲜度和成本
将数据摄取到 Snowflake 的速度变慢且成本更高,这主要是由于 Notion 独特的更新繁重工作负载。Notion 用户更新现有块(文本、标题、标题、项目符号列表、数据库行等)的频率远远高于添加新块的频率。这导致块数据主要是更新量大的 ~90% 的 Notion 更新插入是更新。大多数数据仓库(包括 Snowflake)都针对插入繁重的工作负载进行了优化,这使得它们摄取块数据变得越来越具有挑战性。
用例支持
数据转换逻辑变得更加复杂和繁重,超过了现成数据仓库提供的标准 SQL 接口的功能。
- • 一个重要的用例是为关键产品(例如 AI 和搜索)构建 Notion 区块数据的非规范化视图。例如,权限数据确保只有正确的人才能读取或更改块(本博客讨论 Notion 的块权限模型)。但是一个区块的权限并不是静态地存储在相关的Postgres中,它必须通过昂贵的树遍历计算来动态构建。
- • 在以下示例中,
block_1
,block_2
, 并block_3
继承其直接父级 (page_3
和page_2
) 和祖先 (page_1 ``workspace_a).
和 和 要为每个块构建权限数据,我们必须遍历其祖先树一直到根 (workspace_a
),以确保完整性。由于有数千亿个区块,其祖先深度从几个到几十个不等,这种计算成本非常高,而且只会在 Snowflake 中超时。
由于这些挑战,我们开始探索构建我们的数据湖。
构建和扩展 Notion 的内部数据湖
以下是我们构建内部数据湖的目标:
- • 建立一个能够大规模存储原始数据和处理数据的数据存储库。
- • 为任何工作负载(尤其是 Notion 的更新密集型块数据)实现快速、可扩展、可操作且经济高效的数据摄取和计算。
- • 解锁需要非规范化数据的 AI、搜索和其他产品用例。
但是,虽然我们的数据湖是向前迈出的一大步,但重要的是要澄清它不打算做什么:
- • 完全替换 Snowflake。我们将继续受益于 Snowflake 的操作和生态系统易用性,将其用于大多数其他工作负载,尤其是那些插入量大且不需要大规模非规范化树遍历的工作负载。
- • 完全替换 Fivetran。我们将继续利用 Fivetran 在非更新繁重表、小型数据集摄取以及多样化的第三方数据源和目标方面的有效性。
- • 支持需要二级或更严格延迟的在线用例。Notion 数据湖将主要关注可以容忍几分钟到几小时延迟的离线工作负载。
数据湖的高级设计
自 2022 年以来,我们一直使用如下所示的内部数据湖架构。我们使用 Debezium CDC 连接器将增量更新的数据从 Postgres 摄取到 Kafka,然后使用 Apache Hudi(一个开源数据处理和存储框架)将这些更新从 Kafka 写入 S3。然后利用这些原始数据,我们可以进行转换、非规范化(例如,每个块的树遍历和权限数据构建)和扩充,然后将处理后的数据再次存储在 S3 中或下游系统中,以满足分析和报告需求,以及 AI、搜索和其他产品要求。
接下来,我们将描述和说明我们在广泛的研究、讨论和原型设计工作后得出的设计原则和决策。
设计决策 1:选择数据存储库和湖
我们的第一个决定是将 S3 用作数据存储库和湖来存储所有原始和处理过的数据,并将数据仓库和其他面向产品的数据存储(如 ElasticSearch、Vector Database、Key-Value Store 等)定位为其下游。我们做出这个决定有两个原因:
- • 它与 Notion 的 AWS 技术堆栈保持一致,例如,我们的 Postgres 数据库基于 AWS RDS,其导出到 S3 的功能(在后面的部分中描述)允许我们轻松地在 S3 中引导表。
- • S3 已经证明了它能够以低成本存储大量数据并支持各种数据处理引擎(如 Spark)。
通过将繁重的摄取和计算工作负载卸载到 S3,并仅将高度清理的业务关键型数据摄取到 Snowflake 和面向产品的数据存储,我们显著提高了数据计算的可扩展性和速度,并降低了成本。
设计决策 2:选择处理引擎
我们选择Spark作为我们的主要数据处理引擎,因为作为一个开源框架,它可以快速设置和评估,以验证它是否满足我们的数据转换需求。Spark 具有四个主要优势:
- • Spark 除了 SQL 之外,还具有广泛的内置函数和 UDF(用户定义函数),可实现复杂的数据处理逻辑,如树遍历和块数据非规范化,如上所述。
- • 它为大多数轻量级用例提供了用户友好的 PySpark 框架,并为高性能、繁重的数据处理提供了高级 Scala Spark。
- • 它以分布式方式处理大规模数据(例如,数十亿个块和数百 TB),并公开广泛的配置,这使我们能够微调对分区、数据倾斜和资源分配的控制。它还使我们能够将复杂的作业分解为更小的任务,并优化每个任务的资源配置,这有助于我们实现合理的运行时,而不会过度配置或浪费资源。
- • 最后,Spark的开源特性提供了成本效益优势。
设计决策 3:优先于快照转储增量摄取
在完成我们的数据湖存储和处理引擎后,我们探索了将 Postgres 数据摄取到 S3 的解决方案。我们最终考虑了两种方法:增量摄取更改的数据和 Postgres 表的定期完整快照。最后,基于性能和成本的比较,我们选择了混合设计:
- • 在正常操作期间,以增量方式摄取更改的 Postgres 数据并将其持续应用于 S3。
- • 在极少数情况下,导出完整的 Postgres 快照以引导 S3 中的表。
增量方法可确保以更低的成本和最小的延迟(几分钟到几个小时,具体取决于表大小)获得更新鲜的数据。相比之下,导出完整快照并转储到 S3 需要 10 多个小时,成本是 S3 的两倍,因此在 S3 中引导新表时,我们很少这样做。
设计决策 4:简化增量引入
- • 用于 Postgres → Kafka 的 Kafka CDC 连接器
我们选择了 Kafka Debezium CDC(更改数据捕获)连接器将增量更改的 Postgres 数据发布到 Kafka,类似于 Fivetran 的数据摄取方法。我们之所以选择它与 Kafka 一起,是因为它们具有可扩展性、易于设置以及与我们现有基础架构的紧密集成。
- • 用于 Kafka → S3 的 Hudi
为了将增量数据从 Kafka 引入到 S3,我们考虑了三种出色的数据湖解决方案:Apache Hudi、Apache Iceberg 和 Databricks Delta Lake。最后我们选择了 Hudi,因为它具有出色的性能,可以处理大量更新的工作负载,并且具有开源特性以及与 Debezium CDC 消息的原生集成。另一方面,当我们在 2022 年考虑 Iceberg 和 Delta Lake 时,它们并没有针对我们的更新繁重工作负载进行优化。Iceberg 还缺乏一个能够理解 Debezium 消息的开箱即用的解决方案;Delta Lake 有一个但并不开源。如果我们采用这两种解决方案中的任何一个,我们将不得不实现我们自己的 Debezium 消费者。
设计决策 5:在处理之前引入原始数据
最后,我们决定将原始 Postgres 数据摄取到 S3,而无需进行动态处理,以便建立单一事实来源并简化整个数据管道的调试。一旦原始数据进入 S3,我们就会进行转换、非规范化、扩充和其他类型的数据处理。我们再次将中间数据存储在 S3 中,并且仅将高度清理、结构化和关键业务数据引入下游系统,以满足分析、报告和产品需求。
扩展和运营我们的数据湖
我们尝试了许多详细的设置,以解决与 Notion 不断增长的数据量相关的可扩展性挑战。以下是我们尝试的内容和进展情况:
CDC 连接器和 Kafka 设置
我们在每个 Postgres 主机上设置一个 Debezium CDC 连接器,并将它们部署在 AWS EKS 集群中。由于 Debezium 和 EKS 管理的成熟度以及 Kafka 的可扩展性,我们在过去两年中只需要升级几次 EKS 和 Kafka 集群。截至 2024 年 5 月,它可以顺利处理数十 MB/秒的 Postgres 行变更。我们还为每个 Postgres 表配置一个 Kafka 主题,并让所有消耗 480 个分片的连接器写入该表的同一主题。此设置显著降低了为每个表维护 480 个主题的复杂性,并简化了下游 Hudi 对 S3 的摄取,从而显著降低了运营开销。
Hudi设置
我们使用 Apache Hudi Deltastreamer(一个基于 Spark 的摄取作业)来使用 Kafka 消息并在 S3 中复制 Postgres 表的状态。经过几轮性能优化后,我们建立了一个快速、可扩展的摄取设置,以确保数据新鲜度。对于大多数表,此设置仅提供几分钟的延迟,而对于最大的表(块表)则提供长达两个小时的延迟(见下图)。
- • 我们使用默认的 COPY_ON_WRITE Hudi 表类型和 UPSERT 操作,这适合我们的更新繁重工作负载。
- • 为了更有效地管理数据并最大程度地减少写入放大(即每次批处理摄取运行更新的文件数),我们微调了三种配置:
- • 使用相同的 Postgres 分片方案对数据进行分区/分片,即
hoodie.datasource.write.partitionpath.field: db_schema_source_partition
配置。这会将 S3 数据集划分为 480 个分片,从shard0001
到shard0480
, 更有可能将一批传入更新映射到同一分片中的同一组文件。 - • 根据上次更新时间 (event_lsn) 对数据进行排序,即
source-ordering-field: event_lsn
配置。这是基于我们的观察,即较新的块更有可能得到更新,这使我们能够仅使用过时的块来修剪文件。 - • 将索引类型设置为 bloom filter,即
hoodie.index.type: BLOOM
配置,以进一步优化工作负载。
- • 使用相同的 Postgres 分片方案对数据进行分区/分片,即
Spark数据处理设置
对于我们的大多数数据处理工作,我们使用 PySpark,其相对较低的学习曲线使许多团队成员都可以使用它。对于更复杂的工作,如树遍历和非规范化,我们在几个关键领域利用了Spark的卓越性能:
- • 我们受益于 Scala Spark 的性能效率。
- • 我们通过分别处理大分片和小分片来更有效地管理数据(请记住,我们在 S3 中保留了相同的 480 分片方案,以便与 Postgres 保持一致);小分片将其全部数据加载到 Spark 任务容器内存中以便快速处理,而超出内存容量的大分片则通过磁盘重新洗牌进行管理。
- • 我们利用多线程和并行处理来加快 480 个分片的处理速度,从而优化运行时间和效率。
引导设置
以下是我们引导新表的方法:
- • 我们首先设置了 Debezium 连接器,以将 Postgres 更改引入 Kafka。
- • 从 timestamp
t
开始,我们启动 AWS RDS 提供的导出到 S3 作业,将 Postgres 表的最新快照保存到 S3。然后,我们创建一个 Spark 作业来从 S3 读取这些数据,并将它们写入 Hudi 表格式。 - • 最后,我们通过设置 Deltastreamer 从 Kafka 消息中读取
t
来捕获快照过程中所做的所有更改。此步骤对于保持数据完整性和完整性至关重要。
由于 Spark 和 Hudi 的可扩展性,这三个步骤通常在 24 小时内完成,使我们能够在可管理的时间内执行重新引导,以适应新的表请求和 Postgres 升级和重新分片操作。