datax(19):源码解读内置Transformer「建议收藏」

2022-08-30 08:58:54 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

通过datax(18)已经对transformer有了初步了解,继续撸代码,看datax已经内置的5种简单类型transformer;

一、概述

目前datax内置了5种常用的transformer,分别如下

  1. 截取SubstrTransformer
  2. 填充PadTransformer
  3. 替换ReplaceTransformer
  4. 过滤FilterTransformer
  5. Groovy类型GroovyTransformer

二、SubstrTransformer

主要是对record中的column的值进行按照长度截取;

  • 参数:3个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。
  • 返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
  • 举例:
代码语言:javascript复制
dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"
dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

里面第一个方法是给该transformer一个唯一的标识符name(后续几个transformer的子类类似,不在赘述)

代码语言:javascript复制
 /** * 给该transform起一个唯一标识符 */
  public SubstrTransformer() { 
   
    setTransformerName("dx_substr");
  }

第二个方法是实现父类的evaluate

代码语言:javascript复制
  /** * 参数:3个 <br> * 第一个参数:字段编号,对应record中第几个字段。 <br> * 第二个参数:字段值的开始位置。 <br> * 第三个参数:目标字段长度。 <br> * 举例: <br> * dx_substr(1,"2","5") column 1的value为“dataxTest”=>"taxTe" * dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test" * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回 * (即不参与本transformer) */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   

    int columnIndex;
    int startIndex;
    int targetLen;
    try { 
   
      // 参数异常检测
      if (paras.length != 3) { 
   
        throw new RuntimeException("dx_substr paras must be 3");
      }
      //获取对应参数值
      columnIndex = (Integer) paras[0];
      startIndex = Integer.valueOf((String) paras[1]);
      targetLen = Integer.valueOf((String) paras[2]);

    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:"   Arrays.asList(paras).toString()   " => "   e.getMessage());
    }
    // 根据index从record中获取 column
    Column column = record.getColumn(columnIndex);
    try { 
   
      String oriValue = column.asString();
      //如果字段为空,跳过subStr处理
      if (oriValue == null) { 
   
        return record;
      }
      String newValue;
      int oriLen = oriValue.length();
      if (startIndex > oriLen) { 
   
        throw new RuntimeException(String
            .format("dx_substr startIndex(%s) out of range(%s)", startIndex, oriLen));
      }
      if (startIndex   targetLen >= oriLen) { 
   
        newValue = oriValue.substring(startIndex);
      } else { 
   
        newValue = oriValue.substring(startIndex, startIndex   targetLen);
      }
      record.setColumn(columnIndex, new StringColumn(newValue));
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }

三、PadTransformer

  • 参数:4个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:“l”,“r”, 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。
  • 返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
  • 举例:
代码语言:javascript复制
         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
         dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz
代码语言:javascript复制
public PadTransformer() { 
   
    setTransformerName("dx_pad");
  }

  /** * 参数:4个 <br/> * 第一个参数:字段编号,对应record中第几个字段。 <br/> * 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。 <br/> * 第三个参数:目标字段长度。 <br/> * 第四个参数:需要pad的字符。 <br/> * 举例: <br/> * dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz <br/> * dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz <br/> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。 * 如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符 */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   

    int columnIndex;
    String padType;
    int length;
    String padString;
    try { 
   
      if (paras.length != 4) { 
   
        throw new RuntimeException("dx_pad paras must be 4");
      }
      columnIndex = (Integer) paras[0];
      padType = (String) paras[1];
      length = Integer.valueOf((String) paras[2]);
      padString = (String) paras[3];
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:"   Arrays.asList(paras).toString()   " => "   e.getMessage());
    }

    Column column = record.getColumn(columnIndex);
    try { 
   
      String oriValue = column.asString();
      //如果字段为空,作为空字符串处理
      if (oriValue == null) { 
   
        oriValue = "";
      }
      String newValue;
      //"l","r", 指示是在头进行pad,还是尾进行pad
      if (!padType.equalsIgnoreCase("r") && !padType.equalsIgnoreCase("l")) { 
   
        throw new RuntimeException(String.format("dx_pad first para(%s) support l or r", padType));
      }

      if (length <= oriValue.length()) { 
   
        // 如果目标长度len 小于 真实原数数据长度oriValue.length,则使用截取,截取原来数据0到len
        newValue = oriValue.substring(0, length);
      } else { 
   
        newValue = doPad(padType, oriValue, length, padString);
      }
      record.setColumn(columnIndex, new StringColumn(newValue));
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }

  /** * 真正实现填充的逻辑 * * @param padType String 需要填充的类型 * @param oriValue String 原始数据 * @param length int 需要填充后的长度 * @param padString String 需要填充的字符串 * @return String */
  private String doPad(String padType, String oriValue, int length, String padString) { 
   

    String finalPad = "";
    int needLen = length - oriValue.length();
    while (needLen > 0) { 
   

      if (needLen >= padString.length()) { 
   
        finalPad  = padString;
        needLen -= padString.length();
      } else { 
   
        finalPad  = padString.substring(0, needLen);
        needLen = 0;
      }
    }
    //"l","r", 指示是在头进行pad,还是尾进行pad
    if (padType.equalsIgnoreCase("l")) { 
   
      return finalPad   oriValue;
    } else { 
   
      return oriValue   finalPad;
    }
  }

