使用PeerDB实现Postgres到Elasticsearch的实时同步与复制

2024-05-11 09:55:53 浏览数 (2)

PeerDB 团队最近完成了针对 Elasticsearch 的数据集成目标连接器的初步开发,并已进入测试阶段。 EElasticsearch 是一个广泛使用的搜索和分析引擎,它建立在分布式多用户能力的文档数据库之上。在多个行业的数据架构案例中都有 Elasticsearch 的广泛应用。

本文解释了一些通过 Postgres 到 Elasticsearch 的实时同步用例,然后通过一个快速演示展示了使用 PeerDB 进行 Postgres 到 Elasticsearch 复制的高性能和低延迟。最后,我们对连接器的架构进行了高级概述。

Postgres到Elasticsearch复制的使用案例

通过CDC或查询复制从Postgres到Elasticsearch复制的一些常见用例包括:

  1. 大容量数据的高效搜索:Elasticsearch的主要用途是作为一个搜索引擎,即使在海量的数据上也能高效运行。从全文和加权搜索,甚至到使用内置的NLP模型进行复杂的语义搜索,Elasticsearch都非常灵活且可调整。它常用于摄取和索引大量的日志,甚至作为搜索大型网站和内部知识库的支持引擎。
  2. 将数据从规范化转换为文档化:数据模型通常以高度规范化的形式存储在Postgres中,这对于事务完整性非常好,但对于可能需要使用联接或CTE的复杂查询来说就不利了。作为一个文档数据库,Elasticsearch更喜欢以非规范化的形式存储数据。使用PeerDB的查询复制功能,你可以定期将你的数据转换成非规范化的形式,这使得它更适合下游消费者查询。一些处理也可以使用Elasticsearch的摄取管道进行。

使用PeerDB从Postgres到Elasticsearch的低延迟复制

在这一部分,我将通过一个快速演示,介绍如何在变更数据捕获(CDC)模式下,使用 PeerDB 进行 Postgres 到 Elasticsearch 的复制。使用 PeerDB 从 Postgres 到 Elasticsearch 的复制有一些好处,主要的优点是快速的初始加载,和通过不断读取插槽来实现的亚分钟延迟,PeerDB 能够提供这些,因为它专注于 Postgres 的复制。

Postgres设置

你可以在云上或者在本地使用任何Postgres数据库。为了简单起见,我在这个演示中使用了一个在 Docker 容器中本地运行的 Postgres 集群。我们创建了一个名为 oss1 的表,使用一个多值插入语句每秒连续插入1000行。

代码语言:javascript复制
postgres=# CREATE TABLE oss1 (
           id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
           c1 INT,
           c2 INT,
           t TEXT,
           updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
         );
CREATE TABLE
postgres=# INSERT INTO oss1 (c1, c2, t)
SELECT
    generate_series AS c1,
    generate_series * 2 AS c2,
    'text_' || generate_series AS t
FROM 
    generate_series(1, 1000); 
# 每秒运行一次INSERT    
postgres=# watch 1
INSERT 0 1000
INSERT 0 1000
INSERT 0 1000

Elasticsearch设置

你可以在本地或者云虚拟机上使用它的 Docker compose设置来设置一个 Elasticsearch 实例。或者你也可以使用腾讯云 ES 或者 Elasticsearch Cloud。在这个演示中,我使用了本地运行的 Docker compose 设置。

PeerDB设置

你可以使用 PeerDB开源版 或者 PeerDB云版 来部署一个PeerDB实例。在这个演示中,我通过Docker compose在本地部署了PeerDB开源版。

创建对等体和镜像以进行Postgres到Elasticsearch的复制

在 PeerDB 世界中,对等体指的是源数据存储或目标数据存储。你可以使用 PeerDB 的用户界面来创建Postgres和Elasticsearch对等体。然后在源对等体和目标对等体之间创建一个镜像进行数据复制。你可以使用 PeerDB 的用户界面来创建一个从 Postgres 到 Elasticsearch 复制数据的 MIRROR。

