Flink-Cep实现规则动态更新

2022-04-18 13:36:41 浏览数 (1)

规则引擎通常对我们的理解就是用来做模式匹配的,在数据流里面检测满足规则要求的数据。有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当然是不行的,规则引擎里面通常会维护很多不同的规则,例如在监控告警的场景下,如果每个人修改一下自己的监控阈值,就重启一下服务,必然会影响其他人的使用,因此需要线上满足规则动态变更加载。本篇基于Flink-Cep 来实现规则动态变更加载,同时参考了Flink中文社区刘博老师的分享,在这个分享里面是针对在处理流中每一个Key使用不同的规则,本篇的讲解将不区分key的规则。

实现分析

•外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则•动态更新:需要提供定时去检测规则是否变更•历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更那么这些State也就是无用的了,需要清理掉•易容的API: 不同的业务开发人员可能会有自己的规则管理、定时策略等,那么需要对外提供易用的API

实现步骤

用户API定义: InjectionPatternFunction 用于获取、定义用户的规则

代码语言:javascript复制
package org.apache.flink.cep.functions;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.cep.pattern.Pattern;

import java.io.Serializable;

/**
 * @param <T>
 */
public interface InjectionPatternFunction<T> extends Function, Serializable {
    /**
     * 你可能有一些初始化的工作
     */
    public void init() throws Exception;

    /**
     * 获取新的pattern
     * @return
     */
    public Pattern<T,T> inject() throws Exception;

    /**
     * 一个扫描周期:ms
     * @return
     */
    public long getPeriod() throws Exception;

    /**
     * 规则是否发生变更
     * @return
     */
    public boolean isChanged() throws Exception;
}

那么如何将这个API暴露出去呢?正常情况的使用是:

代码语言:javascript复制
CEP.pattern(dataStream,pattern)

希望以同样的方式暴露:

代码语言:javascript复制
CEP.injectionPattern(dataStream,new YourInjectionPatternFunction)

就需要在CEP-Lib里面进行改造:

代码语言:javascript复制
package org.apache.flink.cep
//CEP 里面增加方法  
public static <T> PatternStream<T> injectionPattern(
        DataStream<T> input,
        InjectionPatternFunction<T> injectionPatternFunction){
        return new PatternStream<>(input,injectionPatternFunction); //在PatternStream 里面增加对应的构造函数
    }

同样需要在PatternStreamBuilder.build 进行改造:

代码语言:javascript复制
CepOperator<IN, K, OUT> operator=null;
        if(injectionPatternFunction==null){
            final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            operator = new CepOperator<>(
                inputSerializer,
                isProcessingTime,
                nfaFactory,
                comparator,
                pattern.getAfterMatchSkipStrategy(),
                processFunction,
                lateDataOutputTag);
        }else{
            operator = new CepOperator<>(
                inputSerializer,
                isProcessingTime,
                injectionPatternFunction, // 将InjectionPatternFunction 传给了CepOperator
                comparator,
                null,
                processFunction,
                lateDataOutputTag,null);
        }

加载Pattern

上述步骤已经将InjectionPatternFunction 加载到CepOperator 中,接下来就需要从InjectionPatternFunction中获取Pattern并且构造NFA

代码语言:javascript复制
if(injectionPatternFunction!=null){
            injectionPatternFunction.init();
      Pattern pattern=injectionPatternFunction.inject();
            afterMatchSkipStrategy=pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period=injectionPatternFunction.getPeriod();
  // 注册了一个定时检测规则是否变更的定时器
            if(period>0){
getProcessingTimeService().registerTimer(timerService.currentProcessingTime() period,this::onProcessingTime);
            }
        }
        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

上面注册的定时器需要实现ProcessingTimeCallback 接口的onProcessingTime 方法

代码语言:javascript复制
@Override public void onProcessingTime(long timestamp) throws Exception {

     //先检查是否变更
        if(injectionPatternFunction.isChanged()){
            //重新注入
            Pattern pattern=injectionPatternFunction.inject();
            afterMatchSkipStrategy=pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

            nfa = nfaFactory.createNFA();
            nfa.open(cepRuntimeContext, new Configuration());

            refreshVersion.incrementAndGet();
        }
        //重新注册
        if(injectionPatternFunction.getPeriod()>0){
getProcessingTimeService().registerTimer(timerService.currentProcessingTime() injectionPatternFunction.getPeriod(),this::onProcessingTime);
        }
    }

