一个Flink-Cep使用案例

2022-04-18 13:36:13 浏览数 (1)

本篇主要演练使用Flink-Cep Groovy Aviator 来实现一个物联网监控规则中的一个场景案例,后续将会介绍如何实现规则动态变更。

技术背景简介

Flink-Cep 是flink中的高级library,用于进行复杂事件处理,例如某一类事件连续出现三次就触发告警,可以类比Siddhi、Esper;

Groovy 是一种动态脚本语言,可以让用户输入代码变成后台可执行代码,像刷题网站leetcode 应该就是用了这么类似的一个东西;

Aviator 用于执行求值表达式,例如求1>2的值,得到true,为什么用这个东西,也跟后续动态规则变更相关,接下来的案例也会具体介绍。

案例分析

物联网通常都是设备数据,比喻说设备的温度、耗电量等等,会有对设备的监控,例如求设备连续三个点的值大于10且三个点的求和值大于100,要求将这三个点发送到下游进行处理,首先看一下直接使用Flink-Cep api的实现:

代码语言:javascript复制
case class  DpData(dpId:String,value:Double)
val pattern=Pattern.begin("start",AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition[DpData] {
      override def filter(value: DpData): Boolean = value.value>10
    }).times(2).consecutive()
      .next("next").where(new IterativeCondition[DpData] {
      override def filter(value: DpData, ctx: IterativeCondition.Context[DpData]): Boolean =      {
        if(value.value>10) {
          val sum=value.value ctx.getEventsForPattern("start").map(_.value).sum
          return sum>100
        }
        return false
      }
    })

在这里使用了一种变相的实现方式,先使用start的Pattern通过times(2) 与 consecutive 来限定连续两个点的值大于10,然后在使用一个next的Pattern, 限定输入数值大于10, 并且求得满足start-Pattern的数据值与当前点数值的和大于100, 最终就会输出我们需要的数据。

但是在实际中,特别是在面向C端用户或者是监控类的每个业务都有自己的监控阈值,因此规则会是一个不断动态变更的过程,通常会定义一个规则模板,模板里面的条件是可动态变更的。用户定义的Pattern在flink里面会被解析成为NFA(代表了一个匹配的流程),NFA生成是不可更改的,所以要想NFA可变,就要求Pattern可动态生成,然后去替换程序里面的NFA,所以我们就需要Groovy这样的脚本语言能够动态生成Pattern对象,对于规则里面的条件value.value>10, 对于规则配置来说就是一个条件表达式,要是条件表达式可执行可使用Aviator。

实现

基于上面的分析,现在思路已经非常清楚了,首先定义一个该场景下的规则模板,也就是Pattern模板是通过Groovy定义的:

代码语言:javascript复制
val groovyScript=
      """
import cep.FilterCondition
import cep.SumIterativeCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
where1=new FilterCondition("_script_","_fieldName_") 
where2=new SumIterativeCondition(_sum_,"_script_","_fieldName_")
def getPattern(){
return Pattern.begin("start",AfterMatchSkipStrategy.skipPastLastEvent()).where(where1).times(2).consecutive().next("next").where(where2)
}
      """.stripMargin

在这里面的 _script_、_fieldName_、_sum_ 全部都是参数,需要做变量替换,比喻说

代码语言:javascript复制
where1=new FilterCondition("_script_","_fieldName_")

替换成为了

代码语言:javascript复制
where1=new FilterCondition("getValue(data)>10","value")

表示从流数据里面value字段要求其值大于10。

解析这个groovy脚本,执行其 getPattern 方法获取我们需要的规则定义对象:

代码语言:javascript复制
val factory = new ScriptEngineManager();
val engine =  factory.getEngineByName("groovy");
engine.eval(groovyScript);
val p = engine.asInstanceOf[Invocable].invokeFunction("getPattern").asInstanceOf[Pattern[DpData,DpData]]

现在重点看一下FilterCondition 定义,表示的一个自定义继承SimpleCondition的实现:

代码语言:javascript复制
public class FilterCondition extends SimpleCondition<Map<String,Object>> {

    private String script;
    private String fieldName;
    public FilterCondition(String script,String fieldName){
        this.script=script;
        this.fieldName=fieldName;
        //加载自定义的函数
        AviatorEvaluator.addFunction(new ParseValueFunction(this.fieldName));
    }
    //filter 方法表示的是条件判断
    @Override public boolean filter(Map<String,Object> value) throws Exception {
        Map<String,Object> params=new HashMap<>();
        params.put("data",value);
        return (Boolean) AviatorEvaluator.execute(script,params);
    }
}

ParseValueFunction 表示的是一个Aviator自定义函数,就是上述提到的getValue函数,它的目的是解析流数据里面的具体字段数值,这里面就是解析value字段的值:

代码语言:javascript复制
class ParseValueFunction extends AbstractFunction{

    private String fieldName; //value
    public ParseValueFunction(String fieldName){
        this.fieldName=fieldName;
    }
    @Override public String getName() {
        return "getValue"; //定义函数名称
    }
    //env 就是上述的params 入参,arg1表示的就是 data参数
    @Override public AviatorObject call(Map<String, Object> env, AviatorObject arg1) {

        Map<String,Object> map= (Map<String,Object>)FunctionUtils.getJavaObject(arg1,env);
        Double value=Double.valueOf((String)map.get(fieldName));
        return AviatorDouble.valueOf(value);
    }
}

理解了这些之后,在看第二个Pattern条件where2实现就比较清楚了

代码语言:javascript复制
public class SumIterativeCondition extends IterativeCondition<HashMap<String,Object>> {

    private double sum;
    private String script;
    private String fieldName;

    public SumIterativeCondition(double sum,String scrpit,String fieldName){
        this.sum=sum;
       this.script=scrpit;
       this.fieldName=fieldName;
    }

    @Override public boolean filter(HashMap<String,Object> value, Context<HashMap<String,Object>> ctx) throws Exception {

        Map<String,Object> params=new HashMap<>();
        params.put("data",value);

        if((Boolean) AviatorEvaluator.execute(script,params)){
            double sumNow= Double.valueOf((String)value.get(fieldName))  StreamSupport.stream(ctx.getEventsForPattern("start").spliterator(),false)
                    .map(x->Double.valueOf((String)value.get(fieldName))).reduce((acc,item)->{
                        return acc item;
                    }).orElse(0.0);
            return sumNow>sum;
        }
        return false;
    }
}

至此一个简单的Flink-cep Groovy Aviator实现已经完成。

总结

本篇以一个简单的demo来介绍Flink-cep Groovy Aviator的实现流程,为后续介绍Flink-Cep如何实现动态规则变更打下基础,尽情期待。。。

0 人点赞