说明
本文描述问题及解决方法同样适用于 腾讯云Snova云数仓。
数据导入介绍
Greenplum(以下简称GP)支持多种数据导入方法,比如GP自带的gpfdist,通过gpfdist 外部表的形式将远端服务器上的数据并行导入到GP中,再比如GP自带的COPY命令,能够将本地的数据按照一定格式导入到GP中。除此之外,还有一些比较优秀的第三方导入工具,本文主要介绍DataX。
DataX概述
DataX是一款能够完成异构数据源之间数据迁移的软件,DataX采用FrameWork Plugin的软件架构,扩展方便。所有数据源中的数据都先转换为DataX的格式,然后在转换成目的端的数据格式,避免出现各异构数据源之间的类型相互转换。
通过DataX,可以容易得将现有数据从mysql、sqlserver、oracle等迁移到Snova中。
支持的数据类型
使用DataX进行数据导入时,第一步是将源端数据源的数据转换为DataX的数据类型,然后将DataX的数据类型转换为目标数据源的数据类型。因此,在使用DataX前,需要先确认是否存在DataX不支持的数据类型,现有数据源中的数据类型与DataX的类型映射如下:
Greenplum
DataX数据类型 | GP数据类型 |
---|---|
Long | bigint, bigserial, integer, smallint, serial |
Double | double precision, money, numeric, real |
String | varchar, char, text, bit |
Date | date, time, timestamp |
Boolean | bool |
Bytes | bytea |
Mysql
DataX数据类型 | Mysql数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext, year |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
Oracle
DataX数据类型 | Oracle数据类型 |
---|---|
Long | NUMBER,INTEGER,INT,SMALLINT |
Double | NUMERIC,DECIMAL,FLOAT,DOUBLE PRECISION,REAL |
String | char,nchar,ntext,nvarchar,text,varchar,nvarchar(MAX),varchar(MAX) |
Date | LONG,CHAR,NCHAR,VARCHAR,VARCHAR2,NVARCHAR2,CLOB,NCLOB,CHARACTER,CHARACTER VARYING,CHAR VARYING,NATIONAL CHARACTER,NATIONAL CHAR,NATIONAL CHARACTER VARYING,NATIONAL CHAR VARYING,NCHAR VARYING |
Boolean | bit, bool |
Bytes | BLOB,BFILE,RAW,LONG RAW |
HashData公司开源DataX介绍
普通DataX工具虽然也支持Greenplum(使用PostgreSQL插件),但是效率非常低,经测试速度只能达到每秒几千条(具体数字取决于表结构等因素)。其原因在于PostgreSQL插件采用的Batch Insert模式。
为了解决上述效率问题,HashData公司使用DataX进行修改,加入了GPDB的插件,该插件使用高效的copy模式,经测试速度可以达到10W条每秒以上,效率提升不止一个数量级。
以下将会以HashData开源的DataX进行介绍。
使用方法
本节将会介绍把Mysql中的数据导入到GP中的方法。
工具安装
DataX的安装非常简单,配置好maven环境之后,直接在DataX顶层目录运行
代码语言:txt复制mvn -U clean package assembly:assembly -Dmaven.test.skip=true
具体可参见DataX指导
数据准备
Mysql创建表
在mysql的数据库db1中创建test1表。
代码语言:txt复制create table test2 (id int auto_increment primary key not null, ca int, cb int, cc varchar(50));
Mysql数据生成
向表test1中插入测试数据1000条。
首先创建插入的存储过程:
代码语言:txt复制DELIMITER //
CREATE PROCEDURE db1.insert_multi_records(IN n INT)
BEGIN
DECLARE i INT DEFAULT 1;
DECLARE va INT DEFAULT 0;
DECLARE vb INT DEFAULT 0;
DECLARE status TINYINT DEFAULT 1;
WHILE i < n DO
SET va= FLOOR(1 RAND() * 10000);
SET vb = FLOOR(0 RAND()*3);
INSERT INTO db1.test2 VALUES (NULL, va, vb, 'abc');
SET i = i 1;
END WHILE;
END //
DELIMITER ;
插入1000条数据:
代码语言:txt复制CALL db1.insert_multi_records(1000);
GP创建表
在GP中创建与Mysql结构一致的表,当然GP中的表可以与Mysql中不一致,在进行导入作业时,可以通过参数设置那些列需要被导入,参考参数解释。
代码语言:txt复制CREATE TABLE test2(id SERIAL NOT NULL,ca INTEGER, cb INTEGER,cc VARCHAR(50)) DISTRIBUTED BY (id);
数据导入
从Mysql将数据导入到GP的配置文件如下,需注意写入端的writer直接选择gpdbwriter:
代码语言:txt复制{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576,
"record": 1000
},
"errorLimit": {
"record": 2,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "****",
"password": "****",
"column": [
"*"
],
"splitPk": "id",
"connection": [
{
"table": [
"test1"
],
"jdbcUrl": [
"jdbc:mysql://***:***/db1"
]
}
]
}
},
"writer": {
"name": "gpdbwriter",
"parameter": {
"username": "******",
"password": "******",
"column": [
"*"
],
"preSql": [
"truncate table test1"
],
"postSql": [
"select count(*) from test2"
],
"segment_reject_limit": 0,
"copy_queue_size": 2000,
"num_copy_processor": 1,
"num_copy_writer": 1,
"connection": [
{
"jdbcUrl": "jdbc:postgresql://****:**/db1",
"table": [
"test1"
]
}
]
}
}
}
]
}
}
运行
代码语言:txt复制python {datax_home}/bin/datax.py ./mysql2gp.json
参数解释
连接信息
reader
所有与读取插件相关的参数都在job.content.reader中,以下参数省略所有job.content.reader前缀,例如 name等价于job.content.reader.name
- name表示插件的类型,在我们的例子中设置为mysqlreader,表示源端数据源类型为mysql。
- parameter表示所有与连接url相关的配置。 2.1 username与password分别表示连接数据库是的用户名和密码。 2.2 connection中的为jdbcUrl为连接数据库时的Url,各数据源的连接url有细微区别,具体可参考:
mysql:http://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html
代码语言:txt复制sqlserver:http://technet.microsoft.com/zh-cn/library/ms378749(v=SQL.110).aspx
代码语言:txt复制oracle:http://www.oracle.com/technetwork/database/enterprise-edition/documentation/index.html
writer
与写插件相关的参数都在job.content.writer中,writer中与连接相关的参数与reader中的各参数意义相同,只是需要设置为目的数据源的相关配置,具体含义可以参考 gpdbwriter说明
并发控制
在DataX中,可以将一个作业根据策略划分成多个Task,Task作为迁移过程中执行的最小作业单位等待调度。
多表导入
数据导入时,可以对单张表或者多张表进行导入,在我们的例子中,只对单张表进行了导入,实际上可以通过对reader.connection.table进行设置,例如例子中的reader.connection参数设置以下参数,表示多张表同时导入。但是,多张表进行导入时,多张表的结果必须完全一致。
代码语言:txt复制"connection": [{
"table": [
"test2",
"test3"
],
"jdbcUrl": [
"jdbc:mysql://172.16.0.14:3306/db1"
]
}]
单表导入
出了使用多表进行并发,如果导入时只有一张表,同样可以进行并发导入。例如例子中的以下配置可以对表进行划分成多个Task。
代码语言:txt复制"reader": {
"splitPk": "id"
}
splitPk用设置对表进行划分时依赖的列,不一定要设置为主键,可以设置为能将表均匀散列的列,如果splitPk选取不合适,会出现Task负载不均的情况,并发效果较差。另外,splitPk只能选取数据类型为整形或者字符串类型的列,其他类型会报错。
例如,对于例子中的配置而言,以id作为splitPk作为划分键时,假设数据库中共计50W条数据,id为自增长的整型,并且取值为1~500000,通过拆分以后,会划分成50个数据量为1W的任务,并发导入。
并发通道
在job.setting.speed中,同样存在控制并发导入的参数,channel用来控制并发通道的数量。即,即使通过上述的splitPk对表进行了划分,但是如果channel设置不合理,也不能并发导入,比如,如果channel设置为1,那么50任务会串行通过该唯一通道进行迁移,而如果channel设置为10,会启动10个channel同时进行迁移。因此channel与splitPk必须搭配使用才能生效。
批量控制
在writer中,可以设置batchsize对插入进行批量处理。例如,不设置batchsize时,如果要插入两条数据,DataX会通过以下语句进行:
代码语言:txt复制insert into test2 (id, ca, cb, cc) values(1, 1, 2, 'abcdefg');
insert into test2 (id, ca, cb, cc) values(2, 1, 2, 'gfedcba');
这会导致DataX与GP之间会通过两次实务来插入量条数据,增加通信成本,而如果设置了batchsize,比如讲batchsize设置为100时,DataX会通过以下语句进行:
代码语言:txt复制insert into test2 (id, ca, cb, cc) values(1, 1, 2, 'abcdefg'),values(2, 1, 2, 'gfedcba');
当batchsize设置在100左右时,能够大幅度降低DataX和目标数据源之间的通信成本,提升导入性能。
条件过滤
结构过滤
通常情况下,源端数据源的表结构与目标数据源的表结构,如例子中的mysql与GP中数据迁移前后的表列数与类型完全一致,但是在某些情况下,迁移前后的结构是不一致的,通常表现后目标数据库中的表中只保留源端数据库表中的某几列,这时,可以通过reader.parameter和writer.parameter中的column参数来指定需要进行导入的列。例如:
代码语言:txt复制"reader": {
"column": [
"id",
"ca",
"cc"
],
"splitPk": "id",
}
代码语言:txt复制"writer": {
"column": [
"id",
"ca",
"cc"
]
},
如上配置,从mysql导入到GP中以后,只保留id,ca,cc三列数据,cb列的数据被丢弃。
数据过滤
例子中的配置,在每次作业时,会将源端数据库表中的数据全量导入到目标数据库表中,DataX支持对reader进行where条件配置,对源端数据库表的数据进行过滤,例如:
代码语言:txt复制"reader": {
"where" : "id>100"
}
通过配置where语句,通常可以用来对表的数据进行增量导入,例如,若表中有一个时间戳字段,下次导入时,可以选择一上次导入的最后一个时间戳开始,完成增量导入。
数据清理
在DataX进行作业前后,可以分别配置sql语句进行额外的处理,例如preSql会在导入任务执行前执行,postSql会在导入完成后执行。
如例子中的配置,每次在将数据导入到test2表中前,先将test2中的数据删除,避免重复导入。
代码语言:txt复制"reader": {
"preSql" : [
"delete from test2"
]
}
总结
虽然DataX提供了丰富的从其他数据库迁移到GP的方法,但是将数据导入到GP时,所有数据需要经过master根据分布键计算后再次进行分发,master的会成为数据导入过程中的性能瓶颈,后续会介绍其他不经过master的更高性能的导入方法。