至此已经完成了动态加载与定时检测,接下来需要实现状态的清理动作。

状态清理

状态清理一共分为两块: 匹配状态数据清理、定时器清理;

状态清理有两种方式:一种是对KeyedState 执行clear操作,就是每处理一个key时执行清理操作;另外一种方式是getKeyedStateBackend().applyToAllKeys 一次性清理所有的状态,这种方式可能会导致任务消费阻塞,因此使用第一种方式;

另外需要思考的一个问题是如何判断状态是否需要清理?这里可以使用版本比对的方式进行处理,每一次规则变更对应的version提升,然后在使用该version与数据的version进行比对处理。

定义几个状态变量:

代码语言:javascript复制
/**
     * 动态的pattern注入
     */
    private InjectionPatternFunction injectionPatternFunction;

    /**
     *  表示的是一个version
     */
    ListState<Integer> refreshFlagState; //nfa 的version 需要持久化
    private AtomicInteger refreshVersion;  //    刷新nfa的version
    private ValueState<Integer> needRefresh; //  每一个key 对应一个version

    private ListState<Long> registerTimeState;// 注册定时器存储的时间

在processElement里面执行状态清理动作:

代码语言:javascript复制
if(injectionPatternFunction!=null){
  int currVersion=needRefresh.value(); //当前key的版本
  if(currVersion<refreshVersion.get()){ //版本不一致
    //那么就开始执行清理动作 状态 与 定时器, 应该没有其他的了吧
    computationStates.clear();
    elementQueueState.clear();
    partialMatches.releaseData();
   //删除定时器相关的操作
    Iterable<Long> registerTime=registerTimeState.get();
   if(registerTime!=null){
       Iterator<Long> registerTimeIter=registerTime.iterator();
        while(registerTimeIter.hasNext()){
                        Long l=registerTimeIter.next();
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE,l); //删除定时器
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE,l);
                        registerTimeIter.remove(); //把状态清理一下
                    }
                }

      needRefresh.update(refreshVersion.get()); //更新到当前的版本
            }
        }

在上面用到的registerTimeState 状态数据从哪里来的呢?比喻说我们需要做数据排序比较或者是事件时间的语义,通常需要先缓存数据,然后会做一个排序操作,最后做匹配,那么这个触发的就是由定时器来完成的。接下来看一下registerTimeState中数据来源入口:

代码语言:javascript复制
//processElement中排序与事件时间处理逻辑中增加saveRegisterTime方法
//time 表示触发的时间
private void saveRegisterTime(long time) throws Exception {
        if(injectionPatternFunction!=null){
            registerTimeState.add(time);
        }
    }

同样在定时器触发,也需要将registerTimeState 中对应的时间移除掉。另外如果状态变更了但是还未来得及清理定时器,那么就有可能造成定时器触发,可以在onEventTime 或者onProcessingTime方法里面做一个前置的版本比对判断,如果version不一致就不做任何处理或者提前清理定时器的数据。

在上面自定义了一些状态,接下来看一下状态的初始化与保存操作:

代码语言:javascript复制
//initializeState 方法
if(injectionPatternFunction!=null) {
            /**
             * 两个标识位状态
             */
            refreshFlagState = context.getOperatorStateStore()
                .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            if (context.isRestored()) {
                if (refreshFlagState.get().iterator().hasNext()) {
                    refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
                }
            } else {
                refreshVersion = new AtomicInteger(0);
            }
            needRefresh = context.getKeyedStateStore()
                .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
            registerTimeState = context.getKeyedStateStore()
                .getListState(new ListStateDescriptor<Long>("registerTimeState", Long.class));
        }

可以看出refreshFlagState 使用的是一个Union类型的Operator-State,这个思考题留给大家这个为什么要这样使用。对应这种类型state通常会在定义一下:

代码语言:javascript复制
@Override public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        if(injectionPatternFunction!=null){
            refreshFlagState.clear();
            refreshFlagState.add(refreshVersion.get());
        }
    }

至此整个流程完成。

总结

本篇介绍cep如何实现动态规则加载,给出了大部分的关键实现代码,需要与前一篇给出的demo结合使用,对于不同Key的变更,需要定义与Key相关联的NFA,其他的处理逻辑大体相同,欢迎大家一起交流。

0 人点赞