四、ReplaceTransformer

  • 参数:4个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。
  • 返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
  • 举例:
代码语言:javascript复制
dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"
dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"data****"
代码语言:javascript复制
/** * 参数:4个 * 第一个参数:字段编号,对应record中第几个字段。 * 第二个参数:字段值的开始位置。 * 第三个参数:需要替换的字段长度。 * 第四个参数:需要替换的字符串。 * 举例: * dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est" * dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****" * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer) */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   

    int columnIndex;
    int startIndex;
    int targetLen;
    String replaceStr;
    try { 
   
      if (paras.length != 4) { 
   
        throw new RuntimeException("dx_replace paras must be 4");
      }

      columnIndex = (Integer) paras[0];
      startIndex = Integer.valueOf((String) paras[1]);
      targetLen = Integer.valueOf((String) paras[2]);
      replaceStr = (String) paras[3];
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:"   Arrays.asList(paras).toString()   " => "   e.getMessage());
    }
    Column column = record.getColumn(columnIndex);
    try { 
   
      String oriValue = column.asString();
      //如果字段为空,跳过replace处理
      if (oriValue == null) { 
   
        return record;
      }
      String newValue;
      int oriLen = oriValue.length();
      if (startIndex > oriLen) { 
   
        throw new RuntimeException(String
            .format("dx_replace startIndex(%s) out of range(%s)", startIndex, oriLen));
      }
      newValue = oriValue.substring(0, startIndex)   replaceStr;
      if (startIndex   targetLen < oriLen) { 
   
        newValue = newValue   oriValue.substring(startIndex   targetLen);
      }
      record.setColumn(columnIndex, new StringColumn(newValue));
    } catch (Exception e) { 
   
      throw DataXException
          .asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }

五、FilterTransformer

  • 参数:
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。
  • 返回:
    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标column为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。
  • 举例:
代码语言:javascript复制
dx_filter(1,"like","dataTest")  
dx_filter(1,">=","10")  
代码语言:javascript复制
/** * 过滤transformer类 * Created by liqiang on 16/3/4. */
public class FilterTransformer extends Transformer { 
   

  public FilterTransformer() { 
   
    setTransformerName("dx_filter");
  }

