Greenplum数据导入系列 -- (一)DataX

2019-12-18 17:27:54 浏览数 (1)

说明

本文描述问题及解决方法同样适用于 腾讯云Snova云数仓。

数据导入介绍

Greenplum(以下简称GP)支持多种数据导入方法,比如GP自带的gpfdist,通过gpfdist 外部表的形式将远端服务器上的数据并行导入到GP中,再比如GP自带的COPY命令,能够将本地的数据按照一定格式导入到GP中。除此之外,还有一些比较优秀的第三方导入工具,本文主要介绍DataX。

DataX概述

DataX是一款能够完成异构数据源之间数据迁移的软件,DataX采用FrameWork Plugin的软件架构,扩展方便。所有数据源中的数据都先转换为DataX的格式,然后在转换成目的端的数据格式,避免出现各异构数据源之间的类型相互转换。

image.pngimage.png

通过DataX,可以容易得将现有数据从mysql、sqlserver、oracle等迁移到Snova中。

支持的数据类型

使用DataX进行数据导入时,第一步是将源端数据源的数据转换为DataX的数据类型,然后将DataX的数据类型转换为目标数据源的数据类型。因此,在使用DataX前,需要先确认是否存在DataX不支持的数据类型,现有数据源中的数据类型与DataX的类型映射如下:

Greenplum

DataX数据类型

GP数据类型

Long

bigint, bigserial, integer, smallint, serial

Double

double precision, money, numeric, real

String

varchar, char, text, bit

Date

date, time, timestamp

Boolean

bool

Bytes

bytea

Mysql

DataX数据类型

Mysql数据类型

Long

int, tinyint, smallint, mediumint, int, bigint

Double

float, double, decimal

String

varchar, char, tinytext, text, mediumtext, longtext, year

Date

date, datetime, timestamp, time

Boolean

bit, bool

Bytes

tinyblob, mediumblob, blob, longblob, varbinary

Oracle

DataX数据类型

Oracle数据类型

Long

NUMBER,INTEGER,INT,SMALLINT

Double

NUMERIC,DECIMAL,FLOAT,DOUBLE PRECISION,REAL

String

char,nchar,ntext,nvarchar,text,varchar,nvarchar(MAX),varchar(MAX)

Date

LONG,CHAR,NCHAR,VARCHAR,VARCHAR2,NVARCHAR2,CLOB,NCLOB,CHARACTER,CHARACTER VARYING,CHAR VARYING,NATIONAL CHARACTER,NATIONAL CHAR,NATIONAL CHARACTER VARYING,NATIONAL CHAR VARYING,NCHAR VARYING

Boolean

bit, bool

Bytes

BLOB,BFILE,RAW,LONG RAW

HashData公司开源DataX介绍

普通DataX工具虽然也支持Greenplum(使用PostgreSQL插件),但是效率非常低,经测试速度只能达到每秒几千条(具体数字取决于表结构等因素)。其原因在于PostgreSQL插件采用的Batch Insert模式。

为了解决上述效率问题,HashData公司使用DataX进行修改,加入了GPDB的插件,该插件使用高效的copy模式,经测试速度可以达到10W条每秒以上,效率提升不止一个数量级。

以下将会以HashData开源的DataX进行介绍。

使用方法

本节将会介绍把Mysql中的数据导入到GP中的方法。

工具安装

DataX的安装非常简单,配置好maven环境之后,直接在DataX顶层目录运行

代码语言:txt复制
mvn -U clean package assembly:assembly -Dmaven.test.skip=true

具体可参见DataX指导

数据准备

Mysql创建表

在mysql的数据库db1中创建test1表。

代码语言:txt复制
create table test2 (id int auto_increment primary key not null, ca int, cb int, cc varchar(50));

Mysql数据生成

向表test1中插入测试数据1000条。

首先创建插入的存储过程:

代码语言:txt复制
DELIMITER //
CREATE PROCEDURE db1.insert_multi_records(IN n INT)
BEGIN
    DECLARE i INT DEFAULT 1;
    DECLARE va  INT DEFAULT 0;
    DECLARE vb INT DEFAULT 0;
    DECLARE status TINYINT DEFAULT 1;
    WHILE i < n DO
        SET va= FLOOR(1   RAND() * 10000);
        SET vb = FLOOR(0   RAND()*3);
        INSERT INTO db1.test2 VALUES (NULL, va, vb, 'abc');
        SET i = i   1;
    END WHILE;
END //
DELIMITER ;

插入1000条数据:

代码语言:txt复制
CALL db1.insert_multi_records(1000);

GP创建表

在GP中创建与Mysql结构一致的表,当然GP中的表可以与Mysql中不一致,在进行导入作业时,可以通过参数设置那些列需要被导入,参考参数解释。

代码语言:txt复制
CREATE TABLE test2(id SERIAL NOT NULL,ca INTEGER, cb INTEGER,cc VARCHAR(50)) DISTRIBUTED BY  (id);

数据导入

从Mysql将数据导入到GP的配置文件如下,需注意写入端的writer直接选择gpdbwriter:

代码语言:txt复制
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3, 
                "byte": 1048576, 
                "record": 1000
            }, 
            "errorLimit": {
                "record": 2, 
                "percentage": 0.02
            }
        }, 
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "****", 
                        "password": "****", 
                        "column": [
                            "*"
                        ], 
                        "splitPk": "id", 
                        "connection": [
                            {
                                "table": [
                                    "test1"
                                ], 
                                "jdbcUrl": [
                                    "jdbc:mysql://***:***/db1"
                                ]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "gpdbwriter", 
                    "parameter": {
                        "username": "******", 
                        "password": "******", 
                        "column": [
                            "*"
                        ], 
                        "preSql": [
                            "truncate table test1"
                        ], 
                        "postSql": [
                            "select count(*) from test2"
                        ], 
                        "segment_reject_limit": 0, 
                        "copy_queue_size": 2000, 
                        "num_copy_processor": 1, 
                        "num_copy_writer": 1, 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://****:**/db1", 
                                "table": [
                                    "test1"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

运行

代码语言:txt复制
python {datax_home}/bin/datax.py ./mysql2gp.json 

参数解释

连接信息

reader

所有与读取插件相关的参数都在job.content.reader中,以下参数省略所有job.content.reader前缀,例如 name等价于job.content.reader.name

  1. name表示插件的类型,在我们的例子中设置为mysqlreader,表示源端数据源类型为mysql。
  2. parameter表示所有与连接url相关的配置。 2.1 username与password分别表示连接数据库是的用户名和密码。 2.2 connection中的为jdbcUrl为连接数据库时的Url,各数据源的连接url有细微区别,具体可参考:
代码语言:txt复制
mysql:http://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html
代码语言:txt复制
sqlserver:http://technet.microsoft.com/zh-cn/library/ms378749(v=SQL.110).aspx
代码语言:txt复制
oracle:http://www.oracle.com/technetwork/database/enterprise-edition/documentation/index.html
writer

与写插件相关的参数都在job.content.writer中,writer中与连接相关的参数与reader中的各参数意义相同,只是需要设置为目的数据源的相关配置,具体含义可以参考 gpdbwriter说明

并发控制

在DataX中,可以将一个作业根据策略划分成多个Task,Task作为迁移过程中执行的最小作业单位等待调度。

多表导入

数据导入时,可以对单张表或者多张表进行导入,在我们的例子中,只对单张表进行了导入,实际上可以通过对reader.connection.table进行设置,例如例子中的reader.connection参数设置以下参数,表示多张表同时导入。但是,多张表进行导入时,多张表的结果必须完全一致

代码语言:txt复制
"connection": [{
						"table": [
							"test2",
                            "test3"
						],
						"jdbcUrl": [
							"jdbc:mysql://172.16.0.14:3306/db1"
						]
					}]
单表导入

出了使用多表进行并发,如果导入时只有一张表,同样可以进行并发导入。例如例子中的以下配置可以对表进行划分成多个Task。

代码语言:txt复制
"reader": {
					"splitPk": "id"
			}

splitPk用设置对表进行划分时依赖的列,不一定要设置为主键,可以设置为能将表均匀散列的列,如果splitPk选取不合适,会出现Task负载不均的情况,并发效果较差。另外,splitPk只能选取数据类型为整形或者字符串类型的列,其他类型会报错。

例如,对于例子中的配置而言,以id作为splitPk作为划分键时,假设数据库中共计50W条数据,id为自增长的整型,并且取值为1~500000,通过拆分以后,会划分成50个数据量为1W的任务,并发导入。

并发通道

在job.setting.speed中,同样存在控制并发导入的参数,channel用来控制并发通道的数量。即,即使通过上述的splitPk对表进行了划分,但是如果channel设置不合理,也不能并发导入,比如,如果channel设置为1,那么50任务会串行通过该唯一通道进行迁移,而如果channel设置为10,会启动10个channel同时进行迁移。因此channel与splitPk必须搭配使用才能生效。

批量控制

在writer中,可以设置batchsize对插入进行批量处理。例如,不设置batchsize时,如果要插入两条数据,DataX会通过以下语句进行:

代码语言:txt复制
insert into test2 (id, ca, cb, cc) values(1, 1, 2, 'abcdefg');
insert into test2 (id, ca, cb, cc) values(2, 1, 2, 'gfedcba');

这会导致DataX与GP之间会通过两次实务来插入量条数据,增加通信成本,而如果设置了batchsize,比如讲batchsize设置为100时,DataX会通过以下语句进行:

代码语言:txt复制
insert into test2 (id, ca, cb, cc) values(1, 1, 2, 'abcdefg'),values(2, 1, 2, 'gfedcba');

当batchsize设置在100左右时,能够大幅度降低DataX和目标数据源之间的通信成本,提升导入性能。

条件过滤

结构过滤

通常情况下,源端数据源的表结构与目标数据源的表结构,如例子中的mysql与GP中数据迁移前后的表列数与类型完全一致,但是在某些情况下,迁移前后的结构是不一致的,通常表现后目标数据库中的表中只保留源端数据库表中的某几列,这时,可以通过reader.parameter和writer.parameter中的column参数来指定需要进行导入的列。例如:

代码语言:txt复制
"reader": {
					"column": [
						"id",
                        "ca",
                        "cc"
					],
					"splitPk": "id",
			}
代码语言:txt复制
"writer": {
					"column": [
						"id",
                        "ca",
                        "cc"
					]
			},

如上配置,从mysql导入到GP中以后,只保留id,ca,cc三列数据,cb列的数据被丢弃。

数据过滤

例子中的配置,在每次作业时,会将源端数据库表中的数据全量导入到目标数据库表中,DataX支持对reader进行where条件配置,对源端数据库表的数据进行过滤,例如:

代码语言:txt复制
"reader": {
				"where" : "id>100"
			}

通过配置where语句,通常可以用来对表的数据进行增量导入,例如,若表中有一个时间戳字段,下次导入时,可以选择一上次导入的最后一个时间戳开始,完成增量导入。

数据清理

在DataX进行作业前后,可以分别配置sql语句进行额外的处理,例如preSql会在导入任务执行前执行,postSql会在导入完成后执行。

如例子中的配置,每次在将数据导入到test2表中前,先将test2中的数据删除,避免重复导入。

代码语言:txt复制
"reader": {
				"preSql" : [
                    "delete from test2"
                ]
			}

总结

虽然DataX提供了丰富的从其他数据库迁移到GP的方法,但是将数据导入到GP时,所有数据需要经过master根据分布键计算后再次进行分发,master的会成为数据导入过程中的性能瓶颈,后续会介绍其他不经过master的更高性能的导入方法。

0 人点赞