Edge2AI之使用 Flink/SSB 进行CDC捕获

2022-04-27 17:00:07 浏览数 (1)

在本次实验中,您将使用 Cloudera SQL Stream Builder来捕获和处理来自外部数据库中活动的更改。

介绍

Flink 和 SQL Stream Builder 使用 Debezium 库内置了对变更数据捕获 (CDC) 的支持。

Debezium 是一个 CDC 工具,可以将 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的实时变化流式传输到 Kafka。Debezium 为变更日志提供统一格式的Schema,并支持使用 JSON 和 Apache Avro来序列化消息。

Flink 支持将 Debezium JSON 和 Avro 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用,例如

  • 将增量数据从数据库同步到其他系统
  • 审核日志
  • 数据库的实时物化视图
  • 数据库表的临时连接更改历史记录等。

有关在 Flink 中使用 Debezium 的更多信息,请查看Flink 文档

数据库前置要求

笔记

本节中的详细信息仅供您参考。车间的所有这些设置都已经完成。要开始实验,请跳到下一部分。

在本次实验中,您将从 PostgreSQL 数据库中捕获变更日志信息。

Debezium 正常工作所需的数据库配置已经为您完成,并且超出了实验的范围。本节让您了解已为 PostgreSQL 数据库完成的准备步骤。有关其他类型数据库的更多信息和/或指南,请参阅 Flink 和 Debezium 官方文档。

PostgreSQL 版本

PostgreSQL 10 及更高版本已内置支持pgoutput我们在本次实验中使用的解码器。我们建议使用这些版本之一。

主机连接

数据库管理员必须确保允许 Flink 作业连接到 PostgreSQL 数据库。这是通过配置pg_hba.conf配置文件以允许来自运行 Flink 和 SSB 的主机的连接来完成的。

下面的配置使用通配符来允许从所有主机到所有数据库的连接,如cdc_user. 这可以根据需要更具体到选定的数据库和主机。

代码语言:javascript复制
#    DATABASE  USER  ADDRESS     METHOD
host all       all   0.0.0.0/0  md5
数据库配置

PostgreSQL 数据库服务器还必须配置为启用逻辑数据复制并进行调整以支持所需数量的 Debezium 连接到复制槽。

下面的postgresql.conf配置是作为本次实验的准备工作的简单配置。对于真实世界的用例,请查阅 PostgreSQL 和 Debezium 文档并根据预期负载配置数据库。

代码语言:javascript复制
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
数据库权限

在 Flink/Debezium 可以连接到 PostgreSQL 数据库以收集更改日志数据之前,有必要:

  • 向提供给 Debezium 的用户授予适当的权限;和
  • 在将捕获更改日志的数据库中创建必要的发布和复制槽。

如果提供给 Flink/Debezium 的用户是数据库超级用户,则 Debezium 连接器将负责创建所需的发布和复制槽。这是更简单的入门方式,但也可能由于 Flink/Debezium 的特权提升而产生安全问题。

下面的命令将cdc_user用户/角色创建为超级用户:

代码语言:javascript复制
-- Create cdc_user as a superuser
CREATE ROLE cdc_user WITH SUPERUSER CREATEDB CREATEROLE LOGIN ENCRYPTED PASSWORD 'supersecret1';

或者,您可以要求数据库管理员设置数据库,以便 Debezium 用户不需要是超级用户,并且只具有连接和捕获来自特定数据库的更改日志的权限。

以下命令将cdc_user用户/角色创建为超级用户:

代码语言:javascript复制
-- Create cdc_user user with only LOGIN and REPLICATION privileges
CREATE ROLE cdc_user WITH REPLICATION LOGIN PASSWORD 'supersecret1';
-- Grant privileges on the cdc_test database to the cdc_user
GRANT CONNECT ON DATABASE cdc_test TO cdc_user;
-- Connect to the cdc_test database
c cdc_test
-- Create a replication slot named 'flink'
SELECT * FROM pg_create_logical_replication_slot('flink', 'pgoutput',  false);
-- Create a publication named 'dbz_publication' in the cdc_test database
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Grant SELECT privileges on all tables of the cdc_test database to the cdc_user
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- Grant SELECT privileges on all *future* tables of the cdc_test database to the cdc_user
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO cdc_user;

如前所述,有关上述内容的详细信息,请参阅官方文档。解释上述命令超出了本次实验的范围。

实验的 PostgreSQLcdc_test数据库已经建立了以下内容:

  • 一个名为dbz_publication的发布
  • 一个名为flink的复制槽

