PeerDB 团队最近完成了针对 Elasticsearch 的数据集成目标连接器的初步开发,并已进入测试阶段。 EElasticsearch 是一个广泛使用的搜索和分析引擎,它建立在分布式多用户能力的文档数据库之上。在多个行业的数据架构案例中都有 Elasticsearch 的广泛应用。
本文解释了一些通过 Postgres 到 Elasticsearch 的实时同步用例,然后通过一个快速演示展示了使用 PeerDB 进行 Postgres 到 Elasticsearch 复制的高性能和低延迟。最后,我们对连接器的架构进行了高级概述。
Postgres到Elasticsearch复制的使用案例
通过CDC或查询复制从Postgres到Elasticsearch复制的一些常见用例包括:
- 大容量数据的高效搜索:Elasticsearch的主要用途是作为一个搜索引擎,即使在海量的数据上也能高效运行。从全文和加权搜索,甚至到使用内置的NLP模型进行复杂的语义搜索,Elasticsearch都非常灵活且可调整。它常用于摄取和索引大量的日志,甚至作为搜索大型网站和内部知识库的支持引擎。
- 将数据从规范化转换为文档化:数据模型通常以高度规范化的形式存储在Postgres中,这对于事务完整性非常好,但对于可能需要使用联接或CTE的复杂查询来说就不利了。作为一个文档数据库,Elasticsearch更喜欢以非规范化的形式存储数据。使用PeerDB的查询复制功能,你可以定期将你的数据转换成非规范化的形式,这使得它更适合下游消费者查询。一些处理也可以使用Elasticsearch的摄取管道进行。
使用PeerDB从Postgres到Elasticsearch的低延迟复制
在这一部分,我将通过一个快速演示,介绍如何在变更数据捕获(CDC)模式下,使用 PeerDB 进行 Postgres 到 Elasticsearch 的复制。使用 PeerDB 从 Postgres 到 Elasticsearch 的复制有一些好处,主要的优点是快速的初始加载,和通过不断读取插槽来实现的亚分钟延迟,PeerDB 能够提供这些,因为它专注于 Postgres 的复制。
Postgres设置
你可以在云上或者在本地使用任何Postgres数据库。为了简单起见,我在这个演示中使用了一个在 Docker 容器中本地运行的 Postgres 集群。我们创建了一个名为 oss1
的表,使用一个多值插入语句每秒连续插入1000行。
postgres=# CREATE TABLE oss1 (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 INT,
c2 INT,
t TEXT,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE TABLE
postgres=# INSERT INTO oss1 (c1, c2, t)
SELECT
generate_series AS c1,
generate_series * 2 AS c2,
'text_' || generate_series AS t
FROM
generate_series(1, 1000);
# 每秒运行一次INSERT
postgres=# watch 1
INSERT 0 1000
INSERT 0 1000
INSERT 0 1000
Elasticsearch设置
你可以在本地或者云虚拟机上使用它的 Docker compose设置来设置一个 Elasticsearch 实例。或者你也可以使用腾讯云 ES 或者 Elasticsearch Cloud。在这个演示中,我使用了本地运行的 Docker compose 设置。
PeerDB设置
你可以使用 PeerDB开源版 或者 PeerDB云版 来部署一个PeerDB实例。在这个演示中,我通过Docker compose在本地部署了PeerDB开源版。
创建对等体和镜像以进行Postgres到Elasticsearch的复制
在 PeerDB 世界中,对等体指的是源数据存储或目标数据存储。你可以使用 PeerDB 的用户界面来创建Postgres和Elasticsearch对等体。然后在源对等体和目标对等体之间创建一个镜像进行数据复制。你可以使用 PeerDB 的用户界面来创建一个从 Postgres 到 Elasticsearch 复制数据的 MIRROR。
我创建了一个基于变更数据捕获(CDC)的 MIRROR,它使用 Postgres 的预写日志(WAL)和逻辑解码来复制数据。它包括两个步骤:
- 初始加载:首先对 Postgres 中现有的数据进行完全一致的快照,并将其复制到 Elasticsearch;通过 PeerDB 的并行快照,你可以期望显著地加快初始加载速度。我们已经看到在几个小时内移动了几个太字节的数据,而不是几天。
- 变更数据捕获(CDC):一旦初始加载完成,PeerDB 将不断从逻辑复制插槽中读取 Postgres 中的变化,并将这些变化复制到 Elasticsearch。由于我们的流式架构,你可以期望对于持续运行的镜像到 Elasticsearch 的数据延迟在几秒钟的范围内。
初始加载应该很快就能完成,而且应该能在创建的 Elasticsearch 索引中看到行。在进入连续的 CDC 模式后,新的行应该会随着它们被插入而显示出来。下面附上了一个显示 Postgres 到 Elasticsearch CDC 镜像的快速视频。
架构和设计选择
我们之前已经详细讨论过 PeerDB 的流式架构,但总的来说,PeerDB 利用 Go 的 goroutines 和通道,通过逻辑复制有效地从 PostgreSQL 读取数据,然后通过 Bulk API 批量将数据推送到 Elasticsearch。这种方法通过启用并行处理来提高执行时间。
我们的数据仓库连接器在将数据推送到最终表之前,先将数据存储在一个暂存表中,这是出于成本和性能的考虑。由于 Elasticsearch 的架构和查询语言,我们也能够避免这个中间步骤,直接将处理过的记录流发送到 Elasticsearch 索引,通过批量 API。
在Elasticsearch中处理更新和删除
PeerDB 支持使用 Elasticsearch 作为 CDC 和查询复制的目标。在大多数情况下,我们推荐使用 CDC,因为它的使用更简单,可靠性更高,而且能够将 DELETE 复制到 Elasticsearch。然而,这限制了在加载到 Elasticsearch 之前可以进行的转换的范围。
为了在 Elasticsearch 侧支持去重,我们需要一个对每个文档保持一致的唯一 ID,这样我们就可以根据源更新或删除它。对于主键中只有一列的表,可以使用该列的值。对于主键中有多列的表,我们选择将列的值一起哈希,从而得到一个小的唯一标识符,无论行的宽度如何。
代码语言:javascript复制// 简化的 Go 代码
func primaryKeyColsHash(record []any, colIndices []int) string {
hasher := sha256.New()
for _, colIndex := range colIndices {
// 将值写入哈希器
_, _ = fmt.Fprint(hasher, record[colIndex])
}
hashBytes := hasher.Sum(nil)
return base64.RawURLEncoding.EncodeToString(hashBytes)
}
代码语言:javascript复制# PeerDB上传到Elasticsearch的样本文档。
# 注意 _id 字段是主键列 id 和 c1 的(base64 编码的)哈希值。
{
"_index": "public.oss2",
"_id": "SAgdSqEaQyGYWxOo8Dj2s0DbXsQXLTC_CWlds8-c4kY",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"c1": 434017,
"c2": 922856,
"id": 8,
"t": "pgbenchinsertc4b998821cc6b161e65489b3",
"updated_at": "2024-05-08T18:33:39.031107Z"
}
}
查询复制可以以追加模式进行,其中任何变化都会在 Elasticsearch 中创建一个新文档,或者以 upsert 模式进行,其中一些列被指定为键列,这些列在类似于 CDC 的方式中进行去重。
数据类型的动态映射
默认情况下,PeerDB 目前使用 Elasticsearch 的动态映射来自动根据索引中的文档内容推断出数据类型映射。在实践中,数字类型被映射为 long
或 float
,时间戳类型被映射为 date
,大多数其他类型被映射为 text
。更详细的映射可以在这里找到。这对许多用例都有效。如果需要,用户可以在手动创建索引时提供显式映射,PeerDB 将向此索引加载文档。
结论
Elasticsearch 连接器处于测试阶段 -- 我们已经有客户使用 PeerDB 将数十亿行从 Postgres 移动到 Elasticsearch。如果你是一个 Elasticsearch 用户,并希望使用 PeerDB 将数据从 Postgres 复制到 Elasticsearch,请试试 PeerDB!我们很乐意帮助你或者得到你的反馈:
- 免费试用 PeerDB 云。
- 访问 PeerDB 的 GitHub 仓库以开始使用。
- 加入我们的 Slack 并打个招呼!