如何实时迁移MySQL到TcaplusDB

2020-08-26 12:18:32 浏览数 (2)

1.前言

随着业务数据量的剧增,传统MySQL在数据存储上变得越来越吃力,NoSQL因其良好的性能、扩展性、稳定性逐渐成为业务选型的首要考虑。TcaplusDB是腾讯云推出的一款全托管NoSQL数据库服务,旨在为客户提供极致的数据据存储体验,详细信息请参考官方文档。本文主要介绍如何将MySQL数据迁移到TcaplusDB。

2. 迁移说明

MySQL与TcaplusDB属于异构数据库,数据迁移之前需要考虑两者间数据的差异。

2.1 术语和概念

TcaplusDB

MySQL

集群

数据库

表格组

N/A

记录

字段

2.2 特性对比

特性

TcaplusDB

MySQL

数据模型

Key-Value, JSON格式

结构化数据

数据类型

丰富

丰富

分片

Yes

N/A

扩展性

SQL

类SQL

ANSI

主键

Yes

Yes

外键

N/A

Yes

事务

N/A

Yes

二进制数据

Yes

Yes

周边生态

2.3 数据类型对比

TcaplusDB支持两种协议来表述数据Schema, 分别为TDR (Tencent Data Representation)和Google Protobuf, 不同协议的数据类型在定义时有所区别,但在底层是统一的。

数据类型

TcaplusDB (TDR)

TcaplusDB (Protobuf)

MySQL

整形

int64,uint64

int64,uint64

BIGINT

整形

int32,uint32

int32,uint32

INT,INTEGER

整形

int16,uint16

N/A

SMALLINT

整形

int8,uint18

N/A

TINYINT

浮点型

float, double

float,double

FLOAT, DOUBLE

定点型

N/A

N/A

DECIMAL

N/A

N/A

BIT

字符型

char

bytes

CHAR

字符串

string

string

VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT

二进制

array char

bytes

TINYBLOB, BLOB, MEDIUMBLOB,LONGBLOB, VARBINARY, BINARY

布尔型

bool

bool

BOOLEAN

数组列表

array修饰

repeated修饰

N/A

结构体

struct

message

N/A

枚举

N/A

enum

ENUM

共同体

union

N/A

N/A

2.4 迁移限制

TcaplusDB与MySQL属于异构数据库,数据迁移有一些限制。

序号

限制项

说明

1

不支持外键迁移

TcaplusDB没有外键,如果MySQL有定义外键迁移时外键对应列直接映射成TcaplusDB字段,不会维护原有外键关联表信息

2

不支持存储过程

如果MySQL定义有存储过程,迁移时将被忽略

3

数据类型转换

对于TcaplusDB不支持的数据类型,需要进行转换,如Decimal转成TcaplusDB长整形,日期类的转换成字符串类型等

4

迁移网络环境

本文只介绍同是腾讯云环境下MySQL迁移TcaplusDB场景,MySQL与TcaplusDB同属腾讯云一个地域

5

实时迁移删除操作限制

删除操作可能存在删空记录情况,需要避免后续离线迁移重新把待删除的记录写到表中,产生脏数据现象,具体做法是如果删除的是一条空记录把记录写到另一张待删除的表,待离线全量迁移完成后进行对账,把脏数据从业务表删除

6

MySQL数据订阅

开启数据订阅功能,需要修改数据源MySQL实例的参数,涉及重启实例,会影响当前存在的连接,用户需要评估重启期间断连对业务的影响

2.5 迁移场景

对于线上业务迁移,涉及实时数据迁移和离线存量数据迁移。以游戏举例,如果业务同意停服迁移,则直接采用离线全量数据一次性迁移最好,低价低; 如果业务不同意停服迁移,则需要同时考虑实时和离线。

2.5.1 实时数据迁移场景

MySQL实时数据迁移通用的方案是基于binlog进行实时数据采集,捕获数据更新操作如insert、update、delete。腾讯云目前针对MySQL实例支持数据订阅功能可实时捕获MySQL数据变更,数据订阅相关文档可参考官网文档。