  /** * 参数: <br> * 第一个参数:字段编号,对应record中第几个字段。 <br> * 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <= <br> * 第三个参数:正则表达式(java正则表达式)、值。 <br> * 返回: * 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果. <br> * like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。 <br> * , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值, <br> * 其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。 <br> * <p> * 如果目标column为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 <br> * like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。 <br> * 举例: <br> * dx_filter(1,"like","dataTest") <br> * dx_filter(1,">=","10") <br> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   
    int columnIndex;
    String code;
    String value;
    try { 
   
      if (paras.length != 3) { 
   
        throw new RuntimeException("dx_filter paras must be 3");
      }
      columnIndex = (Integer) paras[0];
      code = (String) paras[1];
      value = (String) paras[2];
      if (StringUtils.isEmpty(value)) { 
   
        throw new RuntimeException("dx_filter para 2 can't be null");
      }
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:"   Arrays.asList(paras).toString()   " => "   e.getMessage());
    }
    Column column = record.getColumn(columnIndex);
    try { 
   
      if ("like".equalsIgnoreCase(code)) { 
   
        return doLike(record, value, column);
      } else if ("not like".equalsIgnoreCase(code)) { 
   
        return doNotLike(record, value, column);
      } else if (">".equalsIgnoreCase(code)) { 
   
        return doGreat(record, value, column, false);
      } else if ("<".equalsIgnoreCase(code)) { 
   
        return doLess(record, value, column, false);
      } else if ("=".equalsIgnoreCase(code) || "==".equalsIgnoreCase(code)) { 
   
        return doEqual(record, value, column);
      } else if ("!=".equalsIgnoreCase(code)) { 
   
        return doNotEqual(record, value, column);
      } else if (">=".equalsIgnoreCase(code)) { 
   
        return doGreat(record, value, column, true);
      } else if ("<=".equalsIgnoreCase(code)) { 
   
        return doLess(record, value, column, true);
      } else { 
   
        throw new RuntimeException("dx_filter can't support code:"   code);
      }
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
  }


  private Record doGreat(Record record, String value, Column column, boolean hasEqual) { 
   

    //如果字段为空,直接不参与比较。即空也属于无穷小
    if (column.getRawData() == null) { 
   
      return record;
    }
    if (column instanceof DoubleColumn) { 
   
      Double ori = column.asDouble();
      double val = Double.parseDouble(value);
      if (hasEqual) { 
   
        return ori >= val ? null : record;
      } else { 
   
        return ori > val ? null : record;
      }
    } else if (column instanceof LongColumn || column instanceof DateColumn) { 
   
      Long ori = column.asLong();
      long val = Long.parseLong(value);
      if (hasEqual) { 
   
        return ori >= val ? null : record;
      } else { 
   
        return ori > val ? null : record;
      }
    } else if (column instanceof StringColumn || column instanceof BytesColumn
        || column instanceof BoolColumn) { 
   
      String ori = column.asString();
      if (hasEqual) { 
   
        return ori.compareTo(value) >= 0 ? null : record;
      } else { 
   
        return ori.compareTo(value) > 0 ? null : record;
      }
    } else { 
   
      throw new RuntimeException(
          ">=,> can't support this columnType:"   column.getClass().getSimpleName());
    }
  }

  private Record doLess(Record record, String value, Column col, boolean hasEqual) { 
   

    //如果字段为空,直接不参与比较。即空也属于无穷大
    if (col.getRawData() == null) { 
   
      return record;
    }
    if (col instanceof DoubleColumn) { 
   
      Double ori = col.asDouble();
      double val = Double.parseDouble(value);
      if (hasEqual) { 
   
        return ori <= val ? null : record;
      } else { 
   
        return ori < val ? null : record;
      }
    } else if (col instanceof LongColumn || col instanceof DateColumn) { 
   
      Long ori = col.asLong();
      long val = Long.parseLong(value);
      if (hasEqual) { 
   
        return ori <= val ? null : record;
      } else { 
   
        return ori < val ? null : record;
      }
    } else if (col instanceof StringColumn || col instanceof BytesColumn
        || col instanceof BoolColumn) { 
   
      String ori = col.asString();
      if (hasEqual) { 
   
        return ori.compareTo(value) <= 0 ? null : record;
      } else { 
   
        return ori.compareTo(value) < 0 ? null : record;
      }
    } else { 
   
      throw new RuntimeException(
          "<=,< can't support this columnType:"   col.getClass().getSimpleName());
    }
  }

  /** * DateColumn将比较long值,StringColumn,ByteColumn以及BooleanColumn比较其String值 * * @param record Record * @param value String * @param col Column * @return Record 如果相等,则过滤。 */
  private Record doEqual(Record record, String value, Column col) { 
   
    //如果字段为空,只比较目标字段为"null",否则null字段均不过滤
    if (col.getRawData() == null) { 
   
      return "null".equalsIgnoreCase(value) ? null : record;
    }
    if (col instanceof DoubleColumn) { 
   
      Double ori = col.asDouble();
      double val = Double.parseDouble(value);
      return ori == val ? null : record;
    } else if (col instanceof LongColumn || col instanceof DateColumn) { 
   
      Long ori = col.asLong();
      long val = Long.parseLong(value);
      return ori == val ? null : record;
    } else if (col instanceof StringColumn || col instanceof BytesColumn
        || col instanceof BoolColumn) { 
   
      String ori = col.asString();
      return ori.compareTo(value) == 0 ? null : record;
    } else { 
   
      throw new RuntimeException("can't support this columnType:"   col.getClass().getSimpleName());
    }
  }

