一、背景
使用datax同步postgre库表数据到hive表中,执行后报错:
java.lang.IllegalArgumentException: No enum constant com.alibaba.datax.plugin.writer.hdfswriter.SupportHiveDataType.DECIMAL(6,4)
二、定位问题
看到关键字enumdecimal,就知道肯定是数据类型的问题了,果然,datax HdfsWriter不支持decimal 嘤嘤嘤~
三、解决方案
1.将json中字段类型为decimal的替换为double
{ "name":"col_name", "type":"decimal(6,4)" }
替换成
{ "name":"col_name", "type":"double" }
2.修改datax源码,在枚举类中新增DECIMAL
ORC中获取字段序列化器的入口位置[HdfsHelper.getColumnTypeInspectors]方法内部
代码语言:javascript复制HdfsHelper:
// 根据writer配置的字段类型,构建序列化器
public List<ObjectInspector> getColumnTypeInspectors(List<Configuration> columns){
List<ObjectInspector> columnTypeInspectors = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
SupportHiveDataType columnType = SupportHiveDataType.valueOf(eachColumnConf.getString(Key.TYPE).toUpperCase());
ObjectInspector objectInspector = null;
switch (columnType) {
case TINYINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Byte.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
...
}
代码语言:javascript复制ObjectInspectorFactory:
public static ObjectInspector getReflectionObjectInspector(Type t, ObjectInspectorFactory.ObjectInspectorOptions options) {
// 优先从缓存中获取
ObjectInspector oi = (ObjectInspector)objectInspectorCache.get(t);
if (oi == null) {
// 缓存中不存在,获取实际类,并添加到缓存中
oi = getReflectionObjectInspectorNoCache(t, options);
objectInspectorCache.put(t, oi);
}
...
return oi;
}
private static ObjectInspector getReflectionObjectInspectorNoCache(Type t, ObjectInspectorFactory.ObjectInspectorOptions options) {
// 开头就验证Map,Array类型的复合字段类型,这就说明了其实hive提供的sdk本身也是支持这些字段类型写入的
if (t instanceof GenericArrayType) {
GenericArrayType at = (GenericArrayType)t;
return getStandardListObjectInspector(getReflectionObjectInspector(at.getGenericComponentType(), options));
} else {
if (t instanceof ParameterizedType) {
ParameterizedType pt = (ParameterizedType)t;
if (List.class.isAssignableFrom((Class)pt.getRawType()) || Set.class.isAssignableFrom((Class)pt.getRawType())) {
return getStandardListObjectInspector(getReflectionObjectInspector(pt.getActualTypeArguments()[0], options));
}
if (Map.class.isAssignableFrom((Class)pt.getRawType())) {
return getStandardMapObjectInspector(getReflectionObjectInspector(pt.getActualTypeArguments()[0], options), getReflectionObjectInspector(pt.getActualTypeArguments()[1], options));
}
t = pt.getRawType();
}
if (!(t instanceof Class)) {
throw new RuntimeException(ObjectInspectorFactory.class.getName() " internal error:" t);
} else {
Class<?> c = (Class)t;
// 根据传入的不同类去不同的缓存中获取class对象
if (PrimitiveObjectInspectorUtils.isPrimitiveJavaType(c)) {
return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveJavaType(c).primitiveCategory);
} else if (PrimitiveObjectInspectorUtils.isPrimitiveJavaClass(c)) {
return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveJavaClass(c).primitiveCategory);
} else if (PrimitiveObjectInspectorUtils.isPrimitiveWritableClass(c)) {
return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveWritableClass(c).primitiveCategory);
}
...
}
}
代码语言:javascript复制PrimitiveObjectInspectorUtils:
// 缓存中注册类型
static void registerType(PrimitiveObjectInspectorUtils.PrimitiveTypeEntry t) {
...
if (t.primitiveJavaType != null) {
primitiveJavaTypeToTypeEntry.put(t.primitiveJavaType, t);
}
if (t.primitiveJavaClass != null) {
primitiveJavaClassToTypeEntry.put(t.primitiveJavaClass, t);
}
if (t.primitiveWritableClass != null) {
primitiveWritableClassToTypeEntry.put(t.primitiveWritableClass, t);
}
...
}
// 静态代码块初始化
static {
binaryTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.BINARY, "binary", byte[].class, byte[].class, BytesWritable.class);
stringTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.STRING, "string", (Class)null, String.class, Text.class);
booleanTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.BOOLEAN, "boolean", Boolean.TYPE, Boolean.class, BooleanWritable.class);
intTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.INT, "int", Integer.TYPE, Integer.class, IntWritable.class);
longTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.LONG, "bigint", Long.TYPE, Long.class, LongWritable.class);
floatTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.FLOAT, "float", Float.TYPE, Float.class, FloatWritable.class);
voidTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.VOID, "void", Void.TYPE, Void.class, NullWritable.class);
doubleTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.DOUBLE, "double", Double.TYPE, Double.class, DoubleWritable.class);
byteTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.BYTE, "tinyint", Byte.TYPE, Byte.class, ByteWritable.class);
shortTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.SHORT, "smallint", Short.TYPE, Short.class, ShortWritable.class);
dateTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.DATE, "date", (Class)null, Date.class, DateWritable.class);
timestampTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.TIMESTAMP, "timestamp", (Class)null, Timestamp.class, TimestampWritable.class);
decimalTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.DECIMAL, "decimal", (Class)null, HiveDecimal.class, HiveDecimalWritable.class);
varcharTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.VARCHAR, "varchar", (Class)null, HiveVarchar.class, HiveVarcharWritable.class);
charTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.CHAR, "char", (Class)null, HiveChar.class, HiveCharWritable.class);
unknownTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.UNKNOWN, "unknown", (Class)null, Object.class, (Class)null);
registerType(binaryTypeEntry);
registerType(stringTypeEntry);
registerType(charTypeEntry);
registerType(varcharTypeEntry);
registerType(booleanTypeEntry);
registerType(intTypeEntry);
registerType(longTypeEntry);
registerType(floatTypeEntry);
registerType(voidTypeEntry);
registerType(doubleTypeEntry);
registerType(byteTypeEntry);
registerType(shortTypeEntry);
registerType(dateTypeEntry);
registerType(timestampTypeEntry);
registerType(decimalTypeEntry);
registerType(unknownTypeEntry);
}
hive底层是支持decimal类型的字段写入的,所以只需要拿到入参的class类 (HiveDecimal.class, HiveDecimalWritable.class),回到HdfsHelper中,添加decimal类型,并在枚举类中新增DECIMAL即可
代码语言:javascript复制case DECIMAL:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(HiveDecimal.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;