数据订阅功能支持将数据采集传输至Kafka,这里会介绍如何用腾讯云CKafka实现数据流传输,同时借助腾讯云SCF无服务函数来消费CKafka数据并写到TcaplusDB。

2.5.2 离线数据迁移场景

MySQL离线数据迁移涉及存量数据的搬迁,存量数据搬迁需要考虑如何避免线上业务影响,如在业务低峰期迁移、从备机拉数据。存量数据导出并导入异构平台方案这里介绍两种:

  • 方案一:  用Select直接查询备机,以一定格式(如约定好分隔符间隔各字段值)将数据导出到本地文件,然后通过离线大数据批量解析(e.g., Map/Reduce, Spark)文件将数据按TcaplusDB数据格式写入TcaplusDB。这里涉及到的腾讯云产品:腾讯云COS用于存储导出的数据文件,腾讯云EMR用于从COS拉取数据文件进行批量解析并写入到TcaplusDB。此方案涉及开发数据文件解析代码。本文模拟数据量比较小,直接将导出的数据写到TcaplusDB即可,暂不涉及EMR。
  • 方案二: 用mysqldump从备机批量dump数据到文件,文件数据格式是SQL格式(INSERT语句),然后再把导出数据重新Load到新的MySQL,产生binlog,再按实时数据迁移方案把数据写到TcaplusDB。这里涉及的腾讯云产品:腾讯云COS存储数据文件,腾讯云MySQL实例存储新的load数据,腾讯云DTS服务数据订阅功能实时采集binlog, 腾讯云CKafka作为消息队列中间件,腾讯云SCF用于消费数据写到TcaplusDB。

3. 迁移成本

3.1 资源成本

迁移场景

资源名称

资源说明

实时迁移

腾讯云CVM实例

用于跑数据订阅程序

实时迁移

腾讯云CDB for MySQL

用于数据源模拟

实时迁移

腾讯云DTS服务数据订阅

实时订阅MySQL数据

实时迁移

腾讯云CKafka

消息队列,解藕订阅/消费过程

实时迁移

腾讯云SCF

消费CKafka数据

实时迁移

腾讯云TcaplusDB

数据存储平台

离线迁移

腾讯云COS

数据文件存储

离线迁移

腾讯云CDB for MySQL

用于中间临时数据存储

3.2 开发成本

迁移场景

开发项

实时迁移

数据订阅程序

实时迁移

SCF消费订阅数据程序

离线迁移

批量导出MySQL数据和批量解析程序

4. 实时迁移方案

4.1 环境准备

所有资源统一申请到腾讯云上海地域。

4.1.1 示例表结构

  • MySQL示例表

表信息

库表名

库: tcaplus

表: test

序号

字段名

字段类型

字段说明

1

player_id

bigint(20)

primary key

2

player_name

varchar(128)

3

player_info

varbinary(2048)

JSON字符串二进制写入

player_info的JSON原始数据结构如下:

序号

字段名

字段类型

1

player_email

varchar(64)

2

player_phone

varchar(32)

  • TcpalusDB示例表

表信息

集群:tw_tcaplus

表格组: tw_group_1

表名: test

序号

字段名

字段类型

字段说明

1

player_id

int64

primary key

2

player_name

string

3

player_info

struct

参考MySQL表player_info说明

4.1.2 资源准备

  • CVM实例: 申请一个2C4G的实例
  • MySQL数据源: 申请一个CDB for MySQL实例用于模拟业务原始数据,规格:2C4G高可用版,并创建好库表(库: tcaplus, 表: test,编码: utf8mb4),申请指引参考官方文档,创建表DDL语句如下:
代码语言:txt复制
CREATE TABLE `test` (
  `player_id` bigint(20) NOT NULL,
  `player_name` varchar(128) DEFAULT NULL,
  `player_info` varbinary(2048) DEFAULT NULL,
  PRIMARY KEY (`player_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  • CKafka实例: 申请CKafka实例,规格:标准入门版,具体参考官方文档。
  • TcaplusDB: 创建一个TcaplusDB表,具体参考创建表指南。表相关信息:表所在集群名:tw_cluster, 表所在表格组名: tw_group_1,表名: test。TcaplusDB表定义Schema如下:
代码语言:txt复制
syntax = "proto3";               // Specify the version of the protocol buffers language

import "tcaplusservice.optionv1.proto"; // Use the public definitions of TcaplusDB by importing them.

message test {  // Define a TcaplusDB table with message

    // Specify the primary keys with the option tcaplusservice.tcaplus_primary_key
    // The primary key of a TcaplusDB table has a limit of 4 fields
    option(tcaplusservice.tcaplus_primary_key) = "player_id";

    // primary key fields
    int64 player_id = 1;
    //ordinary fields
    string player_name = 2;
    info player_info = 3;

}

message info {
    string player_email = 1;
    string player_phone = 2;
}
  • SCF: 在云函数控制台新建一个Python3.6版的空白模板函数,函数名:tw_migrate。具体指引参考官方文档。

4.2 迁移过程

4.2.1 MySQL实例检查

  • 字符集: 统一为utf8mb4
  • 库表: 从控制台登录实例后,检查库表创建是否OK, 如下所示:
mysql_ins_infomysql_ins_info
  • 专用账户: 创建迁移专用账户tw_tcaplus, 主机授权暂时设置为%,允许所有主机通过此账户来访问实例,如下所示:
mysql_account_createmysql_account_create

4.2.2 CKafka实例检查

  • Topic: 创建一个迁移专用的topic, topic名为tw_migrate,如下所示:
ckafka_topicckafka_topic

4.2.3 TcaplusDB实例检查

  • TcaplusDB表: 进TcpalusDB控制台,查看表格列表,如下所示:
tcaplusdb_tabletcaplusdb_table

4.2.4 数据订阅

  • 订阅通道创建: 在数据传输服务控制台新建一个数据订阅实例,如下:
data_subscribedata_subscribe
  • 订阅配置: 初始化订阅配置,选择MySQL实例作为数据源,选择VPC和子网(注意要与MySQL实例同属一个网络),在同步类型处只选择数据更新,库表任务处选择test表作为订阅的表。具体如下所示:
data_subscribe_configdata_subscribe_config
data_subscribe_typedata_subscribe_type
  • 订阅详情: 配置好后,点击启动会去修改MySQL实例参数并重启MySQL实例,订阅信息如下:
data_subscribe_infodata_subscribe_info
  • 注意 : 订阅配置过程会对MySQL实例修改一些binlog配置参数,修改过程会kill当前已有连接并重启MySQL实例,业务需要关注这点

4.2.5 数据拉取

数据订阅好后,需要把订阅的数据拉取到CKafka, 这里需要开发相应的程序,示例程序可参考官方文档。程序依赖的相关组件可从官方文档获取。

4.2.5.1 主要依赖
  • 示例程序: KafkaDemo.java,  参考第6章节资源下载
  • 数据订阅SDK: binlogsdk-2.8.2-jar-with-dependencies.jar, 参考第6章节资源下载
  • SLF4J组件: SLF4J.zip, 参考第6章节资源下载
  • kafka-clients: kafka-clients-1.1.0.jar, 参考第6章节资源下载
  • Json-simple: json-simple-1.1.jar, 参考第6章节资源下载
  • JDK1.8: 直接安全openjdk, 命令: yum install -y java-1.8.0-openjdk-devel
4.2.5.2 程序配置
  • 数据订阅信息获取:查看上述数据订阅详情截图的信息,主要是通道ID服务IP服务端口;
  • 示例程序修改: 修改上述下载的示例程序KafkaDemo.java,替换相关配置为自己的信息
代码语言:txt复制
//替换TOPIC为申请的topic名,这里为tw_migrate
final String TOPIC = "tw_migrate";
Properties props = new Properties();

//替换CKafka实例的连接信息,进Ckafka控制查看实例信息中的内网IP与端口
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.16.6:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final Producer<String, String> producer = new KafkaProducer<String, String>(props);

//替换SecretId为自己腾讯云账号下申请的密钥信息
context.setSecretId("xxx");
//替换SecretKey为自己腾讯云账号下申请的密钥信息
context.setSecretKey("xxx");

//替换Region信息
context.setRegion("ap-shanghai");

 // 替换数据订阅连接信息,从详情页获取的ip,port
context.setServiceIp("172.17.16.2");
context.setServicePort(7507);
final DefaultSubscribeClient client = new DefaultSubscribeClient(context);

// 填写对应要同步的数据库和表名
//替换订阅的库名
final String targetDatabase = "tcaplus";
client.addClusterListener(listener);
 // 替换订阅的通道名,订阅详情页获取的 dts-channel 配置信息,填写到此处
client.askForGUID("dts-channel-5ljfLZmhFvnTCQx9");
client.start();
4.2.5.3 程序编译

上传上述下载的组件包到CVM实例,并执行如下编译命令:

代码语言:txt复制
javac -cp binlogsdk-2.8.2-jar-with-dependencies.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:json-simple-1.1.jar  -encoding UTF-8  KafkaDemo.java
4.2.5.4 程序执行
代码语言:txt复制
java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g  -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:json-simple-1.1.jar  KafkaDemo
4.2.5.5 程序验证

在MySQL实例中插入一条示例数据,验证数据是否能订阅成功。插入MySQL数据这里用Python3程序来模拟,代码如下:

代码语言:txt复制
import json
import MySQLdb
#替换DB连接信息,从已申请的MySQL实例中获取
db = MySQLdb.connect("172.17.16.17", "xxx", "xxx", "tcaplus", charset='utf8' )

cursor = db.cursor()
#模拟一个json结构数据转成字符串后以byte形式写入MySQL
info={"player_phone":"123456","player_email":"June@123.com"}
info_bin = json.dumps(info).encode('utf-8')
sql = "INSERT INTO test (player_id, player_name, player_info) VALUES (%s, %s, %s)"
val = (7, 'June', info_bin,)

cursor.execute(sql, val)
db.commit()
db.close()

打开另一个CVM终端,执行上述程序,并查看4.2.5.4启动程序的输出,如下所示:

ckafka_subscribeckafka_subscribe

从上图来看说明数据订阅程序已经捕获到MySQL的数据插入动作。

4.2.6 数据消费

数据消费通过腾讯云SCF来实现。SCF支持创建CKafka触发器,借助触发器机制可实时捕获CKafka的数据流,只要有数据发布到Ckafka指定topic, 会触发SCF自动拉取Topic新进的数据。触发器如下所示:

scf_triggerscf_trigger

SCF捕获到数据后,解析捕获的数据包并转换成TcaplusDB能识别的JSON记录格式,再通过调用TcaplusDB Python RESTful SDK接口把JSON记录写到TcaplusDB表。具体代码可在SCF控制台上传代码ZIP包即可, 下载地址 。代码关键逻辑:

  • 捕获插入操作: 针对数据是INSERT操作类型的,转换成TcaplusDB的AddRecord操作,即新增一条记录
  • 捕获删除操作: 针对数据是DELETE操作类型的,转换成TcaplusDB的DeleteRecord操作,即删除一条记录
  • 捕获更新操作: 针对数据是UPDATE操作类型的,转换成TcaplusDB的SetRecord操作,即更新一条记录
  • 脏数据注意事项: 对于删除操作,由于是实时迁移,全量数据暂未同步到TcaplusDB,所以可能会存在删除一条空记录的情况,需要针对删除为空记录场景时把待删除的记录先保存到另一张待删除表,等全量数据迁移至TcaplusDB后,进行一次全量对账,即检查待删除表中的记录是否重新通过 离线迁移方式写到了业务TcaplusDB表,如果是则需要把业务表中的记录进行删除,避免脏数据的出现。

4.2.7 数据验证

通过SCF转换写入到TcaplusDB的数据,如下所示:

tcaplus_datatcaplus_data

4.3 迁移总结

上面实现并验证了实时迁移数据流管道,通过数据订阅捕获MySQL增删改事件并实时通过订阅程序传输到Ckafka, 同时通过SCF触发器机制捕获CKafka的输入数据流并拉取解析最后写到TcaplusDB。针对删除操作,为避免空删场景,把删除时错误码为261(数据记录不存在)的单独处理,即把这部分数据重新写到新的一张待删除表,待全量迁移完成后再统一对账清理这部分脏数据。

5. 离线迁移方案

离线迁移主要有两种方式: 一种是dump方式把表数据dump成SQL文件形式,文件内容为Insert格式,然后可以把SQL文件回写到另一临时MySQL实例产生Binlog走实时迁移方案; 另一种是select方式,从表中查数据出来以指定格式保存到文本文件, 如JSON格式行,通过腾讯云批量解析的方式写到TcaplusDB。

5.1 Dump方式迁移

5.1.1 Dump表数据

dump全表数据可以用如下命令:

代码语言:txt复制
#替换MySQL连接账户名和密码
mysqldump -h172.17.16.17 -u[db_user] -p[db_password] -P3306 -B tcaplus --tables test  --skip-opt >test.sql

将上述导出test.sql文件重新Load到新的临时MySQL实例,产生Binlog后用实时迁移方案来进行数据采集,参考第4章节所述,这里不再展开。

5.2 Select方式迁移

5.2.1 Select表数据

Select方式可以选择数据输出格式如JSON,如果原表设计有时间字段可以将时间字段设置为索引,并按时间段进行数据导出避免一次导出全量数据。这里以导出全量数据举例,借助JSON_OBJECT函数可以导出TcaplusDB的JSON格式,如下命令所示:

代码语言:txt复制
#替换用户名和密码,行之间间隔符}}}{{{
mysql -h172.17.16.17 -u[db_user] -p[db_password] -P3306 -D tcaplus -Ne "select JSON_OBJECT('player_id',player_id,'player_name',player_name,'player_info',player_info)  from test" >test.json

test.json示例文件格式如下:

代码语言:txt复制
{"player_id": 1, "player_info": "base64:type15:eyJwbGF5ZXJfZW1haWwiOiAiMUB0ZXN0LmNvbSIsICJwbGF5ZXJfcGhvbmUiOiAiMTIzNDU2NiJ9", "player_name": "ball"}

从上述示例格式来看,MySQL底层对Varbinary数据类型会自动转成base64编码,在解析时需要把这个base64进行解码转换成TcaplusDB的字符串格式。

5.2.2 数据解析

在2.5.2章节介绍了离线数据迁移场景,如果业务表数据量很大,为加快导入TcaplusDB速度,可考虑批量解析,批量解析文件目前业界用得较多的方案是用Spark或Map/Reduce进行文件解析将解析后的数据写入到TcaplusDB,后续针对批量解析这块单独介绍,这里只简单介绍上述导出的JSON文件导入到TcaplusDB。

JSON文件解析采用Python进行,同时引入TcaplusDB Python RESTful SDK,SDK使用方法参考官方文档。示例代码如下:

代码语言:txt复制
import json
import base64
#引入TcaplusRestClient类
from tcaplusdb_client.tcaplusdb_rest_client import TcaplusRestClient
client = None
#替换TcaplusDB集群访问地址
endpoint = "http://172.17.16.15"
#替换TcaplusDB访问ID
access_id = 20
#替换TcaplusDB集群密码
access_password = "xxx"
#替换TcaplusDB表格组ID
tablegroup_id = 1
#替换TcaplusDB表名
table_name =  "test"
def init():
    #初始化TcaplusDB客户端
    global client
    client = TcaplusRestClient(endpoint, access_id, access_passwd)
    client.SetTargetTable(table_group_id=tablegroup_id, table_name=table_name)
def write():
    #从文件读数据并写入TcaplusDB
    filename='test.json'
    with open(filename,'r') as f:
        lines = f.readlines()
        for line in lines:
            json_str = json.loads(line)
            player_info = json_str['player_info'].replace("base64:type15:","")
            #将base64编码的字段反解码成字符串并转换成JSON格式
            json_str['player_info'] = json.loads(base64.b64decode(player_info))
            stat, resp = client.AddRecord(json_str)
if __name__ == '__main__':
    init()
    write()

5.2.3 数据COS存储

对于MySQL导出的数据文件可以放腾讯云COS存储,方便其它组件拉取数据进行处理。COS相关介绍可参考官方文档。这里介绍Python SDK操作方法,具体使用手册可参考官方文档。

  • SDK安装
代码语言:txt复制
pip install -U cos-python-sdk-v5
  • COS上传和下载
代码语言:txt复制
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client

#替换COS所在Region名
cosRegionName='ap-shanghai'
#替换腾讯云账户的密钥信息
SECRET_ID='xxx'
SECRET_KEY='xxx'

scheme='https'
config = CosConfig(Region=cosRegionName, SecretId=SECRET_ID, SecretKey=SECRET_KEY,Scheme=scheme)
client = CosS3Client(config)

# 创建bucket, 注意格式为migrate名-appid (appid是自己腾讯云账号下的appid)
response = client.create_bucket(
    Bucket='migrate-1258272208'
)
#上传文件到COS
#替换bucket名,要上传的文件名及Key, PartSize指定分包大小(单位MB),MAXThread指定并发上传的线程数
response = client.upload_file(
            Bucket='migrate-1258272208',
            LocalFilePath='1.txt',
            Key='1.txt',
            PartSize=1,
            MAXThread=10,
            EnableMD5=False
        )

#从COS下载文件
#替换Bucket名
response = client.get_object(
        Bucket='migrate-1258272208',
        Key='1.txt',
        )
response['Body'].get_stream_to_file('output.txt')

6. 资源下载

迁移场景

依赖资源

资源下载地址

资源用途

实时迁移

tcaplus_tes.sql

下载地址

定义数据源表结构

实时迁移

test.proto

下载地址

定义TcaplusDB表结构

实时迁移

mysql_demo.py

下载地址

用于模拟写入MySQL数据,依赖mysqlclient库,参考网上资料安装

实时迁移

KafkaDemo.java

下载地址

数据订阅程序,从数据订阅管道拉取binlog捕获数据并解析写入到CKafka

实时迁移

binlogsdk-2.8.2-jar-with-dependencies.jar

下载地址

KafkaDemo依赖,binlog捕获SDK

实时迁移

SLF4J.zip

下载地址

KafkaDemo依赖,日志组件

实时迁移

kafka-clients-1.1.0.jar

下载地址

KafkaDemo依赖,Kafka客户端

实时迁移

json-simple-1.1.jar

下载地址

KafkaDemo依赖,json处理组件

实时迁移

scf_migrate_tcaplusdb.zip

下载地址

SCF程序,从Ckafka摘取数据并写入TcaplusDB

离线迁移

tcaplusdb-restapi-python-sdk-3.0.tgz

下载地址

TcpalusDB Python RESTful SDK API, 基于包装好的RESTful 接口进行TcaplusDB数据操作

7. 总结

本文介绍了MySQL数据迁移TcaplusDB的两种方案: 实时和离线迁移。实时迁移采用订阅MySQL binlog的方式将数据订阅到CKafka, 通过SCF拉取CKafka数据进行实时写入到TcaplusDB。离线迁移可以采用Dump和Select两种方式进行数据导出并将导出文件解析写入到TcaplusDB。本文针对离线迁移数据解析未介绍批量解析方式,后续将独立介绍通过EMR方式进行批量解析,敬请期待!

0 人点赞