一、背景
使用datax从pg同步数据到hive发生报错
datax自定义json内容如下:
代码语言:javascript复制{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "username",
"password": "password",
"column": [
""id"",
""timestamp""
],
"splitPk": "",
"connection": [
{
"table": [
"table_name"
],
"jdbcUrl": [
"jdbc:ip:port/db"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://ip:port",
"fileType": "orc",
"path": "/user/hive/warehouse/db_name.db/table_name",
"fileName": "table_name",
"writeMode": "append",
"fieldDelimiter": "t",
"encoding": "utf-8",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "timestamp",
"type": "bigint"
}
]
}
}
}
]
}
}
二、报错
[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数
三、定位原因
很明显,error信息里面也说了,DataX的配置有问题,单个channel的bps值不能为空,也不能为非正数
四、解决办法
1.修改文件datax/conf/core.json
修改core -> transport -> channel -> speed -> "byte": -1
代码语言:javascript复制"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
}
2.json内容中删除总限速的配置
偷懒的方法,小白可以这么用
代码语言:javascript复制"setting": {
"speed": {
"channel": 3
}
五、参数说明
关键参数
- job.setting.speed.channel : channel并发数
- job.setting.speed.record : 全局配置channel的record限速
- job.setting.speed.byte:全局配置channel的byte限速
- core.transport.channel.speed.record:单个channel的record限速
- core.transport.channel.speed.byte:单个channel的byte限速
六、DataX提速优化
1.提升每个channel的速度
2.提升job内Channel并发有三种配置方式
bps限速:配置全局Byte限速以及单Channel Byte限速(Channel个数 = 全局Byte限速 / 单Channel Byte限速)
代码语言:javascript复制总bps限速通过job.setting.speed.byte设置,在job.json中。
单个channel的bps值通过core.transport.channel.speed.byte设置,在core.json中,默认为-1。
如果设置了总bps限速,那单个channel的bps值不能为空,也不能为非正数。
channel的数量=总bps限速/单个channel的bps值。
比如总bps限速为100,单个channel的bps为10,那channel的数量就是100/10=10个
tps限速:配置全局Record限速以及单Channel Record限速(Channel个数 = 全局Record限速 / 单Channel Record限速)
代码语言:javascript复制总tps限速通过job.setting.speed.record设置,在job.json中。
单个channel的tps值通过core.transport.channel.speed.record设置,在core.json中,默认为-1。
如果设置了总tps限速,那单个channel的tps值不能为空,也不能为非正数。
channel的数量=总tps限速/单个channel的tps值。
比如总tps限速为200,单个channel的tps为40,那channel的数量就是200/40=5个。
直接配置Channel个数
代码语言:javascript复制通过job.setting.speed.channel直接设置。
比如这个值设置为5,那channel的数量就是5个。
3.提高JVM堆内存
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
优先级
如果以上三个都设置了,那以哪个为主呢?
如果bps限速和tps限速没有设置,那channel的数量默认都是Integer.MAX_VALUE。
如果bps限速和tps限速都设置了,那谁比较小,以哪个为主。
比如上面的例子,tps的channel个数5小于bps的channel个数10,那channel的个数最终为5。
如果bps限速和tps限速只设置了一个,那以设置的那个为准,因为没设置的那个默认Integer.MAX_VALUE,肯定大于设置的那个,所以取小的数就是设置的那个。
如果bps限速和tps限速都没有设置,那才读取直接设置的值,也就是说,直接设置的优先级最低。
如果都没有设置,那直接抛异常,也就是说,必须设置Job运行速度。