  /** * DateColumn将比较long值,StringColumn,ByteColumn以及BooleanColumn比较其String值 * * @param record Record * @param value String * @param col Column * @return Record 如果不相等,则过滤。 */
  private Record doNotEqual(Record record, String value, Column col) { 
   
    //如果字段为空,只比较目标字段为"null", 否则null字段均过滤。
    if (col.getRawData() == null) { 
   
      return "null".equalsIgnoreCase(value) ? record : null;
    }
    if (col instanceof DoubleColumn) { 
   
      Double ori = col.asDouble();
      double val = Double.parseDouble(value);
      return ori != val ? null : record;
    } else if (col instanceof LongColumn || col instanceof DateColumn) { 
   
      Long ori = col.asLong();
      long val = Long.parseLong(value);
      return ori != val ? null : record;
    } else if (col instanceof StringColumn || col instanceof BytesColumn
        || col instanceof BoolColumn) { 
   
      String ori = col.asString();
      return ori.compareTo(value) != 0 ? null : record;
    } else { 
   
      throw new RuntimeException("can't support this columnType:"   col.getClass().getSimpleName());
    }
  }

  private Record doLike(Record record, String value, Column column) { 
   
    String oriValue = column.asString();
    return oriValue != null && oriValue.matches(value) ? null : record;
  }

  private Record doNotLike(Record record, String value, Column column) { 
   
    String oriValue = column.asString();
    return oriValue != null && oriValue.matches(value) ? record : null;
  }
}

六、GroovyTransformer

首先需要知道groovy是什么:运行在jvm上,吸收Python、Ruby和Smalltalk等特性的一种脚本语言!!!可以和java代码库相互操作; 一句话概括就是:用户可以写一些groovy代码,使用GroovyTransformer加载运行实现transform的作用!!!

  • 参数。
    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage
  • 备注:
    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):
  • 举例:
代码语言:javascript复制
groovy 实现的subStr:
        String code = "Column column = record.getColumn(1);n"  
                " String oriValue = column.asString();n"  
                " String newValue = oriValue.substring(0, 3);n"  
                " record.setColumn(1, new StringColumn(newValue));n"  
                " return record;";
        dx_groovy(record);
代码语言:javascript复制
groovy 实现的Replace
String code2 = "Column column = record.getColumn(1);n"  
                " String oriValue = column.asString();n"  
                " String newValue = "****"   oriValue.substring(3, oriValue.length());n"  
                " record.setColumn(1, new StringColumn(newValue));n"  
                " return record;";
代码语言:javascript复制
groovy 实现的Pad
String code3 = "Column column = record.getColumn(1);n"  
                " String oriValue = column.asString();n"  
                " String padString = "12345";n"  
                " String finalPad = "";n"  
                " int NeedLength = 8 - oriValue.length();n"  
                "        while (NeedLength > 0) {n"  
                "n"  
                "            if (NeedLength >= padString.length()) {n"  
                "                finalPad  = padString;n"  
                "                NeedLength -= padString.length();n"  
                "            } else {n"  
                "                finalPad  = padString.substring(0, NeedLength);n"  
                "                NeedLength = 0;n"  
                "            }n"  
                "        }n"  
                " String newValue= finalPad   oriValue;n"  
                " record.setColumn(1, new StringColumn(newValue));n"  
                " return record;";
