聊聊canal的CanalEventFilter

2020-04-13 10:33:53 浏览数 (1)

本文主要研究一下canal的CanalEventFilter

CanalEventFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/CanalEventFilter.java

代码语言:javascript复制
public interface CanalEventFilter<T> {
​
    boolean filter(T event) throws CanalFilterException;
}
  • CanalEventFilter接口定义了filter方法

AviaterELFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterELFilter.java

代码语言:javascript复制
public class AviaterELFilter implements CanalEventFilter<CanalEntry.Entry> {
​
    public static final String ROOT_KEY = "entry";
    private String             expression;
​
    public AviaterELFilter(String expression){
        this.expression = expression;
    }
​
    public boolean filter(CanalEntry.Entry entry) throws CanalFilterException {
        if (StringUtils.isEmpty(expression)) {
            return true;
        }
​
        Map<String, Object> env = new HashMap<String, Object>();
        env.put(ROOT_KEY, entry);
        return (Boolean) AviatorEvaluator.execute(expression, env);
    }
​
}
  • AviaterELFilter实现了CanalEventFilter接口,其构造器接收expression参数,其filter方法通过AviatorEvaluator.execute(expression, env)计算结果

AviaterSimpleFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterSimpleFilter.java

代码语言:javascript复制
public class AviaterSimpleFilter implements CanalEventFilter<String> {
​
    private static final String SPLIT             = ",";
​
    private static final String FILTER_EXPRESSION = "include(list,target)";
​
    private final Expression    exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
​
    private final List<String>  list;
​
    public AviaterSimpleFilter(String filterExpression){
        if (StringUtils.isEmpty(filterExpression)) {
            list = new ArrayList<String>();
        } else {
            String[] ss = filterExpression.toLowerCase().split(SPLIT);
            list = Arrays.asList(ss);
        }
    }
​
    public boolean filter(String filtered) throws CanalFilterException {
        if (list.isEmpty()) {
            return true;
        }
        if (StringUtils.isEmpty(filtered)) {
            return true;
        }
        Map<String, Object> env = new HashMap<String, Object>();
        env.put("list", list);
        env.put("target", filtered.toLowerCase());
        return (Boolean) exp.execute(env);
    }
​
}
  • AviaterSimpleFilter实现了CanalEventFilter接口;它定义了Expression属性,其值为AviatorEvaluator.compile(FILTER_EXPRESSION, true);其构造器接收filterExpression;其filter方法通过exp.execute(env)返回结果

AviaterRegexFilter

canal-1.1.4/filter/src/main/java/com/alibaba/otter/canal/filter/aviater/AviaterRegexFilter.java

代码语言:javascript复制
public class AviaterRegexFilter implements CanalEventFilter<String> {
​
    private static final String             SPLIT             = ",";
    private static final String             PATTERN_SPLIT     = "|";
    private static final String             FILTER_EXPRESSION = "regex(pattern,target)";
    private static final RegexFunction      regexFunction     = new RegexFunction();
    private final Expression                exp               = AviatorEvaluator.compile(FILTER_EXPRESSION, true);
    static {
        AviatorEvaluator.addFunction(regexFunction);
    }
​
    private static final Comparator<String> COMPARATOR        = new StringComparator();
​
    final private String                    pattern;
    final private boolean                   defaultEmptyValue;
​
    public AviaterRegexFilter(String pattern){
        this(pattern, true);
    }
​
    public AviaterRegexFilter(String pattern, boolean defaultEmptyValue){
        this.defaultEmptyValue = defaultEmptyValue;
        List<String> list = null;
        if (StringUtils.isEmpty(pattern)) {
            list = new ArrayList<String>();
        } else {
            String[] ss = StringUtils.split(pattern, SPLIT);
            list = Arrays.asList(ss);
        }
​
        // 对pattern按照从长到短的排序
        // 因为 foo|foot 匹配 foot 会出错,原因是 foot 匹配了 foo 之后,会返回 foo,但是 foo 的长度和 foot
        // 的长度不一样
        Collections.sort(list, COMPARATOR);
        // 对pattern进行头尾完全匹配
        list = completionPattern(list);
        this.pattern = StringUtils.join(list, PATTERN_SPLIT);
    }
​
    public boolean filter(String filtered) throws CanalFilterException {
        if (StringUtils.isEmpty(pattern)) {
            return defaultEmptyValue;
        }
​
        if (StringUtils.isEmpty(filtered)) {
            return defaultEmptyValue;
        }
​
        Map<String, Object> env = new HashMap<String, Object>();
        env.put("pattern", pattern);
        env.put("target", filtered.toLowerCase());
        return (Boolean) exp.execute(env);
    }
​
    //......
​
    @Override
    public String toString() {
        return pattern;
    }
​
}
  • AviaterRegexFilter实现了CanalEventFilter接口;它定义了Expression属性,其值为AviatorEvaluator.compile(FILTER_EXPRESSION, true);其构造器接收pattern参数,它会先对pattern进行sort以及修正;其filter方法通过exp.execute(env)返回结果

小结

CanalEventFilter接口定义了filter方法;它三个实现类分别是AviaterELFilter、AviaterSimpleFilter、AviaterRegexFilter

doc

  • CanalEventFilter

0 人点赞