Table of Contents
I. Background
II. Framework Design
III. Core Architecture
Core modules
DataX scheduling flow
IV. Currently Supported Data Sources
V. Examples
1. JSON config for syncing full MySQL data to a non-partitioned Hive table
2. JSON config for syncing incremental MySQL data to a non-partitioned Hive table
3. JSON config for syncing full MySQL data to a partitioned Hive table
4. JSON config for syncing full Hive data to MySQL
5. JSON config for syncing incremental Hive data to MySQL
6. JSON config for syncing full PostgreSQL data to a partitioned Hive table
7. JSON config for syncing incremental PostgreSQL data to a partitioned Hive table
8. JSON config for syncing MySQL data to Doris
VI. Execution
I. Background
DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform widely used within Alibaba Group. DataX provides efficient data synchronization between a wide range of heterogeneous data sources, including MySQL, Oracle, OceanBase, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, and DRDS.
II. Framework Design
As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted as Reader/Writer plugins that plug into the synchronization framework (a minimal job skeleton is sketched after this list).
- Reader: the data collection module. It reads data from the source and sends it to the Framework.
- Writer: the data writing module. It continuously pulls data from the Framework and writes it to the target.
- Framework: connects the Reader and the Writer, acting as the data transfer channel between them and handling core concerns such as buffering, flow control, concurrency, and data conversion.
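Concretely, every job config later in this article follows the same skeleton: one reader plugin, one writer plugin, and a setting block that configures the Framework (channel concurrency, error limits). A minimal sketch, with the plugin names borrowed from the examples below and the parameter blocks left empty as placeholders:
{
  "job": {
    "content": [
      {
        "reader": { "name": "mysqlreader", "parameter": { } },
        "writer": { "name": "hdfswriter", "parameter": { } }
      }
    ],
    "setting": {
      "speed": { "channel": 1 },
      "errorLimit": { "record": 0, "percentage": 0.02 }
    }
  }
}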
III. Core Architecture
Core modules:
- A single data synchronization run in DataX is called a Job. When DataX receives a Job, it starts one process to carry out the entire synchronization. The DataX Job module is the central management node of a single job, responsible for data cleanup, sub-task splitting (turning one job into multiple Tasks), TaskGroup management, and so on.
- After the DataX Job starts, it splits the Job into multiple small Tasks (sub-tasks) according to the splitting strategy of the source, so that they can run concurrently. A Task is the smallest unit of a DataX job; each Task synchronizes a portion of the data.
- After splitting, the DataX Job calls the Scheduler module, which regroups the Tasks into TaskGroups according to the configured concurrency. Each TaskGroup runs all of its assigned Tasks at a certain concurrency; by default a single TaskGroup runs 5 Tasks concurrently.
- Each Task is started by its TaskGroup. Once started, a Task spawns a fixed Reader -> Channel -> Writer thread chain to carry out the synchronization.
- Once the job is running, the Job module monitors and waits for all TaskGroups to finish. When every TaskGroup has completed, the Job exits successfully; otherwise it exits abnormally with a non-zero exit code.
DataX scheduling flow:
For example, suppose a user submits a DataX job configured with 20 concurrent channels to synchronize a MySQL database sharded into 100 tables into ODPS. DataX's scheduling decisions are as follows (a matching speed setting is sketched after the list):
- The DataX Job splits into 100 Tasks according to the sharded databases and tables.
- Given 20 concurrent channels and 5 concurrent Tasks per TaskGroup, DataX allocates 20 / 5 = 4 TaskGroups.
- The 4 TaskGroups evenly divide the 100 Tasks; each TaskGroup runs 25 Tasks in total at a concurrency of 5.
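The 20-way concurrency in this example is expressed through the job's speed setting, the same block used in the configs later in this article; a minimal sketch with the reader/writer content omitted:
{
  "job": {
    "setting": {
      "speed": { "channel": 20 }
    }
  }
}
With the default of 5 concurrent Tasks per TaskGroup, channel = 20 yields 20 / 5 = 4 TaskGroups, and 100 Tasks spread over 4 groups gives 25 Tasks per group.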
IV. Currently Supported Data Sources
| Type | Data Source | Reader | Writer |
|---|---|---|---|
| RDBMS (relational databases) | MySQL | MysqlReader | MysqlWriter |
| | Oracle | OracleReader | OracleWriter |
| | OceanBase | oceanbasev10reader | oceanbasev10writer |
| | SQLServer | SqlServerReader | SqlServerWriter |
| | PostgreSQL | PostgresqlReader | PostgresqlWriter |
| | DRDS | DrdsReader | DRDSWriter |
| | Kudu | | kuduwriter |
| | ClickHouse | | clickwriter |
| | Generic RDBMS (all relational databases) | RDBMSReader | RDBMSWriter |
| Alibaba Cloud data warehouse storage | ODPS | ODPSReader | ODPSWriter |
| | ADS | | ADSWriter |
| | OSS | OSSReader | OSSWriter |
| | OCS | | OCSWriter |
| NoSQL data stores | OTS | OTSReader / otsstreamreader | OTSWriter |
| | HBase 0.94 | Hbase094XReader | Hbase094XWriter |
| | HBase 1.1 | Hbase11XReader | Hbase11XWriter |
| | Phoenix 4.x | hbase11xsqlreader | HBase11xsqlwriter |
| | Phoenix 5.x | hbase20xsqlreader | HBase20xsqlwriter |
| | MongoDB | MongoDBReader | MongoDBWriter |
| | Hive | HdfsReader | HdfsWriter |
| | Cassandra | CassandraReader | CassandraWriter |
| Unstructured data storage | TxtFile | TxtFileReader | TxtFileWriter |
| | FTP | FtpReader | FtpWriter |
| | HDFS | HdfsReader | HdfsWriter |
| | Elasticsearch | | ElasticSearchWriter |
| Time-series databases | OpenTSDB | OpenTSDBReader | |
| | TSDB | TSDBReader | TSDBWriter |
| | TDengine | TDengineReader | TDengineWriter |
Data source reference guide: GitHub - alibaba/DataX (https://github.com/alibaba/DataX), the open-source version of Alibaba Cloud DataWorks Data Integration.
V. Examples
1. JSON config for syncing full MySQL data to a non-partitioned Hive table
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:port/db_name?useSSL=false"],
"querySql": ["select * from table_name"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/db_name.db/hive_table_name_da",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"int"},
{"name":"name","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
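Note that hdfswriter only writes delimited text files into the given path; it does not create the Hive table itself. Assuming the target table does not exist yet, it would have to be created so that its storage format matches the config's fileType and fieldDelimiter, roughly like this (database, table, and column names are taken from the example above; adjust to your schema):
$ hive -e "
CREATE TABLE IF NOT EXISTS db_name.hive_table_name_da (
  id   INT,
  name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;"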
2. JSON config for syncing incremental MySQL data to a non-partitioned Hive table
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:port/db_name?useSSL=false"],
"querySql": ["select * from mysql_table_name where date(date_created)='${date_create}'"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/db_name.db/hive_table_name_da",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"int"},
{"name":"name","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
3. JSON config for syncing full MySQL data to a partitioned Hive table
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:port/db_name?useSSL=false"],
"querySql": ["select * from mysql_table_name"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/db_name.db/hive_table_name_ds/ds=2022-09-16",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"int"},
{"name":"name","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
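hdfswriter writes its files into the partition directory given in path, but it does not create that directory or register the partition with the Hive metastore. Assuming a standard Hive-managed table partitioned by ds, the partition is typically prepared before the job runs, for example:
$ hive -e "ALTER TABLE db_name.hive_table_name_ds ADD IF NOT EXISTS PARTITION (ds='2022-09-16');"
$ hdfs dfs -mkdir -p /user/hive/warehouse/db_name.db/hive_table_name_ds/ds=2022-09-16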
4. JSON config for syncing full Hive data to MySQL
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "hdfsreader",
"parameter": {
"path":"/user/hive/warehouse/db_name.db/hive_table_name",
"defaultFS": "hdfs://ip:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 3,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "username",
"password": "password",
"column": [
"id",
"name",
"age"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"truncate table mysql_table_name"
],
"connection": [{
"jdbcUrl": "jdbc:mysql://ip:port/db_name?useUnicode=true&characterEncoding=utf8",
"table": [
"mysql_table_name"
]
}]
}
}
}]
}
}
5. JSON config for syncing incremental Hive data to MySQL
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [{
"reader": {
"name": "hdfsreader",
"parameter": {
"path":"/user/hive/warehouse/db_name.db/hive_table_name/ds=${ds}",
"defaultFS": "hdfs://ip:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 3,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "username",
"password": "password",
"column": [
"id",
"name",
"age"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"truncate table mysql_table_name"
],
"connection": [{
"jdbcUrl": "jdbc:mysql://ip:port/db_name?useUnicode=true&characterEncoding=utf8",
"table": [
"mysql_table_name"
]
}]
}
}
}]
}
}
6. JSON config for syncing full PostgreSQL data to a partitioned Hive table
{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://ip:port/pg_db_name"],
"querySql": ["select * from pg_table_name"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/hive_db_name.db/hive_table_name/ds=${ds}",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"bigint"},
{"name":"name","type":"string"},
{"name":"date_create","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
7. JSON config for syncing incremental PostgreSQL data to a partitioned Hive table
{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://ip:[ort/pg_db_name"],
"querySql": ["select * from pg_table_name where date_create='${date_create}'"],
}
],
"username": "username",
"password": "password"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "text",
"path": "/user/hive/warehouse/hive_db_name.db/hive_table_name/ds=${ds}",
"fileName": "hive_table_name",
"column": [
{"name":"id","type":"bigint"},
{"name":"name","type":"string"},
{"name":"date_create","type":"string"}
],
"writeMode": "append",
"fieldDelimiter": "t",
"encoding": "utf-8"
}
}
}],
"setting": {
"speed": {
"channel": "1"
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
}
}
}
8. JSON config for syncing MySQL data to Doris
{
"core":{
"transport": {
"channel": {
"speed": {
"byte": 104857600,
"record": 200000
}
}
}
},
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "password",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://ip:port/mysql_db_name"
],
"querySql": [
"select * from mysql_table_name;"
]
}
]
}
},
"writer": {
"name": "doriswriter",
"parameter": {
"username": "username",
"password": "password",
"database": "db_name",
"table": "table_name",
"column": [ "column1","column2","column3"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://ip:port/",
"feLoadUrl": ["cdh3:port"],
"beLoadUrl": ["cdh1:port", "cdh2:port", "cdh3:port"],
"loadProps": {
},
"maxBatchRows" : 200000,
"maxBatchByteSize" : 104857600,
"lineDelimiter": "n"
}
}
}
]
}
}
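preSql and postSql are left empty in this example. If the job is meant to be a repeatable full load, one option is to clear the Doris target in preSql, mirroring the MySQL writer examples above; a sketch of the relevant lines in writer.parameter (verify that your Doris version accepts TRUNCATE TABLE):
"preSql": ["truncate table table_name"],
"postSql": [],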
VI. Execution
Execution command:
$ python datax.py conf.json
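Several of the example configs reference variables such as ${ds} and ${date_create}. These can be supplied at run time through datax.py's -p option as -D style definitions; a sketch (the date value is illustrative):
$ python datax.py -p "-Dds=2022-09-16 -Ddate_create=2022-09-16" conf.json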