我创建了一个基于变更数据捕获(CDC)的 MIRROR,它使用 Postgres 的预写日志(WAL)和逻辑解码来复制数据。它包括两个步骤:

  1. 初始加载:首先对 Postgres 中现有的数据进行完全一致的快照,并将其复制到 Elasticsearch;通过 PeerDB 的并行快照,你可以期望显著地加快初始加载速度。我们已经看到在几个小时内移动了几个太字节的数据,而不是几天。
  2. 变更数据捕获(CDC):一旦初始加载完成,PeerDB 将不断从逻辑复制插槽中读取 Postgres 中的变化,并将这些变化复制到 Elasticsearch。由于我们的流式架构,你可以期望对于持续运行的镜像到 Elasticsearch 的数据延迟在几秒钟的范围内。

初始加载应该很快就能完成,而且应该能在创建的 Elasticsearch 索引中看到行。在进入连续的 CDC 模式后,新的行应该会随着它们被插入而显示出来。下面附上了一个显示 Postgres 到 Elasticsearch CDC 镜像的快速视频。

架构和设计选择

我们之前已经详细讨论过 PeerDB 的流式架构,但总的来说,PeerDB 利用 Go 的 goroutines 和通道,通过逻辑复制有效地从 PostgreSQL 读取数据,然后通过 Bulk API 批量将数据推送到 Elasticsearch。这种方法通过启用并行处理来提高执行时间。

我们的数据仓库连接器在将数据推送到最终表之前,先将数据存储在一个暂存表中,这是出于成本和性能的考虑。由于 Elasticsearch 的架构和查询语言,我们也能够避免这个中间步骤,直接将处理过的记录流发送到 Elasticsearch 索引,通过批量 API。

在Elasticsearch中处理更新和删除

PeerDB 支持使用 Elasticsearch 作为 CDC 和查询复制的目标。在大多数情况下,我们推荐使用 CDC,因为它的使用更简单,可靠性更高,而且能够将 DELETE 复制到 Elasticsearch。然而,这限制了在加载到 Elasticsearch 之前可以进行的转换的范围。

为了在 Elasticsearch 侧支持去重,我们需要一个对每个文档保持一致的唯一 ID,这样我们就可以根据源更新或删除它。对于主键中只有一列的表,可以使用该列的值。对于主键中有多列的表,我们选择将列的值一起哈希,从而得到一个小的唯一标识符,无论行的宽度如何。

代码语言:javascript复制
// 简化的 Go 代码
func primaryKeyColsHash(record []any, colIndices []int) string {
    hasher := sha256.New()

    for _, colIndex := range colIndices {
        // 将值写入哈希器
        _, _ = fmt.Fprint(hasher, record[colIndex])
    }
    hashBytes := hasher.Sum(nil)
    return base64.RawURLEncoding.EncodeToString(hashBytes)
}

代码语言:javascript复制
# PeerDB上传到Elasticsearch的样本文档。
# 注意 _id 字段是主键列 id 和 c1 的(base64 编码的)哈希值。
{
  "_index": "public.oss2",
  "_id": "SAgdSqEaQyGYWxOo8Dj2s0DbXsQXLTC_CWlds8-c4kY",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "c1": 434017,
    "c2": 922856,
    "id": 8,
    "t": "pgbenchinsertc4b998821cc6b161e65489b3",
    "updated_at": "2024-05-08T18:33:39.031107Z"
  }
}

查询复制可以以追加模式进行,其中任何变化都会在 Elasticsearch 中创建一个新文档,或者以 upsert 模式进行,其中一些列被指定为键列,这些列在类似于 CDC 的方式中进行去重。

数据类型的动态映射

默认情况下,PeerDB 目前使用 Elasticsearch 的动态映射来自动根据索引中的文档内容推断出数据类型映射。在实践中,数字类型被映射为 longfloat,时间戳类型被映射为 date,大多数其他类型被映射为 text。更详细的映射可以在这里找到。这对许多用例都有效。如果需要,用户可以在手动创建索引时提供显式映射,PeerDB 将向此索引加载文档。

结论

Elasticsearch 连接器处于测试阶段 -- 我们已经有客户使用 PeerDB 将数十亿行从 Postgres 移动到 Elasticsearch。如果你是一个 Elasticsearch 用户,并希望使用 PeerDB 将数据从 Postgres 复制到 Elasticsearch,请试试 PeerDB!我们很乐意帮助你或者得到你的反馈:

  1. 免费试用 PeerDB 云。
  2. 访问 PeerDB 的 GitHub 仓库以开始使用。
  3. 加入我们的 Slack 并打个招呼!

0 人点赞