本篇主要演练使用Flink-Cep Groovy Aviator 来实现一个物联网监控规则中的一个场景案例,后续将会介绍如何实现规则动态变更。
技术背景简介
Flink-Cep 是flink中的高级library,用于进行复杂事件处理,例如某一类事件连续出现三次就触发告警,可以类比Siddhi、Esper;
Groovy 是一种动态脚本语言,可以让用户输入代码变成后台可执行代码,像刷题网站leetcode 应该就是用了这么类似的一个东西;
Aviator 用于执行求值表达式,例如求1>2的值,得到true,为什么用这个东西,也跟后续动态规则变更相关,接下来的案例也会具体介绍。
案例分析
物联网通常都是设备数据,比喻说设备的温度、耗电量等等,会有对设备的监控,例如求设备连续三个点的值大于10且三个点的求和值大于100,要求将这三个点发送到下游进行处理,首先看一下直接使用Flink-Cep api的实现:
代码语言:javascript复制case class DpData(dpId:String,value:Double)
val pattern=Pattern.begin("start",AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition[DpData] {
override def filter(value: DpData): Boolean = value.value>10
}).times(2).consecutive()
.next("next").where(new IterativeCondition[DpData] {
override def filter(value: DpData, ctx: IterativeCondition.Context[DpData]): Boolean = {
if(value.value>10) {
val sum=value.value ctx.getEventsForPattern("start").map(_.value).sum
return sum>100
}
return false
}
})
在这里使用了一种变相的实现方式,先使用start的Pattern通过times(2) 与 consecutive 来限定连续两个点的值大于10,然后在使用一个next的Pattern, 限定输入数值大于10, 并且求得满足start-Pattern的数据值与当前点数值的和大于100, 最终就会输出我们需要的数据。
但是在实际中,特别是在面向C端用户或者是监控类的每个业务都有自己的监控阈值,因此规则会是一个不断动态变更的过程,通常会定义一个规则模板,模板里面的条件是可动态变更的。用户定义的Pattern在flink里面会被解析成为NFA(代表了一个匹配的流程),NFA生成是不可更改的,所以要想NFA可变,就要求Pattern可动态生成,然后去替换程序里面的NFA,所以我们就需要Groovy这样的脚本语言能够动态生成Pattern对象,对于规则里面的条件value.value>10, 对于规则配置来说就是一个条件表达式,要是条件表达式可执行可使用Aviator。
实现
基于上面的分析,现在思路已经非常清楚了,首先定义一个该场景下的规则模板,也就是Pattern模板是通过Groovy定义的:
代码语言:javascript复制val groovyScript=
"""
import cep.FilterCondition
import cep.SumIterativeCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
where1=new FilterCondition("_script_","_fieldName_")
where2=new SumIterativeCondition(_sum_,"_script_","_fieldName_")
def getPattern(){
return Pattern.begin("start",AfterMatchSkipStrategy.skipPastLastEvent()).where(where1).times(2).consecutive().next("next").where(where2)
}
""".stripMargin
在这里面的 _script_、_fieldName_、_sum_ 全部都是参数,需要做变量替换,比喻说
代码语言:javascript复制where1=new FilterCondition("_script_","_fieldName_")
替换成为了
代码语言:javascript复制where1=new FilterCondition("getValue(data)>10","value")
表示从流数据里面value字段要求其值大于10。
解析这个groovy脚本,执行其 getPattern 方法获取我们需要的规则定义对象:
代码语言:javascript复制val factory = new ScriptEngineManager();
val engine = factory.getEngineByName("groovy");
engine.eval(groovyScript);
val p = engine.asInstanceOf[Invocable].invokeFunction("getPattern").asInstanceOf[Pattern[DpData,DpData]]
现在重点看一下FilterCondition 定义,表示的一个自定义继承SimpleCondition的实现:
代码语言:javascript复制public class FilterCondition extends SimpleCondition<Map<String,Object>> {
private String script;
private String fieldName;
public FilterCondition(String script,String fieldName){
this.script=script;
this.fieldName=fieldName;
//加载自定义的函数
AviatorEvaluator.addFunction(new ParseValueFunction(this.fieldName));
}
//filter 方法表示的是条件判断
@Override public boolean filter(Map<String,Object> value) throws Exception {
Map<String,Object> params=new HashMap<>();
params.put("data",value);
return (Boolean) AviatorEvaluator.execute(script,params);
}
}
ParseValueFunction 表示的是一个Aviator自定义函数,就是上述提到的getValue函数,它的目的是解析流数据里面的具体字段数值,这里面就是解析value字段的值:
代码语言:javascript复制class ParseValueFunction extends AbstractFunction{
private String fieldName; //value
public ParseValueFunction(String fieldName){
this.fieldName=fieldName;
}
@Override public String getName() {
return "getValue"; //定义函数名称
}
//env 就是上述的params 入参,arg1表示的就是 data参数
@Override public AviatorObject call(Map<String, Object> env, AviatorObject arg1) {
Map<String,Object> map= (Map<String,Object>)FunctionUtils.getJavaObject(arg1,env);
Double value=Double.valueOf((String)map.get(fieldName));
return AviatorDouble.valueOf(value);
}
}
理解了这些之后,在看第二个Pattern条件where2实现就比较清楚了
代码语言:javascript复制public class SumIterativeCondition extends IterativeCondition<HashMap<String,Object>> {
private double sum;
private String script;
private String fieldName;
public SumIterativeCondition(double sum,String scrpit,String fieldName){
this.sum=sum;
this.script=scrpit;
this.fieldName=fieldName;
}
@Override public boolean filter(HashMap<String,Object> value, Context<HashMap<String,Object>> ctx) throws Exception {
Map<String,Object> params=new HashMap<>();
params.put("data",value);
if((Boolean) AviatorEvaluator.execute(script,params)){
double sumNow= Double.valueOf((String)value.get(fieldName)) StreamSupport.stream(ctx.getEventsForPattern("start").spliterator(),false)
.map(x->Double.valueOf((String)value.get(fieldName))).reduce((acc,item)->{
return acc item;
}).orElse(0.0);
return sumNow>sum;
}
return false;
}
}
至此一个简单的Flink-cep Groovy Aviator实现已经完成。
总结
本篇以一个简单的demo来介绍Flink-cep Groovy Aviator的实现流程,为后续介绍Flink-Cep如何实现动态规则变更打下基础,尽情期待。。。