目录
一、功能详解
1. 背景
2. 核心概念
3. 使用规范
二、实现细节
1. 原理说明
2. 执行阶段说明
(1)准备阶段
(2)存量数据迁移阶段
(3)增量数据同步阶段
(4)规则切换阶段
3. 限流与熔断
三、用例测试
1. 数据迁移
(1)准备测试用例环境
(2)创建数据迁移作业
(3)割接
2. 数据库节点扩容
(1)准备测试用例环境
(2)创建数据迁移作业
(3)割接
参考:
一、功能详解
1. 背景
对于使用单数据库运行的系统来说,如何安全简单地将数据迁移至水平分片的数据库上,一直以来都是一个迫切的需求。对于已经使用了ShardingSphere的用户来说,随着业务规模的快速变化,也可能需要对现有的分片集群进行弹性扩容或缩容。
ShardingSphere在分片算法上提供给用户极大的自由度,但却给弹性伸缩造成了极大的挑战。找寻既能支持自定义的分片算法,又能高效地将数据节点进行扩缩容的方式,是弹性伸缩面临的第一个挑战。
同时,在伸缩过程中,不应该对正在运行的业务造成影响。尽可能减少伸缩时数据不可用的时间窗口,甚至做到用户完全无感知,是弹性伸缩的另一个挑战。
最后,弹性伸缩不应该对现有的数据造成影响。如何保证数据的正确性,是弹性伸缩的第三个挑战。
弹性伸缩过程示意如下图。
支持自定义分片算法,减少数据伸缩及迁移时的业务影响,提供一站式的通用弹性伸缩解决方案,是ShardingSphere弹性伸缩的主要设计目标。ShardingSphere-Scaling是一个提供给用户的通用数据接入迁移及弹性伸缩的解决方案。ShardingSphere-Scaling从 4.1.0 版本开始向用户提供,当前处于 alpha 开发阶段。
2. 核心概念
- 弹性伸缩作业:指一次将数据由旧规则迁移至新规则的完整流程。
- 存量数据:在弹性伸缩作业开始前,数据节点中已有的数据。
- 增量数据:在弹性伸缩作业执行过程中,业务系统所产生的新数据。
3. 使用规范
支持项:
- 将外围数据迁移至ShardingSphere所管理的数据库。
- 将ShardingSphere的数据节点进行扩容或缩容。
不支持项:
- 无主键表扩缩容。
- 复合主键表扩缩容。
- 不支持在当前存储节点之上做迁移,需要准备一个全新的数据库集群作为迁移目标库。
二、实现细节
1. 原理说明
考虑到ShardingSphere的弹性伸缩模块的几个挑战,目前的弹性伸缩解决方案为:临时地使用两个数据库集群,伸缩完成后切换的方式实现,如下图所示。
这种实现方式有以下优点:
- 伸缩过程中,原始数据没有任何影响。
- 伸缩失败无风险。
- 不受分片策略限制。
同时也存在一定的缺点:
- 在一定时间内存在冗余服务器。
- 所有数据都需要移动。
弹性伸缩模块会通过解析旧分片规则,提取配置中的数据源、数据节点等信息,之后创建伸缩作业工作流,将一次弹性伸缩拆解为 4 个主要阶段:
- 准备阶段。
- 存量数据迁移阶段。
- 增量数据同步阶段。
- 规则切换阶段。
弹性伸缩工作流如下图所示。
2. 执行阶段说明
(1)准备阶段
在准备阶段,弹性伸缩模块会进行数据源连通性及权限的校验,同时进行存量数据的统计、日志位点的记录,最后根据数据量和用户设置的并行度,对任务进行分片。
(2)存量数据迁移阶段
执行在准备阶段拆分好的存量数据迁移作业,存量迁移阶段采用 JDBC 查询的方式,直接从数据节点中读取数据,并使用新规则写入到新集群中。
(3)增量数据同步阶段
由于存量数据迁移耗费的时间受到数据量和并行度等因素影响,此时需要对这段时间内业务新增的数据进行同步。不同的数据库使用的技术细节不同,但总体上均为基于复制协议或 WAL 日志实现的变更数据捕获功能。
- MySQL:订阅并解析 binlog;
- PostgreSQL:采用官方逻辑复制 test_decoding。
这些捕获的增量数据,同样会由弹性伸缩模块根据新规则写入到新数据节点中。当增量数据基本同步完成时(由于业务系统未停止,增量数据是不断的),则进入规则切换阶段。
(4)规则切换阶段
在此阶段,可能存在一定时间的业务只读窗口期,通过设置数据库只读或ShardingSphere的熔断机制,让旧数据节点中的数据短暂静态,确保增量同步已完全完成。
这个窗口期时间短则数秒,长则数分钟,取决于数据量和用户是否需要对数据进行强校验。确认完成后,ShardingSphere可通过配置中心修改配置,将业务导向新规则的集群,弹性伸缩完成。
3. 限流与熔断
面对超负荷的流量下,针对某一节点进行熔断和限流,以保证整个数据库集群得以继续运行,是分布式系统下对单一节点控制能力的挑战。熔断指的是阻断 ShardingSphere 和数据库的连接。当某个 ShardingSphere 节点超过负载后,停止该节点对数据库的访问,使数据库能够保证足够的资源为其他节点提供服务。限流是指面对超负荷的请求开启流量限制,以保护部分请求可以得以高质量的响应。
目前实现的限流是在数据迁移或扩缩容过程中,限制源端或目标端的流量。下表为目前提供的熔断语句。
语句 | 说明 | 示例 |
---|---|---|
[ENABLE / DISABLE] READWRITE_SPLITTING (READ)? resourceName [FROM databaseName] | 启用/禁用读库 | ENABLE READWRITE_SPLITTING READ resource_0 |
[ENABLE / DISABLE] INSTANCE instanceId | 启用/禁用 Proxy 实例 | DISABLE INSTANCE instance_1 |
SHOW INSTANCE LIST | 查询 Proxy 实例信息 | SHOW INSTANCE LIST |
SHOW READWRITE_SPLITTING (READ)? resourceName [FROM databaseName] | 查询所有读库的状态 | SHOW READWRITE_SPLITTING READ RESOURCES |
实例级熔断用于多Proxy实例场景下,启用/禁用某个Porxy实例。库级熔断主要用于读写分离场景下启用/禁用读库。
三、用例测试
本节演示两个例子,例1是从MySQL实例迁移到ShardingSphere-Proxy,例2是对现有数据库节点进行扩容。两个例子都用ShardingSphere-Scaling实现,过成类似。
ShardingSphere-Scaling目前不是一个独立的产品,而是以ShardingSphere-Proxy中的一个配置项提供相应功能。如果后端连接以下数据库,需要下载相应 JDBC 驱动 jar 包,并将其放入 ${shardingsphere-proxy}/lib 目录中。
数据库 | JDBC驱动 | 参考 |
---|---|---|
MySQL | mysql-connector-java-5.1.47.jar | Connector/J Versions |
PostgreSQL | opengauss-jdbc-2.0.1-compatibility.jar |
# 引入 JDBC 驱动
cp ~/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar ~/apache-shardingsphere-5.1.1-shardingsphere-proxy-bin/lib/
# 重启Proxy
/root/apache-shardingsphere-5.1.1-shardingsphere-proxy-bin/bin/stop.sh
/root/apache-shardingsphere-5.1.1-shardingsphere-proxy-bin/bin/start.sh
1. 数据迁移
需求:现有一个正在使用的MySQL数据库,需要将其中的两个表迁移到Proxy下的数据节点。两个表的数据实时变化,要求尽量缩短业务影响时间。
源:172.18.26.198:3306/migrating_db 目标:172.18.10.66:3306/db1、172.18.10.66:3306/db2、172.18.18.102:3306db1、172.18.18.102:3306db2 Proxy:172.18.10.66:3307、172.18.18.102:3307,Cluster运行模式
(1)准备测试用例环境
目标66、102执行:
代码语言:javascript复制drop database if exists db1;
drop database if exists db2;
create database db1;
create database db2;
源198执行:
代码语言:javascript复制-- 建库
drop database if exists migrating_db;
create database migrating_db;
use migrating_db;
-- 建表
create table t_order (
order_id bigint auto_increment primary key,
order_datetime datetime not null,
user_id bigint not null,
order_amount decimal(10,2) not null default 0,
key idx_order_datetime (order_datetime),
key idx_user_id (user_id));
create table t_order_item (
order_item_id bigint auto_increment primary key,
order_id bigint not null,
item_id int null,
item_quantity int not null default 0,
key idx_order_id (order_id));
-- 创建负载模拟存储过程
delimiter //
create procedure sp_generate_order_data(p_seconds int)
begin
set @start_ts := now();
set @start_date := unix_timestamp('2022-03-01');
set @end_date := unix_timestamp('2022-06-01');
while timestampdiff(second,@start_ts,now()) <= p_seconds do
start transaction;
set @order_datetime := from_unixtime(@start_date rand() * (@end_date - @start_date));
set @user_id := floor(1 rand() * 100000000);
set @order_amount := round((10 rand() * 2000),2);
insert into t_order (order_datetime, user_id, order_amount)
values (@order_datetime, @user_id, @order_amount);
set @order_id := last_insert_id();
set @quantity := floor(1 rand() * 50);
set @i := 1;
while @i <= @quantity do
set @item_id := floor(1 rand() * 10000);
set @item_quantity := floor(1 rand() * 20);
insert into t_order_item (order_id, item_id, item_quantity) values (@order_id, @item_id, @item_quantity);
set @i:=@i 1;
end while;
commit;
end while;
end
//
delimiter ;
-- 执行存储过程
call sp_generate_order_data(1800);
以下步骤在执行存储过程过程(半小时)进行,模拟线上实际迁移过程。
(2)创建数据迁移作业
创建数据迁移作业包含以下步骤:
- 创建逻辑库。
- 添加源资源。
- 将单表规则改为单片规则。
- 创建sharding scaling规则。
- 添加目标资源。
- 修改分片规则,触发迁移。
- 监控迁移作业
连接Proxy:
代码语言:javascript复制mysql -u root -h 172.18.10.66 -P 3307 -p123456
创建逻辑库:
代码语言:javascript复制drop database if exists migrating_db;
create database migrating_db;
use migrating_db;
添加源资源:
代码语言:javascript复制add resource
resource_source (host=172.18.26.198, port=3306, db=migrating_db, user=wxy, password=mypass);
show schema resourcesG
确认自动创建单表规则:
代码语言:javascript复制count schema rules;
将单表规则改为单片规则:
代码语言:javascript复制create sharding table rule
t_order (datanodes("resource_source.t_order")),
t_order_item (datanodes("resource_source.t_order_item"));
预览当前分片规则:
代码语言:javascript复制preview select count(1) from t_order;
preview select count(1) from t_order_item;
执行SQL确认可以正确查询到数据:
代码语言:javascript复制select count(1) from t_order;
select count(1) from t_order_item;
创建手动模式scaling规则:
代码语言:javascript复制create sharding scaling rule scaling_manual (
input(
worker_thread=40,
batch_size=1000
),
output(
worker_thread=40,
batch_size=1000
),
stream_channel(type(name=memory, properties("block-queue-size"=10000))),
data_consistency_checker(type(name=data_match, properties("chunk-size"=1000)))
);
查看sharding_scaling规则:
代码语言:javascript复制show sharding scaling rulesG
参数说明:
- worker_thread:从源端摄取 / 写入到目标端全量数据的线程池大小。
- batch_size:一次查询操作返回的最大记录数。
- stream_channel:数据通道,连接生产者和消费者,用于 input 和 output 环节。type 指定算法类型,可选项 MEMORY;block-queue-size 算法属性指定阻塞队列大小。
- data_consistency_checker:数据一致性校验算法。type 指定算法类型,可选项 DATA_MATCH、CRC32_MATCH;chunk-size 算法属性指定一次查询操作返回的最大记录数。
dataconsistencychecker 的 type 可以通过执行 show scaling check algorithms 查询到:
代码语言:javascript复制mysql> show scaling check algorithms;
------------- ---------------------------- -------------------------------------------------------------- ----------------
| type | description | supported_database_types | provider |
------------- ---------------------------- -------------------------------------------------------------- ----------------
| CRC32_MATCH | Match CRC32 of records. | MySQL | ShardingSphere |
| DATA_MATCH | Match raw data of records. | SQL92,openGauss,PostgreSQL,MySQL,MariaDB,H2,Oracle,SQLServer | ShardingSphere |
------------- ---------------------------- -------------------------------------------------------------- ----------------
2 rows in set (0.01 sec)
data_match 支持所有数据库,但是性能不是最好的;crc32_match 只支持 MySQL,但是性能更好。
这里还有completion_detector、rateLimiter两个遗留配置问题没有解决。
除手动模式外,官方文档中说明还支持自动模式配置:
代码语言:javascript复制create sharding scaling rule scaling_auto (
input(
worker_thread=40,
batch_size=1000
),
output(
worker_thread=40,
batch_size=1000
),
stream_channel(type(name=memory, properties("block-queue-size"=10000))),
completion_detector(type(name=idle, properties("incremental-task-idle-seconds-threshold"=1800))),
data_consistency_checker(type(name=data_match, properties("chunk-size"=1000)))
);
completion_detector 指定作业是否接近完成检测算法。如果不配置则无法自动进行后续步骤,可以通过 DistSQL 手动操作。type 指定算法类型,可选项 IDLE;incremental-task-idle-seconds-threshold 算法属性指定如果增量同步任务不再活动超过一定时间(秒数),那么可以认为增量同步任务接近完成,适用算法类型 IDLE。
但是,我在配置自动模式触发迁移时报错:
代码语言:javascript复制[ERROR] 2022-06-06 06:35:21.727 [0130317c30317c3054317c7363616c696e675f6462_Worker-1] o.a.s.e.e.h.g.LogJobErrorHandler - Job '0130317c30317c3054317c7363616c696e675f6462' exception occur in job processing
java.lang.IllegalArgumentException: incremental task idle threshold can not be null.
第二个遗留问题是限流配置。Proxy 5.1.1 文档中关于 scaling 配置项中有如下说明:
代码语言:javascript复制rateLimiter: # 限流算法。如果不配置则不限流。
type: # 算法类型。可选项:
props: # 算法属性
可以看到文档中没有给出可选的算法类型和相关算法属性。DistSQL 实例中也没有 rateLimiter 限流配置项。
添加目标资源:
代码语言:javascript复制add resource
resource_1 (host=172.18.10.66, port=3306, db=db1, user=wxy, password=mypass),
resource_2 (host=172.18.10.66, port=3306, db=db2, user=wxy, password=mypass),
resource_3 (host=172.18.18.102, port=3306, db=db1, user=wxy, password=mypass),
resource_4 (host=172.18.18.102, port=3306, db=db2, user=wxy, password=mypass);
show schema resourcesG
修改分片规则,触发迁移:
代码语言:javascript复制alter sharding table rule
t_order (
resources(resource_1,resource_2,resource_3,resource_4),
sharding_column=order_id,type(name=hash_mod,properties("sharding-count"=8)),
key_generate_strategy(column=order_id,type(name=snowflake))),
t_order_item (
resources(resource_1,resource_2,resource_3,resource_4),
sharding_column=order_id,type(name=hash_mod,properties("sharding-count"=8)),
key_generate_strategy(column=order_item_id,type(name=snowflake)));
目前只有通过执行 ALTER SHARDING TABLE RULE DistSQL 来触发迁移。本例中两个迁移表的分片规则从单片datanodes改为四个数据源的8个分片,会触发迁移。
监控迁移作业:
代码语言:javascript复制mysql> show scaling list;
------------------------------------------------ ---------------------- ---------------------- -------- --------------------- -----------
| id | tables | sharding_total_count | active | create_time | stop_time |
------------------------------------------------ ---------------------- ---------------------- -------- --------------------- -----------
| 0130317c30317c3054317c6d6967726174696e675f6462 | t_order,t_order_item | 1 | true | 2022-06-06 18:19:47 | NULL |
------------------------------------------------ ---------------------- ---------------------- -------- --------------------- -----------
1 row in set (0.01 sec)
mysql> show scaling status 0130317c30317c3054317c6d6967726174696e675f6462;
------ ----------------- ------------------------ -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ----------------- ------------------------ -------- ------------------------------- --------------------------
| 0 | resource_source | EXECUTE_INVENTORY_TASK | true | 80 | 0 |
------ ----------------- ------------------------ -------- ------------------------------- --------------------------
1 row in set (0.00 sec)
SHOW SCALING LIST 查询所有迁移作业。这是个全局 DistSQL 命令,返回所有状态的Scaling作业,而不是只针对回当前逻辑数据库的。SHOW SCALING STATUS 命令查询某个迁移作业的进度。
(3)割接
割接步骤包括:
- 源数据库应用停写,以免丢失数据。
- 查看迁移作业进度。
- Proxy停写,即熔断。
- 数据一致性校验。
- 切换元数据。
- 确认目标分片规则生效。
- 创建绑定表规则(可选)。
- 确认迁移作业已完成。
- 应用连接到Proxy访问数据库。
应用停写:Ctrl c 或kill 掉正在运行的存储过程。
查询迁移作业进度:
代码语言:javascript复制mysql> show scaling status 0130317c30317c3054317c6d6967726174696e675f6462;
------ ----------------- -------------------------- -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ----------------- -------------------------- -------- ------------------------------- --------------------------
| 0 | resource_source | EXECUTE_INCREMENTAL_TASK | true | 100 | 5 |
------ ----------------- -------------------------- -------- ------------------------------- --------------------------
1 row in set (0.00 sec)
当 status 达到 EXECUTE_INCREMENTAL_TASK,全量迁移已完成,在增量迁移阶段。inventory_finished_percentage表示存量数据完成百分比,incremental_idle_seconds表示增量空闲秒数,指示增量是否接近完成。
Proxy停写,选择一个业务低峰期,对源端库或数据操作入口做停写:
代码语言:javascript复制stop scaling source writing 0130317c30317c3054317c6d6967726174696e675f6462;
数据一致性校验,根据数据量,这步可能执行较长时间:
代码语言:javascript复制mysql> check scaling 0130317c30317c3054317c6d6967726174696e675f6462 by type (name=crc32_match);
-------------- ---------------------- ---------------------- ----------------------- -------------------------
| table_name | source_records_count | target_records_count | records_count_matched | records_content_matched |
-------------- ---------------------- ---------------------- ----------------------- -------------------------
| t_order | 57599 | 57599 | true | true |
| t_order_item | 1468140 | 1468140 | true | true |
-------------- ---------------------- ---------------------- ----------------------- -------------------------
2 rows in set (2.17 sec)
切换元数据:
代码语言:javascript复制apply scaling 0130317c30317c3054317c6d6967726174696e675f6462;
预览目标分片是否已生效:
代码语言:javascript复制mysql> preview select count(1) from t_order;
------------------ -------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ -------------------------------------------------------------------------
| resource_1 | select count(1) from t_order_0 UNION ALL select count(1) from t_order_4 |
| resource_2 | select count(1) from t_order_1 UNION ALL select count(1) from t_order_5 |
| resource_3 | select count(1) from t_order_2 UNION ALL select count(1) from t_order_6 |
| resource_4 | select count(1) from t_order_3 UNION ALL select count(1) from t_order_7 |
------------------ -------------------------------------------------------------------------
4 rows in set (0.01 sec)
mysql> preview select count(1) from t_order_item;
------------------ -----------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ -----------------------------------------------------------------------------------
| resource_1 | select count(1) from t_order_item_0 UNION ALL select count(1) from t_order_item_4 |
| resource_2 | select count(1) from t_order_item_1 UNION ALL select count(1) from t_order_item_5 |
| resource_3 | select count(1) from t_order_item_2 UNION ALL select count(1) from t_order_item_6 |
| resource_4 | select count(1) from t_order_item_3 UNION ALL select count(1) from t_order_item_7 |
------------------ -----------------------------------------------------------------------------------
4 rows in set (0.00 sec)
可以看到数据已经分片到新的数据库资源。
创建绑定表:
代码语言:javascript复制create sharding binding table rules (t_order,t_order_item);
确认绑定表规则生效:
代码语言:javascript复制mysql> preview select i.* from t_order o join t_order_item i on o.order_id=i.order_id where o.order_id in (10, 11);
------------------ ---------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ ---------------------------------------------------------------------------------------------------------
| resource_3 | select i.* from t_order_2 o join t_order_item_2 i on o.order_id=i.order_id where o.order_id in (10, 11) |
| resource_4 | select i.* from t_order_3 o join t_order_item_3 i on o.order_id=i.order_id where o.order_id in (10, 11) |
------------------ ---------------------------------------------------------------------------------------------------------
2 rows in set (0.06 sec)
确认迁移作业已完成:
代码语言:javascript复制mysql> show scaling status 0130317c30317c3054317c7363616c696e675f6462;
------ ----------------- ---------- -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ----------------- ---------- -------- ------------------------------- --------------------------
| 0 | resource_source | FINISHED | false | 100 | 251 |
------ ----------------- ---------- -------- ------------------------------- --------------------------
1 rows in set (0.00 sec)
2. 数据库节点扩容
需求:现有两个正在使用的8分片表,需要将它们的数据节点扩容到16分片。两个表的数据实时变化,要求尽量缩短业务影响时间。
源:172.18.26.198:3306/db1、172.18.26.198:3306/db2 目标:172.18.10.66:3306/db1、172.18.10.66:3306/db2、172.18.18.102:3306db1、172.18.18.102:3306db2 Proxy:172.18.10.66:3307、172.18.18.102:3307,Cluster运行模式
(1)准备测试用例环境
源198,目标66、102执行:
代码语言:javascript复制drop database if exists db1;
drop database if exists db2;
create database db1;
create database db2;
连接Proxy:
代码语言:javascript复制mysql -u root -h 172.18.10.66 -P 3307 -p123456
创建逻辑库:
代码语言:javascript复制drop database if exists scaling_db;
create database scaling_db;
use scaling_db;
添加源资源:
代码语言:javascript复制add resource
resource_source1 (host=172.18.26.198, port=3306, db=db1, user=wxy, password=mypass),
resource_source2 (host=172.18.26.198, port=3306, db=db2, user=wxy, password=mypass);
show schema resourcesG
创建规则:
代码语言:javascript复制-- 分片表
create sharding table rule
t_order (
resources(resource_source1,resource_source2),
sharding_column=order_id,type(name=hash_mod,properties("sharding-count"=8)),
key_generate_strategy(column=order_id,type(name=snowflake))),
t_order_item (
resources(resource_source1,resource_source2),
sharding_column=order_id,type(name=hash_mod,properties("sharding-count"=8)),
key_generate_strategy(column=order_item_id,type(name=snowflake)));
-- 绑定表
create sharding binding table rules (t_order,t_order_item);
建表:
代码语言:javascript复制create table t_order (
order_id bigint auto_increment primary key,
order_datetime datetime not null,
user_id bigint not null,
order_amount decimal(10,2) not null default 0,
key idx_order_datetime (order_datetime),
key idx_user_id (user_id));
create table t_order_item (
order_item_id bigint auto_increment primary key,
order_id bigint not null,
item_id int null,
item_quantity int not null default 0,
key idx_order_id (order_id));
预览规则是否已生效:
代码语言:javascript复制mysql> preview select count(1) from t_order;
------------------ -----------------------------------------------------------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ -----------------------------------------------------------------------------------------------------------------------------------------------------------
| resource_source1 | select count(1) from t_order_0 UNION ALL select count(1) from t_order_2 UNION ALL select count(1) from t_order_4 UNION ALL select count(1) from t_order_6 |
| resource_source2 | select count(1) from t_order_1 UNION ALL select count(1) from t_order_3 UNION ALL select count(1) from t_order_5 UNION ALL select count(1) from t_order_7 |
------------------ -----------------------------------------------------------------------------------------------------------------------------------------------------------
2 rows in set (0.00 sec)
mysql> preview select count(1) from t_order_item;
------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| resource_source1 | select count(1) from t_order_item_0 UNION ALL select count(1) from t_order_item_2 UNION ALL select count(1) from t_order_item_4 UNION ALL select count(1) from t_order_item_6 |
| resource_source2 | select count(1) from t_order_item_1 UNION ALL select count(1) from t_order_item_3 UNION ALL select count(1) from t_order_item_5 UNION ALL select count(1) from t_order_item_7 |
------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2 rows in set (0.00 sec)
mysql> preview select i.* from t_order o join t_order_item i on o.order_id=i.order_id where o.order_id in (10, 11);
------------------ ---------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ ---------------------------------------------------------------------------------------------------------
| resource_source1 | select i.* from t_order_2 o join t_order_item_2 i on o.order_id=i.order_id where o.order_id in (10, 11) |
| resource_source2 | select i.* from t_order_3 o join t_order_item_3 i on o.order_id=i.order_id where o.order_id in (10, 11) |
------------------ ---------------------------------------------------------------------------------------------------------
2 rows in set (0.00 sec)
添加存量数据:
代码语言:javascript复制insert into t_order (order_id, order_datetime, user_id, order_amount)
values
(1, now(), 1, 100),(2, now(), 2, 200),(3, now(), 3, 300),(4, now(), 4, 400),
(5, now(), 5, 500),(6, now(), 6, 600),(7, now(), 7, 700),(8, now(), 8, 800);
insert into t_order_item (order_item_id, order_id, item_id, item_quantity)
values
(1,1,1,10),(2,1,2,20),(3,2,3,30),(4,2,4,40),(5,3,1,10),(6,3,2,20),(7,4,3,30),(8,4,4,40),
(9,5,1,10),(10,5,2,20),(11,6,3,30),(12,6,4,40),(13,7,1,10),(14,7,2,20),(15,8,3,30),(16,8,4,40);
ShardingSphere-Proxy只支持对非分片表使用MySQL存储函数、存储过程操作,因此这里只能使用普通的insert语句。
(2)创建数据迁移作业
创建数据迁移作业包含以下步骤:
- 创建逻辑库。
- 添加源资源。
- 把现有系统中的表配置到规则里。
- 创建sharding scaling规则。
- 添加目标资源。
- 修改分片规则,触发迁移。
- 监控迁移作业
本例中因为我们的源和目标使用同一个Proxy集群,第1-3步已经在上一步“准备测试用例环境”中完成了。
创建手动模式scaling规则:
代码语言:javascript复制create sharding scaling rule scaling_manual (
input(
worker_thread=40,
batch_size=1000
),
output(
worker_thread=40,
batch_size=1000
),
stream_channel(type(name=memory, properties("block-queue-size"=10000))),
data_consistency_checker(type(name=data_match, properties("chunk-size"=1000)))
);
添加目标资源:
代码语言:javascript复制add resource
resource_1 (host=172.18.10.66, port=3306, db=db1, user=wxy, password=mypass),
resource_2 (host=172.18.10.66, port=3306, db=db2, user=wxy, password=mypass),
resource_3 (host=172.18.18.102, port=3306, db=db1, user=wxy, password=mypass),
resource_4 (host=172.18.18.102, port=3306, db=db2, user=wxy, password=mypass);
show schema resourcesG
修改分片规则,触发迁移,注意绑定表只能一块迁移:
代码语言:javascript复制alter sharding table rule
t_order (
resources(resource_1,resource_2,resource_3,resource_4),
sharding_column=order_id,type(name=hash_mod,properties("sharding-count"=16)),
key_generate_strategy(column=order_id,type(name=snowflake))),
t_order_item (
resources(resource_1,resource_2,resource_3,resource_4),
sharding_column=order_id,type(name=hash_mod,properties("sharding-count"=16)),
key_generate_strategy(column=order_item_id,type(name=snowflake)));
监控迁移作业:
代码语言:javascript复制mysql> show scaling list;
------------------------------------------------ ---------------------- ---------------------- -------- --------------------- ---------------------
| id | tables | sharding_total_count | active | create_time | stop_time |
------------------------------------------------ ---------------------- ---------------------- -------- --------------------- ---------------------
| 0130317c30317c3054317c6d6967726174696e675f6462 | t_order,t_order_item | 1 | false | 2022-06-06 18:19:47 | 2022-06-06 18:22:56 |
| 0130317c30317c3054317c7363616c696e675f6462 | t_order,t_order_item | 2 | true | 2022-06-07 07:13:30 | NULL |
------------------------------------------------ ---------------------- ---------------------- -------- --------------------- ---------------------
2 rows in set (0.01 sec)
mysql> show scaling status 0130317c30317c3054317c6d6967726174696e675f6462;
------ ----------------- ---------- -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ----------------- ---------- -------- ------------------------------- --------------------------
| 0 | resource_source | FINISHED | false | 100 | 46550 |
------ ----------------- ---------- -------- ------------------------------- --------------------------
1 row in set (0.01 sec)
mysql> show scaling status 0130317c30317c3054317c7363616c696e675f6462;
------ ------------------ -------------------------- -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ------------------ -------------------------- -------- ------------------------------- --------------------------
| 0 | resource_source1 | EXECUTE_INCREMENTAL_TASK | true | 100 | 215 |
| 1 | resource_source2 | EXECUTE_INCREMENTAL_TASK | true | 100 | 215 |
------ ------------------ -------------------------- -------- ------------------------------- --------------------------
2 rows in set (0.01 sec)
添加增量数据:
代码语言:javascript复制insert into t_order (order_id, order_datetime, user_id, order_amount)
values (9, now(), 1, 100),(10, now(), 2, 200);
insert into t_order_item (order_item_id, order_id, item_id, item_quantity)
values
(17,9,1,10),(18,9,2,20),(19,9,3,30),(20,9,4,40),(21,10,1,10),(22,10,2,20),(23,10,3,30),(24,10,4,40);
(3)割接
割接步骤包括:
- 源数据库应用停写,以免丢失数据。
- 查看迁移作业进度。
- Proxy停写,即熔断。
- 数据一致性校验。
- 切换元数据。
- 确认目标分片规则生效。
- 确认迁移作业已完成。
- 应用连接到Proxy访问数据库。
查询迁移作业进度:
代码语言:javascript复制mysql> show scaling status 0130317c30317c3054317c7363616c696e675f6462;
------ ------------------ -------------------------- -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ------------------ -------------------------- -------- ------------------------------- --------------------------
| 0 | resource_source1 | EXECUTE_INCREMENTAL_TASK | true | 100 | 51 |
| 1 | resource_source2 | EXECUTE_INCREMENTAL_TASK | true | 100 | 51 |
------ ------------------ -------------------------- -------- ------------------------------- --------------------------
2 rows in set (0.01 sec)
Proxy停写,选择一个业务低峰期,对源端库或数据操作入口做停写:
代码语言:javascript复制stop scaling source writing 0130317c30317c3054317c7363616c696e675f6462;
数据一致性校验,根据数据量,这步可能执行较长时间:
代码语言:javascript复制mysql> check scaling 0130317c30317c3054317c7363616c696e675f6462 by type (name=crc32_match);
-------------- ---------------------- ---------------------- ----------------------- -------------------------
| table_name | source_records_count | target_records_count | records_count_matched | records_content_matched |
-------------- ---------------------- ---------------------- ----------------------- -------------------------
| t_order | 10 | 10 | true | true |
| t_order_item | 24 | 24 | true | true |
-------------- ---------------------- ---------------------- ----------------------- -------------------------
2 rows in set (0.46 sec)
切换元数据:
代码语言:javascript复制apply scaling 0130317c30317c3054317c7363616c696e675f6462;
预览分片规则是否已生效:
代码语言:javascript复制mysql> preview select count(1) from t_order;
------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------
| resource_1 | select count(1) from t_order_0 UNION ALL select count(1) from t_order_4 UNION ALL select count(1) from t_order_8 UNION ALL select count(1) from t_order_12 |
| resource_2 | select count(1) from t_order_1 UNION ALL select count(1) from t_order_5 UNION ALL select count(1) from t_order_9 UNION ALL select count(1) from t_order_13 |
| resource_3 | select count(1) from t_order_2 UNION ALL select count(1) from t_order_6 UNION ALL select count(1) from t_order_10 UNION ALL select count(1) from t_order_14 |
| resource_4 | select count(1) from t_order_3 UNION ALL select count(1) from t_order_7 UNION ALL select count(1) from t_order_11 UNION ALL select count(1) from t_order_15 |
------------------ -------------------------------------------------------------------------------------------------------------------------------------------------------------
4 rows in set (0.01 sec)
mysql> preview select count(1) from t_order_item;
------------------ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| resource_1 | select count(1) from t_order_item_0 UNION ALL select count(1) from t_order_item_4 UNION ALL select count(1) from t_order_item_8 UNION ALL select count(1) from t_order_item_12 |
| resource_2 | select count(1) from t_order_item_1 UNION ALL select count(1) from t_order_item_5 UNION ALL select count(1) from t_order_item_9 UNION ALL select count(1) from t_order_item_13 |
| resource_3 | select count(1) from t_order_item_2 UNION ALL select count(1) from t_order_item_6 UNION ALL select count(1) from t_order_item_10 UNION ALL select count(1) from t_order_item_14 |
| resource_4 | select count(1) from t_order_item_3 UNION ALL select count(1) from t_order_item_7 UNION ALL select count(1) from t_order_item_11 UNION ALL select count(1) from t_order_item_15 |
------------------ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
4 rows in set (0.01 sec)
mysql> preview select i.* from t_order o join t_order_item i on o.order_id=i.order_id where o.order_id in (10, 11);
------------------ -----------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ -----------------------------------------------------------------------------------------------------------
| resource_3 | select i.* from t_order_10 o join t_order_item_10 i on o.order_id=i.order_id where o.order_id in (10, 11) |
| resource_4 | select i.* from t_order_11 o join t_order_item_11 i on o.order_id=i.order_id where o.order_id in (10, 11) |
------------------ -----------------------------------------------------------------------------------------------------------
2 rows in set (0.00 sec)
可以看到数据已经分片到新的数据库资源。 确认迁移作业已完成:
代码语言:javascript复制mysql> show scaling status 0130317c30317c3054317c7363616c696e675f6462;
------ ------------------ ---------- -------- ------------------------------- --------------------------
| item | data_source | status | active | inventory_finished_percentage | incremental_idle_seconds |
------ ------------------ ---------- -------- ------------------------------- --------------------------
| 0 | resource_source1 | FINISHED | false | 100 | 276 |
| 1 | resource_source2 | FINISHED | false | 100 | 276 |
------ ------------------ ---------- -------- ------------------------------- --------------------------
2 rows in set (0.01 sec)
mysql> preview select i.* from t_order o join t_order_item i on o.order_id=i.order_id where o.order_id in (10, 11);
------------------ ---------------------------------------------------------------------------------------------------------
| data_source_name | actual_sql |
------------------ ---------------------------------------------------------------------------------------------------------
| resource_3 | select i.* from t_order_2 o join t_order_item_2 i on o.order_id=i.order_id where o.order_id in (10, 11) |
| resource_4 | select i.* from t_order_3 o join t_order_item_3 i on o.order_id=i.order_id where o.order_id in (10, 11) |
------------------ ---------------------------------------------------------------------------------------------------------
2 rows in set (0.01 sec)
参考:
https://shardingsphere.apache.org/document/current/cn/features/scaling/ https://shardingsphere.apache.org/document/current/cn/reference/scaling/ https://shardingsphere.apache.org/document/current/cn/user-manual/shardingsphere-proxy/scaling/ https://shardingsphere.apache.org/document/current/cn/user-manual/shardingsphere-proxy/distsql/syntax/ral/#弹性伸缩