Streaming Data Changes from MySQL to Elasticsearch
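The MySQL binary log records every event produced by DDL (Data Definition Language) and DML (Data Manipulation Language) statements executed against the database, and it is widely used for replication and data recovery. This article shares a technique, built on the MySQL binary log, for synchronizing incremental changes to Elasticsearch in near real time. The binary log alone is not enough for incremental synchronization: we also need a change data capture (CDC) tool. Many readers will immediately think of Canal, open-sourced by Alibaba. That would work, but this article introduces another open-source tool: Debezium.
Debezium is built on top of Kafka and provides source connectors, fully compatible with Kafka Connect, for databases such as MySQL, MongoDB, PostgreSQL, Oracle, and Cassandra. First, the source connector captures the change events triggered by INSERT, UPDATE, and DELETE statements in real time; next, it publishes them to Kafka topics; finally, a sink connector writes the change events from those topics into Elasticsearch, so that data flows from MySQL to Elasticsearch in near real time, as shown in the figure below.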
Debezium supports several MySQL topologies, including standalone, primary and replica, highly available clusters, and multi-primary.
1 Install MySQL
1.1 Extract and configure
mkdir -p /root/debezium
tar -xzvf mysql-8.0.21-el7-x86_64.tar.gz -C /root/debezium/
In the mysql-8.0.21-el7-x86_64 root directory, create a text file named my.cnf and copy the following content into it.
[client]
port=3306
socket=/root/debezium/mysql-8.0.21-el7-x86_64/mysql.sock
[mysqld]
port=3306
socket=/root/debezium/mysql-8.0.21-el7-x86_64/mysql.sock
key_buffer_size=16M
max_allowed_packet=128M
basedir=/root/debezium/mysql-8.0.21-el7-x86_64
datadir=/root/debezium/mysql-8.0.21-el7-x86_64/data
server-id=101
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
binlog_expire_logs_seconds=86400
[mysqldump]
quick
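Debezium reads row-level changes from the binary log, so the settings that matter most in this file are log-bin, server-id, binlog-format=ROW, and binlog-row-image=FULL. The following Python sketch parses a my.cnf with the standard configparser module and reports any of these settings that are missing or different. The helper name check_binlog_settings is hypothetical (not part of MySQL or Debezium), and it only inspects the file, not the running server.

```python
import configparser

# Options Debezium needs for row-level change capture, with their expected values.
REQUIRED_VALUES = {"binlog-format": "ROW", "binlog-row-image": "FULL"}


def check_binlog_settings(cnf_text: str) -> list:
    """Return a list of problems in the [mysqld] section; an empty list means OK."""
    # allow_no_value accepts bare options such as "quick" under [mysqldump];
    # interpolation=None keeps '%' characters literal.
    parser = configparser.ConfigParser(allow_no_value=True, interpolation=None)
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["[mysqld] section is missing"]
    mysqld = parser["mysqld"]

    def get(name):
        # MySQL treats '-' and '_' in option names as equivalent.
        for key in (name, name.replace("-", "_")):
            if key in mysqld:
                return mysqld[key]
        return None

    problems = []
    for name in ("log-bin", "server-id"):
        if get(name) is None:
            problems.append(f"{name} is not set in this file")
    for name, expected in REQUIRED_VALUES.items():
        value = get(name)
        if value is None or value.upper() != expected:
            problems.append(f"{name} should be {expected}, found {value}")
    return problems


# Usage: print(check_binlog_settings(open("my.cnf").read()))
```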
1.2 Initialize
./bin/mysqld --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf --initialize --user=root
After running the initialization command, watch the console output closely: a temporary password has been generated for the root account.
2020-12-28T02:55:20.774965Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: &,Yot7iMeT_T
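If the console output scrolls past, the temporary password can also be recovered from captured output. A minimal Python sketch, assuming the log line has the MySQL 8.0 format shown above; find_temporary_password is a hypothetical helper:

```python
import re

# Matches the MySQL 8.0 message written by mysqld --initialize (format assumed from the line above).
TEMP_PASSWORD_RE = re.compile(
    r"A temporary password is generated for (?P<account>\S+): (?P<password>[^\r\n]+)"
)


def find_temporary_password(log_text: str):
    """Return (account, password) from initialization output, or None if absent."""
    match = TEMP_PASSWORD_RE.search(log_text)
    if match is None:
        return None
    return match.group("account"), match.group("password")
```

Because the password may contain punctuation such as "&" or ",", the pattern takes everything up to the end of the line rather than stopping at special characters.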
1.3 Start MySQL Server
Initialization does not start MySQL Server, so you need to start it manually.
./bin/mysqld --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf --user=root
1.4 Reset the 'root' account password
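Log in with the temporary password, for example with ./bin/mysql --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf -uroot -p, and set a new password. ALTER USER takes effect immediately, so FLUSH PRIVILEGES is not required:
ALTER USER 'root'@'localhost' IDENTIFIED BY 'your_new_root_password';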
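1.5 Allow remote access
Because this step edits the mysql.user grant table directly, FLUSH PRIVILEGES is required for the change to take effect:
USE mysql;
UPDATE user SET host = '%' WHERE user = 'root';
FLUSH PRIVILEGES;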
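2 Install Kafka
Configuration is not covered in detail here; refer to the Kafka documentation. At the time of writing (Kafka 2.7.0), Kafka depends on ZooKeeper, and ZooKeeper is bundled with the Kafka distribution, so it can be used directly without a separate download. From the Kafka installation directory, start ZooKeeper and then the Kafka broker: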
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties>/dev/null 2>&1 &
nohup ./bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1 &
3 Install Debezium
3.1 Extract the source connector and the sink connector
mkdir -p /root/debezium/connector-plugins
tar -xzvf debezium-connector-mysql-1.4.2.Final-plugin.tar.gz -C /root/debezium/connector-plugins
unzip confluentinc-kafka-connect-elasticsearch-11.0.3.zip -d /root/debezium/connector-plugins
3.2 Kafka Connect
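To integrate Kafka with other data systems more easily and in a standardized way, Kafka provides Kafka Connect, which defines the interfaces for source connectors and sink connectors. To move data from another data system into Kafka, you implement a source connector; to move data from Kafka into another data system, you implement a sink connector. Kafka Connect also exposes a REST API that makes managing connectors more convenient.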
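The REST API listens on rest.port (8083 in the connect-distributed.properties file used in this article). As a sketch, the Python helpers below build urllib requests for a few standard Kafka Connect endpoints: GET /connectors, GET /connectors/{name}/status, PUT /connectors/{name}/config, and DELETE /connectors/{name}. The helper names and the base URL are assumptions; pass a request to urllib.request.urlopen to actually send it.

```python
import json
import urllib.request

# Assumed address of the Kafka Connect worker (rest.port=8083).
CONNECT_URL = "http://localhost:8083"


def connect_request(method: str, path: str, body=None) -> urllib.request.Request:
    """Build, but do not send, a request against the Kafka Connect REST API."""
    headers = {"Accept": "application/json"}
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(CONNECT_URL + path, data=data, headers=headers, method=method)


def list_connectors() -> urllib.request.Request:
    return connect_request("GET", "/connectors")


def connector_status(name: str) -> urllib.request.Request:
    return connect_request("GET", f"/connectors/{name}/status")


def put_connector_config(name: str, config: dict) -> urllib.request.Request:
    # Creates the connector if it does not exist, otherwise updates its configuration.
    return connect_request("PUT", f"/connectors/{name}/config", body=config)


def delete_connector(name: str) -> urllib.request.Request:
    return connect_request("DELETE", f"/connectors/{name}")


# Usage (requires a running worker):
# with urllib.request.urlopen(list_connectors()) as resp:
#     print(json.load(resp))
```

PUT /connectors/{name}/config is convenient for scripts because the same call both creates and updates a connector.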
3.2.1 Configuration
#################### connect-distributed.properties ###################
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets.
# This topic should have many partitions and be replicated and compacted.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
# Topic to use for storing connector and task configurations.
# This topic should be a single partition, highly replicated.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Topic to use for storing statuses.
# This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=1
# Hostname & Port for the REST API to listen on.
rest.host.name=localhost
rest.port=8083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# The list should consist of top level directories.
# Directory that holds the source connector and sink connector plugins
plugin.path=/root/debezium/connector-plugins/
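With key.converter.schemas.enable and value.converter.schemas.enable set to true, JsonConverter wraps every key and value in an envelope of the form {"schema": ..., "payload": ...}, which gives the sink connector the schema it needs to build mappings. The Python sketch below (a hypothetical helper using only the standard library) shows how a consumer reading these topics directly could get at the data; it assumes a schemaless payload never consists of exactly the two keys schema and payload.

```python
import json


def read_json_converter_message(raw):
    """Return the data part of a message written by JsonConverter.

    schemas.enable=true  -> {"schema": {...}, "payload": {...}}: return the payload.
    schemas.enable=false -> the message is already the bare payload.
    A tombstone (null message) is returned as None.
    """
    if raw is None:
        return None
    message = json.loads(raw)
    # Assumption: a bare payload never has exactly the two keys "schema" and "payload".
    if isinstance(message, dict) and set(message) == {"schema", "payload"}:
        return message["payload"]
    return message
```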
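3.2.2 Create topics
Create the three internal topics that Kafka Connect uses to store offsets, connector configurations, and statuses. All three are compacted, and the configuration topic should have a single partition:
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-offsets --partitions 3 --replication-factor 1 --config cleanup.policy=compact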
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-status --partitions 3 --replication-factor 1 --config cleanup.policy=compact
3.2.3 Start
nohup ./bin/connect-distributed.sh config/connect-distributed.properties>/dev/null 2>&1 &
3.3 Register the Debezium source connector
Parameter | Description | Default |
---|---|---|
include.schema.changes | If true, the source connector publishes schema change events to Kafka, to a topic named after database.server.name | true |
tombstones.on.delete | If true, the source connector emits an additional tombstone event after each delete event | true |
database.server.id | A numeric ID for the connector; it must be unique among all MySQL servers and replicas in the cluster, so it must differ from the server_id of the MySQL server | (none) |
database.include.list | Databases to capture, as a comma-separated list | (none) |
database.history.kafka.topic | Topic in which the connector stores the MySQL schema history; this topic is consumed only by Debezium itself | (none) |
Save the following configuration as /root/debezium/connector-configs/debezium-mysql-source-connector.json:
{
"name": "debezium-mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"include.schema.changes": true,
"tombstones.on.delete": true,
"database.hostname": "10.254.9.82",
"database.port": "3306",
"database.user": "root",
"database.password": "Nz_3@sMw7P",
"database.server.id": "184054",
"database.server.name": "master",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "master.schema.history"
}
}
Register the connector through the Kafka Connect REST API:
curl -X POST -H 'Content-Type: application/json' -d '@/root/debezium/connector-configs/debezium-mysql-source-connector.json' http://localhost:8083/connectors
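For scripted deployments, the same registration can be done from Python. This sketch mirrors the JSON configuration above, except that the password and database.server.id are passed in; it rejects an ID equal to the MySQL server-id (101 in my.cnf), since Debezium's documentation requires the connector's ID to be unique within the MySQL cluster. The function names are hypothetical.

```python
import json
import urllib.request

MYSQL_SERVER_ID = 101  # server-id in my.cnf


def source_connector_payload(connector_server_id: int, password: str) -> dict:
    """Registration body for POST /connectors, mirroring the JSON file above."""
    if connector_server_id == MYSQL_SERVER_ID:
        raise ValueError("database.server.id must differ from the MySQL server-id")
    return {
        "name": "debezium-mysql-source-connector",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "include.schema.changes": "true",
            "tombstones.on.delete": "true",
            "database.hostname": "10.254.9.82",
            "database.port": "3306",
            "database.user": "root",
            "database.password": password,
            "database.server.id": str(connector_server_id),
            "database.server.name": "master",
            "database.include.list": "inventory",
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "master.schema.history",
        },
    }


def register_request(payload: dict, connect_url: str = "http://localhost:8083"):
    """Same call as the curl command: POST the payload to /connectors (built, not sent)."""
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage (requires a running worker; a successful registration returns HTTP 201 Created):
# urllib.request.urlopen(register_request(source_connector_payload(184054, "secret")))
```

Passing the password as an argument keeps credentials out of the script itself.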
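After the source connector has been registered, list the topics again (./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list) and you will find two new topics, master and master.schema.history. The differences between them are shown in the table below.
Topic | Contents | Consumer |
---|---|---|
master | Schema change events, limited to the databases listed in database.include.list | Third-party consumers |
master.schema.history | Schema change events for all databases | Debezium only |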
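Row-level change events go to separate topics. Under Debezium's default naming, each captured table gets a topic named <database.server.name>.<database>.<table>; that is why the sink connector below subscribes to master.inventory.customers, and why the Elasticsearch index, which by default takes the topic name, has the same name. A small Python illustration of the naming rule (debezium_topics is a hypothetical helper):

```python
def debezium_topics(server_name: str, tables, history_topic: str) -> dict:
    """Names of the topics involved for a Debezium MySQL connector (default naming).

    tables is an iterable of (database, table) pairs.
    """
    return {
        "schema_changes": server_name,    # include.schema.changes=true
        "schema_history": history_topic,  # database.history.kafka.topic
        "data": [f"{server_name}.{db}.{table}" for db, table in tables],
    }
```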
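3.4 Register the Confluent sink connector
Parameter | Description | Default |
---|---|---|
key.ignore | If false, the record key is used as the Elasticsearch document ID; together with the key transform below, the document ID equals the MySQL primary key | false |
schema.ignore | If false, the connector creates the index mapping from the record schema instead of relying on Elasticsearch dynamic mapping | false |
write.method | If UPSERT, Elasticsearch updates the document if it already exists and inserts it otherwise | INSERT |
behavior.on.null.values | If DELETE, a record with a null value deletes the document with the same ID | FAIL |
transforms.unwrap.type | ExtractNewRecordState flattens the data. Debezium change events are nested structures (before, after, source, op, and so on), which are awkward to store in Elasticsearch, so this transform keeps only the after state of the row | (none) |
transforms.unwrap.drop.tombstones | If false, tombstone events are not dropped | true |
transforms.unwrap.delete.handling.mode | For each DELETE, Debezium emits a delete event followed by a tombstone event. If none, the delete event is kept and passed on with a null value; with drop, it is discarded | drop |
transforms.key.type | ExtractField$Key extracts a single field from the key of the Debezium change event | (none) |
transforms.key.field | The field to extract | (none) |
Save the following configuration as /root/debezium/connector-configs/confluent-elasticsearch-sink-connector.json: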
{
"name": "confluent-elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "master.inventory.customers",
"batch.size": "500",
"key.ignore": false,
"schema.ignore": false,
"write.method": "UPSERT",
"connection.url": "http://10.254.8.14:9200",
"connection.username": "elastic",
"connection.password": "Qwe123!@cmss",
"connection.timeout.ms": "3000",
"read.timeout.ms": "5000",
"behavior.on.null.values": "DELETE",
"transforms": "unwrap, key",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false,
"transforms.unwrap.delete.handling.mode": "none",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id"
}
}
Register the sink connector:
curl -X POST -H 'Content-Type: application/json' -d '@/root/debezium/connector-configs/confluent-elasticsearch-sink-connector.json' http://localhost:8083/connectors
After registering both the source connector and the sink connector, you can list the registered connectors through the Kafka Connect REST API:
curl --location --request GET 'http://localhost:8083/connectors'
-----------------------------------
[
"confluent-elasticsearch-sink-connector",
"debezium-mysql-source-connector"
]
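Listing the connectors only shows that they are registered. To check that a connector and its tasks are actually running, query GET /connectors/<name>/status; a failed task additionally carries a trace field with the stack trace. The Python sketch below uses hypothetical helper names and assumes the worker is reachable at localhost:8083.

```python
import json
import urllib.request


def fetch_status(name: str, connect_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status and decode the JSON body (needs a running worker)."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def all_running(status: dict) -> bool:
    """True if the connector and all of its tasks report the RUNNING state."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector without tasks is not moving any data yet.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Usage:
# for name in ("debezium-mysql-source-connector", "confluent-elasticsearch-sink-connector"):
#     print(name, all_running(fetch_status(name)))
```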
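Kafka Connect transformations:
- Source connectors pass records through the transformations before writing them to the Kafka topic.
- Sink connectors pass records through the transformations before writing them to the sink.
In this article, the unwrap and key transformations are configured on the sink connector, so they are applied after the change events are read from Kafka and before they are written to Elasticsearch.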
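To make the effect of the sink settings concrete, here is a deliberately simplified Python model of what happens to each record: the unwrap step (ExtractNewRecordState with drop.tombstones=false and delete.handling.mode=none), the key step (ExtractField$Key on id), then write.method=UPSERT and behavior.on.null.values=DELETE. It is an illustration under these assumptions, not the connectors' actual code: events are plain dicts with before, after, and op fields, and the "index" is a Python dict keyed by document ID.

```python
def unwrap(value):
    """Model of ExtractNewRecordState (drop.tombstones=false, delete.handling.mode=none)."""
    if value is None:  # tombstone: kept because drop.tombstones=false
        return None
    # For c (create), r (snapshot read) and u (update) "after" is the new row;
    # for d (delete) "after" is null, so the record continues with a null value.
    return value["after"]


def extract_key(key, field="id"):
    """Model of ExtractField$Key: the struct key {"id": 1001} becomes 1001."""
    return key[field]


def apply_to_index(index: dict, key, value) -> dict:
    """Sink behaviour: upsert non-null values, delete by document ID on null values."""
    doc_id = str(extract_key(key))  # key.ignore=false: the key is the document ID
    doc = unwrap(value)
    if doc is None:
        index.pop(doc_id, None)  # behavior.on.null.values=DELETE
    else:
        # write.method=UPSERT adds or replaces only the fields present in the record.
        index.setdefault(doc_id, {}).update(doc)
    return index
```

Because both the unwrapped delete event and the tombstone arrive with a null value, each one triggers a delete by ID; the second is simply a no-op.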
3.5 Verification
3.5.1 Insert data
INSERT INTO `inventory`.`customers`
(`id`,
`first_name`,
`last_name`,
`email`,
`create_time`,
`update_time`)
VALUES ( 1001,
'optimus',
'prime',
'optimus@prime.com',
'2021-03-03 13:55:07.000000',
'2021-03-21 13:55:11.000000' );
GET /master.inventory.customers/_search
{
"query": {
"match_all": {}
}
}
-----------------------------------
{
"took": 750,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "master.inventory.customers",
"_type": "_doc",
"_id": "1001",
"_score": 1.0,
"_source": {
"id": 1001,
"first_name": "optimus",
"last_name": "prime",
"email": "optimus@prime.com",
"create_time": 1614779707000000,
"update_time": 1616334911000000
}
}
]
}
}
-----------------------------------
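The create_time and update_time values above are not dates but integers: microseconds since the Unix epoch. This is consistent with DATETIME(6) columns, which Debezium (with its default time.precision.mode, adaptive_time_microseconds) emits as io.debezium.time.MicroTimestamp, an int64 type, which is why the documents show plain numbers. The column types are an assumption, since the article does not show the table definition. A Python sketch for turning such a value back into the original wall-clock time:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def micro_timestamp_to_datetime(micros: int) -> datetime:
    """Convert microseconds since the epoch to a naive datetime.

    Debezium encodes MySQL DATETIME values without time zone information (as if UTC),
    so the result is the wall-clock value stored in MySQL.
    """
    return EPOCH + timedelta(microseconds=micros)


# micro_timestamp_to_datetime(1614779707000000) -> datetime(2021, 3, 3, 13, 55, 7)
```

The result matches the '2021-03-03 13:55:07' inserted above. If dates should be searchable as dates in Elasticsearch, this conversion (or an equivalent mapping) has to happen somewhere in the pipeline.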
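3.5.2 Delete data
This step was run after 3.5.3 and 3.5.4, which updated the row and then changed its primary key from 1001 to 1002; that is why the DELETE targets id 1002 and the index is empty afterwards.
DELETE FROM `inventory`.`customers` WHERE `id` = 1002;
GET /master.inventory.customers/_search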
{
"query": {
"match_all": {}
}
}
-----------------------------------
{
"took": 435,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 0,
"relation": "eq"
},
"max_score": null,
"hits": []
}
}
-----------------------------------
3.5.3 Update data
UPDATE `inventory`.`customers`
SET `email` = 'optimus_prime@transformers.com'
WHERE `id` = 1001
GET /master.inventory.customers/_search
{
"query": {
"match_all": {}
}
}
-----------------------------------
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "master.inventory.customers",
"_type": "_doc",
"_id": "1001",
"_score": 1.0,
"_source": {
"id": 1001,
"first_name": "optimus",
"last_name": "prime",
"email": "optimus_prime@transformers.com",
"create_time": 1614779707000000,
"update_time": 1616334911000000
}
}
]
}
}
-----------------------------------
3.5.4 Update the primary key
UPDATE `inventory`.`customers`
SET `id` = 1002
WHERE `id` = 1001
GET /master.inventory.customers/_search
{
"query": {
"match_all": {}
}
}
-----------------------------------
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "master.inventory.customers",
"_type": "_doc",
"_id": "1002",
"_score": 1.0,
"_source": {
"id": 1002,
"first_name": "optimus",
"last_name": "prime",
"email": "optimus_prime@transformers.com",
"create_time": 1614779707000000,
"update_time": 1616334911000000
}
}
]
}
}
-----------------------------------
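Why does only document 1002 remain? For an UPDATE that changes the primary key, the Debezium MySQL connector does not emit an ordinary update event: it emits a delete event for the old key, a tombstone for the old key (because tombstones.on.delete=true), and a create event for the new key. The simplified Python replay below uses hypothetical helpers, with events already reduced to what the sink sees after the unwrap transform, to show how these events turn document 1001 into document 1002.

```python
def pk_update_events(old_row: dict, new_row: dict):
    """(key, value) pairs for a primary-key change, as seen by the sink after unwrapping."""
    return [
        ({"id": old_row["id"]}, None),           # delete event, unwrapped to a null value
        ({"id": old_row["id"]}, None),           # tombstone for the old key
        ({"id": new_row["id"]}, dict(new_row)),  # create event for the new key
    ]


def replay(index: dict, events) -> dict:
    """Apply events with key.ignore=false, UPSERT writes and DELETE on null values."""
    for key, value in events:
        doc_id = str(key["id"])  # ExtractField$Key: the id becomes the document ID
        if value is None:
            index.pop(doc_id, None)  # behavior.on.null.values=DELETE
        else:
            index.setdefault(doc_id, {}).update(value)  # write.method=UPSERT
    return index
```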
3.5.5 Add a column
ALTER TABLE `inventory`.`customers` ADD COLUMN `address` VARCHAR ( 255 ) NULL;
INSERT INTO `inventory`.`customers`
(`id`,
`first_name`,
`last_name`,
`email`,
`create_time`,
`update_time`,
`address`
)
VALUES ( 1001,
'optimus',
'prime',
'optimus@prime.com',
'2021-03-03 13:55:07.000000',
'2021-03-21 13:55:11.000000',
'77 Massachusetts Avenue');
GET /master.inventory.customers/_search
{
"query": {
"match_all": {}
}
}
-----------------------------------
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "master.inventory.customers",
"_type": "_doc",
"_id": "1001",
"_score": 1.0,
"_source": {
"id": 1001,
"first_name": "optimus",
"last_name": "prime",
"email": "optimus@prime.com",
"create_time": 1614779707000000,
"update_time": 1616334911000000,
"address": "77 Massachusetts Avenue"
}
}
]
}
}
-----------------------------------
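4 Summary
This article has shown a way to stream incremental changes in near real time with Debezium. When you add a row to MySQL with INSERT, a matching document is added to Elasticsearch in near real time; when you modify a row with UPDATE, the document is updated accordingly; and when you remove a row with DELETE, the document is deleted as well. Debezium also copes well with primary-key updates and newly added columns. To copy existing data into Elasticsearch, we recommend Logstash together with Kafka; note, however, that with its default snapshot.mode (initial), the Debezium MySQL connector also takes a snapshot of the existing rows in the captured tables when it starts for the first time.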
5 References
- https://debezium.io/
- https://docs.confluent.io/kafka-connect-elasticsearch/current/index.html
- https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
- https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.2.Final/debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
- https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/11.0.3/confluentinc-kafka-connect-elasticsearch-11.0.3.zip