代码语言:javascript复制
/** * Groovy类的transformer * Created by liqiang on 16/3/4. */
public class GroovyTransformer extends Transformer { 
   

  public GroovyTransformer() { 
   
    setTransformerName("dx_groovy");
  }

  private Transformer groovyTransformer;

  /** * 参数 <br> * 第一个参数: groovy code <br> * 第二个参数(列表或者为空):extraPackage <br> * 备注: <br> * dx_groovy只能调用一次。不能多次调用。 <br> * groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种 <br> * column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class, * <br> * StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。 <br> * groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));), <br> * 或者null。返回null表示过滤此行。 <br> * 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充): <br> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 * @return Record */
  @Override
  public Record evaluate(Record record, Object... paras) { 
   
    if (groovyTransformer == null) { 
   
      //全局唯一
      if (paras.length < 1 || paras.length > 2) { 
   
        throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
            "dx_groovy paras must be 1 or 2. now paras is: "   Arrays.asList(paras).toString());
      }
      synchronized (this) { 
   
        if (groovyTransformer == null) { 
   
          String code = (String) paras[0];
          @SuppressWarnings("unchecked")
          List<String> extraPackage = paras.length == 2 ? (List<String>) paras[1] : null;
          initGroovyTransformer(code, extraPackage);
        }
      }
    }
    return this.groovyTransformer.evaluate(record);
  }

  /** * 初始化 GroovyTransformer。<br> * 1 根据code和包列表,构造出完整的groovy代码段。<br> * 2 反射加载该groovy。<br> * 3 将2反射构造出的groovy对象强制类型转为,最后赋给groovyTransformer(Transformer类型)。<br> * * @param code String Groovy代码片段 * @param extraPackage List<String> 额外的import的包 */
  private void initGroovyTransformer(String code, List<String> extraPackage) { 
   
    GroovyClassLoader loader = new GroovyClassLoader(GroovyTransformer.class.getClassLoader());
    String groovyRule = getGroovyRule(code, extraPackage);
    Class groovyClass;
    try { 
   
      groovyClass = loader.parseClass(groovyRule);
    } catch (CompilationFailedException cfe) { 
   
      throw DataXException.asDataXException(TRANSFORMER_GROOVY_INIT_EXCEPTION, cfe);
    }
    try { 
   
      Object t = groovyClass.newInstance();
      if (!(t instanceof Transformer)) { 
   
        throw DataXException
            .asDataXException(TRANSFORMER_GROOVY_INIT_EXCEPTION, "datax bug! contact askdatax");
      }
      this.groovyTransformer = (Transformer) t;
    } catch (Throwable ex) { 
   
      throw DataXException.asDataXException(TRANSFORMER_GROOVY_INIT_EXCEPTION, ex);
    }
  }


  /** * 根据code 和 引用的包,构建出groovy代码片段 * * @param code * @param extraPackagesStrList * @return */
  private String getGroovyRule(String code, List<String> extraPackagesStrList) { 
   
    StringBuffer sb = new StringBuffer();
    if (extraPackagesStrList != null) { 
   
      for (String extraPackagesStr : extraPackagesStrList) { 
   
        if (StringUtils.isNotEmpty(extraPackagesStr)) { 
   
          sb.append(extraPackagesStr);
        }
      }
    }
    sb.append(
        "import static com.alibaba.datax.core.transport.transformer.GroovyTransformerStaticUtil.*;");
    sb.append("import com.alibaba.datax.common.element.*;");
    sb.append("import com.alibaba.datax.common.exception.DataXException;");
    sb.append("import com.alibaba.datax.transformer.Transformer;");
    sb.append("import java.util.*;");
    sb.append("public class RULE extends Transformer").append("{");
    sb.append("public Record evaluate(Record record, Object... paras) {");
    sb.append(code);
    sb.append("}}");
    return sb.toString();
  }
}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;
  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/145297.html原文链接:https://javaforall.cn

0 人点赞