实验总结

  • 实验 1 - 创建数据库表
  • 实验 2 - 在 SSB 中创建 PostgreSQL CDC 表
  • 实验 3 - 捕获表更改
  • 实验 4 - 复制表更改
  • 实验 5 - 捕获变更日志事件

实验 1 - 创建数据库表

在本次实验中,您将在cdc_test数据库中创建一个表,在其上生成一些事务,并使用 Flink/SSB 捕获和复制这些事务。

让我们从连接到 PostgreSQL 并创建表开始。

  1. 使用 SSH 连接到您的集群主机
  2. 执行以下命令以连接到cdc_test数据库cdc_user。此用户的密码是supersecret1。
代码语言:javascript复制
psql --host localhost --port 5432 --username cdc_user cdc_test

连接后,您应该会看到psql带有数据库名称的提示,如下所示:

代码语言:javascript复制
cdc_test=#
  1. 运行以下命令来创建您的测试表:
代码语言:javascript复制
CREATE TABLE transactions (
id INT,
name TEXT,
PRIMARY KEY (id)
);

ALTER TABLE transactions REPLICA IDENTITY FULL;

这些ALTER TABLE … REPLICA IDENTITY FULL命令允许捕获表UPDATE和DELETE事务。没有这个设置,Debezium 只能捕获INSERT事件。

  1. 将一些初始数据插入到您的表中并选择它以验证它是否已正确插入:
代码语言:javascript复制
INSERT INTO transactions VALUES (100, 'flink is awesome');
SELECT * FROM transactions;
  1. 尝试一些其他psql命令:
代码语言:javascript复制
d- 列出数据库中的所有表
d <table_name>- 描述表结构

实验 2 - 在 SSB 中创建 PostgreSQL CDC 表

在本实验中,您将设置一个 SSB 表来捕获该transactions表的变更日志流。

  1. 在 SSB UI 中,单击Console(在左侧栏上)> Compose
  2. 单击模板> postgres-cdc

您会注意到 SQL 编辑器框将填充一个语句的通用模板,以使用postgres-cdc连接器创建一个表。

在接下来的步骤中,您将定制此语句以匹配PostgreSQL transaction表的结构并使用必要的属性对其进行配置。

  1. transactions您在步骤 1 中创建的表有两列:id, 类型integer和name, 类型text。

在 Flink ANSI SQL 方言中,上述的等价数据类型如下:

PostgreSQL

Flink

integer

INT

text

STRING

更改CREATE TABLE模板以将 SSB 表重命名为transactions_cdc并将列和数据类型与transactions表的列和数据类型匹配。

更改后,语句的开头应如下所示:

代码语言:javascript复制
CREATE TABLE transactions_cdc (
id   INT,
name STRING
) WITH (
...
  1. 该模板在子句中有许多属性WITH,允许您配置表以连接到您的数据库和表。有些属性必须指定,有些属性是可选的并且在模板中被注释掉。

在本实验中,您将设置所有必需的属性以及一些可选属性。您可以忽略其他可选属性。

在语句中设置以下必需属性:

代码语言:javascript复制
connector:     postgres-cdc
hostname:      <CLUSTER_HOSTNAME>
username:      cdc_user
password:      supersecret1
database-name: cdc_test
table-name:    transactions
schema-name:   public
  1. 在您的语句中取消注释并设置以下可选属性。

笔记

确保您的最终语句使用逗号分隔属性分配。

代码语言:javascript复制
decoding.plugin.name: pgoutput
debezium.publication.name: dbz_publication
debezium.slot.name: flink

您的最终CREATE TABLE语句应如下所示:

代码语言:javascript复制
CREATE TABLE transactions_cdc (
id   INT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<CLUSTER_HOSTNAME>',
'username' = 'cdc_user',
'password' = 'supersecret1',
'database-name' = 'cdc_test',
'table-name' = 'transactions',
'schema-name' = 'public',
'decoding.plugin.name' = 'pgoutput',
'debezium.publication.name' = 'dbz_publication',
'debezium.slot.name' = 'flink',
'debezium.snapshot.mode' = 'initial'
);
  1. 单击执行按钮以执行语句并创建transactions_cdc表。
  2. 单击Tables选项卡并导航到新创建的表以验证其详细信息:

实验 3 - 捕获表更改

您在上面创建的表接收该transactions表的更改流。

正如您在CREATE TABLE上面的语句中可能已经注意到的那样,该表正在使用快照模式initial。此模式在第一次执行查询时获取表内容的完整快照,然后相同查询的后续运行可以读取自上次执行以来更改的内容。还有许多其他快照模式。有关可用模式及其行为的详细信息,请参阅Debezium PostgreSQL 连接器文档。

在本实验中,您将探索在 SSB 中捕获变更日志。

  1. 在SSB UI Console的Compose窗格中,单击Settings选项卡并选择“ Sample all messages ”示例行为。

默认情况下,当您在 SSB 中运行查询时,UI 中只会显示一小部分选定的消息(每秒一条消息)。这可以避免减慢 UI 并导致作业出现性能问题。

在这里,由于数据量很小,并且我们要验证是否已捕获所有更改日志消息,因此您正在设置 SSB 以在 UI 中显示所有消息。

  1. 单击SQL选项卡并执行以下查询:
代码语言:javascript复制
SELECT *
FROM transactions_cdc

由于这是作业第一次运行,PostgreSQL 连接器将对现有表进行完整快照,您应该在结果选项卡上看到其内容:

  1. 现在,尝试停止作业并再次执行它。

发生了什么?您是否再次获得初始快照数据?为什么?

当使用initial快照模式时,Flink 会跟踪最后处理的变更日志并将此信息存储在作业状态中。当您在 SSB 中停止作业时,它会创建作业状态的保存点,可用于稍后恢复执行。

但是,默认情况下,在启动作业时不会自动使用保存点,并且每次执行相同的查询都从头开始,导致 PostgreSQL 连接器对整个表进行另一个初始快照。

在接下来的步骤中,您将启用保存点。

  1. 停止工作。
  2. 再次单击Settings选项卡,这次将Restore From Savepoint属性设置为true。
  1. 返回SQL选项卡并再次执行作业。

你又看到第一行了吗?您不应该这样做,因为该作业从上次执行停止的同一点恢复,并且已经读取了初始行快照。

由于没有数据进入,您应该只会在“日志”选项卡中看到以下行,计数器在增加,表明作业正在运行:

  1. 现在,在作业执行时,通过 SSH 再次连接到您的集群主机,并使用以下命令连接到 PostgreSQL 数据库:

psql --host localhost --port 5432 --username cdc_user cdc_test

在psql提示符下,执行以下命令以在transactions表中再插入一条记录并使用 id 更新记录101:

代码语言:javascript复制
INSERT INTO transactions
VALUES (101, 'SQL Stream Builder rocks!');


UPDATE transactions
SET name = 'Flink is really awesome!!!'
WHERE id = 100;

检查 SSB UI,您现在应该会看到已修改的 2 行的新状态。

  1. 单击停止以停止 Flink 作业。

实验 4 - 复制表更改

在上一个实验中,您可视化了应用到 SSB 中的数据库表的更改的捕获。现在您将创建一个 SSB 作业以将捕获的更改复制到另一个表。

由于我们已经有一个 PostgreSQL 数据库可用,我们将在同一个数据库中创建目标表。不过,您可以通过 JDBC 或其他可用的 Flink/SSB 连接器(例如 Kudu)将数据复制到任何其他可访问的数据库。

  1. 在 SSH 会话中再次连接到 PostgreSQL 数据库并创建一个新表来接收复制的数据:
代码语言:javascript复制
psql --host localhost --port 5432 --username cdc_user cdc_test
CREATE TABLE trans_replica (
id INT,
name TEXT,
PRIMARY KEY (id)
);
  1. 在 SSB UI 中,打开 SQL 编辑器并键入以下命令,但不要执行它
代码语言:javascript复制
INSERT INTO trans_replica
SELECT *
FROM transactions_cdc
  1. 单击模板 > jdbc。这将在 SQL 编辑器中添加 JDBC 表的模板。

请注意,SSB 将正在创建的表的结构与您在上一步中键入的查询结果相匹配!

  1. 但是,该CREATE TABLE模板没有指定主键,这是允许更新和删除所必需的。

将PRIMARY KEY (id) NOT ENFORCED子句添加到语句中,如下所示。name请注意,列规范和PRIMARY KEY子句之间需要逗号。

  1. 为表指定以下属性(其余的可以从编辑器中删除):
代码语言:javascript复制
connector:  jdbc
url:        jdbc:postgresql://<CLUSTER_HOSTNAME>:5432/cdc_test
table-name: trans_replica
username:   cdc_user
password:   supersecret1
driver:     org.postgresql.Driver
  1. 配置表属性后,单击执行以启动作业。

表将被创建,INSERT … SELECT …语句将开始执行以将数据从transactions表中复制到trans_replica表中。

  1. 返回 SSH 会话,在psql提示符下,执行以下语句以在transactions表上生成活动并验证更改是否已成功复制到该trans_replica表。
代码语言:javascript复制
-- Check the contents of both tables - they should be the same
SELECT * FROM transactions;
SELECT * FROM trans_replica;


-- Generate activity on transactions (INSERT, UPDATE, DELETE)
INSERT INTO transactions VALUES (103, 'Live long and replicate with SSB and Flink.');
UPDATE transactions SET name = 'Flink is *REALLY* awesome!!!' where id = 100;
DELETE FROM transactions WHERE id = 101;


-- Check the contents of both tables - they should still be the same!
SELECT * FROM transactions;
SELECT * FROM trans_replica;
  1. 单击停止以停止 Flink 作业。

实验 5 - 捕获变更日志事件

也可以使用 SSB/Debezium 来捕获变更日志事件(INSERT、UPDATE和DELETE)。这会将其他元数据暴露给流,例如对表执行的操作类型以及更改列的前后值。

这种类型的信息对于分析数据如何变化的用例可能很重要,而不是简单地查看它的最新状态。

在本实验中,您将创建一个 SSB 作业,该作业从源数据库中读取更改日志并将其发布到 Kafka 中的主题,以及 Debezium 提供的其他元数据信息。

  1. 在 SSB UI 中,打开 SQL 编辑器并键入以下命令,但不要执行它:
代码语言:javascript复制
INSERT INTO trans_changelog
SELECT *
FROM transactions_cdc
  1. 单击模板 > kafka。这将在 SQL 编辑器中添加Kafka Json 表的模板。

请注意,SSB 将正在创建的表的结构与您在上一步中键入的查询结果相匹配!

  1. 为表指定以下属性(其余的可以从编辑器中删除):
代码语言:javascript复制
connector:                    kafka
properties.bootstrap.servers: <CLUSTER_HOSTNAME>:9092
topic:                        trans_changelog
key.format:                   json
key.fields:                   id
value.format:                 debezium-json


CREATE TABLE  `ssb`.`ssb_default`.`trans_changelog` (
`id` INT,
`name` VARCHAR(2147483647)
) WITH (
'connector' = 'kafka', -- Specify what connector to use, for Kafka it must use 'kafka'.
'key.format' = 'json', -- Data format'
'properties.bootstrap.servers' = '10.0.211.167:9092', -- Comma separated list of Kafka brokers.
'topic' = 'trans_changelog', -- To read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.
'key.fields' = 'id', -- Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'.
'value.format' = 'debezium-json' -- Specifies the format identifier for encoding value data. The format used to deserialize and serialize the value part of Kafka messages. Note: Either this option or the 'format' option are required.
);


INSERT INTO trans_changelog
SELECT *
FROM transactions_cdc
  1. 配置表属性后,单击执行以启动作业。

该表将被创建,该INSERT … SELECT …语句将开始执行以将更改日志从transactions表复制到trans_changelogKafka 中的主题。

  1. 回到 SSH 会话,在psql提示符下,执行以下语句以在transactions表上生成活动。
代码语言:javascript复制
INSERT INTO transactions VALUES (200, 'This is an insert.');
UPDATE transactions SET name = 'This is an update.' WHERE id = 200;
DELETE FROM transactions WHERE id = 200;
  1. 您将使用 Streams Messaging Manager (SMM) UI 检查写入trans_changelog主题的内容。

导航到 SMM UI:Cloudera Manager > Clusters > SMM > Streams Messaging Manager Web UI

  1. 在 SMM UI 中,单击左侧栏上的主题图标 ( )。
  2. 在搜索框中键入“trans_changelog”以过滤该主题,然后单击该主题的放大镜图标 () 以查看该主题的内容:

从上面的截图中,您可以注意到以下内容:

  • INSERT:操作生成单个op=c(用于Create)更改日志事件。该事件的before值null,因为该数据事先不存在。该after值是要插入的数据。
  • UPDATE:Debezium 将该操作转换为DELETE事件 ( op=d),然后是INSERT事件 ( op=c)。事件的before值DELETE是更新前的记录状态,而事件的after值INSERT是更新后的状态。
  • DELETE:操作会生成一个带有op=d. 该after值是null因为操作完成后数据不再存在。

结论

在本次实验中,您学习了如何使用 SQL Stream Builder (SSB)、Flink 和基于 Debezium 的 PostgreSQL 连接器 ( postgres-cdc) 从关系数据库中提取变更日志数据,并在 SSB 中以不同方式使用它。

0 人点赞