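The previous posts implemented adding and updating rules in Flink dynamic CEP. If rules can be added and updated, they also need to be deletable. Deletion, however, means the number of active rules can drop below the configured minimum number of rules, and in that case the job should still keep running normally. That is why rule deactivation and job recovery are covered together here.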
How it works
Rule deactivation:
Using the rule id, clear that rule's state together with everything else associated with the rule.
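To make "clear everything associated with the rule id" concrete, here is a minimal plain-Java sketch. It assumes, for illustration only, that every piece of per-rule state lives in a map keyed by rule id. `RuleStateRegistry` and its methods are hypothetical stand-ins, not the operator's actual Flink state (computationStates, elementQueueState, partialMatches, ...).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the per-rule state kept by the operator
// (rule configuration, buffered events, partial matches), all keyed by rule id.
final class RuleStateRegistry {
    private final Map<Long, String> configurations = new HashMap<>();
    private final Map<Long, List<String>> elementQueues = new HashMap<>();
    private final Map<Long, List<String>> partialMatches = new HashMap<>();

    void addRule(long ruleId, String configuration) {
        configurations.put(ruleId, configuration);
        elementQueues.put(ruleId, new ArrayList<>());
        partialMatches.put(ruleId, new ArrayList<>());
    }

    // Deactivation: remove every entry that belongs to the rule id; returns false for unknown ids
    boolean deactivate(long ruleId) {
        boolean existed = configurations.remove(ruleId) != null;
        elementQueues.remove(ruleId);
        partialMatches.remove(ruleId);
        return existed;
    }

    boolean hasStateFor(long ruleId) {
        return configurations.containsKey(ruleId)
                || elementQueues.containsKey(ruleId)
                || partialMatches.containsKey(ruleId);
    }

    int activeRules() {
        return configurations.size();
    }
}
```

Keeping all per-rule structures under the same key is what makes deletion a single sweep. Miss one map and a deleted rule's partial matches would linger.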
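Job recovery:
Add a runningCountState that records the number of running rules.
When the job starts without state, runningCountState is empty. Once the number of initialized rules reaches the minimum, canProcessElements1 is set to true, the running rule count is updated, and from then on the job processes the event stream normally. After that, whenever the rules change, the running rule count stored in runningCountState is updated.
When the job restarts with state, initializeState restores the state: the rule configurations, runningCountState, and the events that have not yet been processed. It then uses the restored rules to initialize each rule's CEP state (computationStates, elementQueueState, partialMatches, and so on), and uses runningCountState to decide whether the job may process events.
Afterwards, the open method builds the state machines from the rule configurations restored from the checkpoint/savepoint.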
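The recovery rules above come down to a small gate. The sketch below is a simplified plain-Java model, not the operator's real code. `ProcessingGate` is hypothetical, one list stands in for runningCountState and another for the buffered early events. Events are buffered until `minRules` rules exist. Once processing has started, deletions never stop it. A non-empty restored running count reopens the gate immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the gate described above (not the operator's real code):
// events are buffered until minRules rules are initialized; once processing has started,
// deleting rules never stops it, and a non-empty running count restores that decision.
final class ProcessingGate<E> {
    private final int minRules;
    private int rules;
    private boolean canProcess;
    // Stands in for runningCountState: stays empty until processing has started once
    private final List<Integer> runningCountState = new ArrayList<>();
    // Stands in for the buffer of events that arrived before initialization finished
    private final List<E> buffered = new ArrayList<>();
    private final List<E> processed = new ArrayList<>();

    ProcessingGate(int minRules) {
        this.minRules = minRules;
    }

    void onRuleAdded() {
        rules++;
        if (!canProcess && rules >= minRules) {
            canProcess = true;
            processed.addAll(buffered); // replay events that arrived too early
            buffered.clear();
        }
        recordRunningCount();
    }

    void onRuleDeleted() {
        rules--;
        recordRunningCount(); // canProcess deliberately stays true below minRules
    }

    void onEvent(E event) {
        if (canProcess) {
            processed.add(event);
        } else {
            buffered.add(event);
        }
    }

    private void recordRunningCount() {
        if (canProcess) {
            runningCountState.clear();
            runningCountState.add(rules);
        }
    }

    // Mirrors initializeState: a non-empty running count means processing had already started
    static <E> ProcessingGate<E> restore(int minRules, int restoredRules, List<Integer> runningCount) {
        ProcessingGate<E> gate = new ProcessingGate<>(minRules);
        gate.rules = restoredRules;
        gate.runningCountState.addAll(runningCount);
        gate.canProcess = !runningCount.isEmpty();
        return gate;
    }

    boolean canProcess() { return canProcess; }
    List<Integer> runningCount() { return new ArrayList<>(runningCountState); }
    int bufferedCount() { return buffered.size(); }
    int processedCount() { return processed.size(); }
}
```

The key design point is that the minimum only gates the *first* start. After that, the persisted count is enough to tell a restarted job it was already running.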
```java
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.initializationContext = context;

    // Events that arrived before the minimum number of rules had been initialized
    this.beforePatternInitOverElements = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            BEFORE_PATTERN_INIT_OVER_EVENT_QUEUE_STATE_NAME,
            TypeInformation.<StreamRecord<IN>>of(new TypeHint<StreamRecord<IN>>() {
            })));

    // Rule configurations restored from the checkpoint/savepoint
    this.dynamicConfigurationListState =
            context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<DynamicConfiguration>(DYNAMIC_CONFIGURATION_LIST_STATE_NAME, DynamicConfiguration.class)
            );
    List<DynamicConfiguration> dynamicConfigurations = (List<DynamicConfiguration>) dynamicConfigurationListState.get();
    // Rebuild the CEP state (computationStates, elementQueueState, partialMatches, ...) of every restored rule
    for (DynamicConfiguration dynamicConfiguration : dynamicConfigurations) {
        initCEPState(dynamicConfiguration, dynamicConfigurations.size());
    }

    // Number of running rules; only written once the job has reached the minimum rule count
    this.runningCountState =
            context.getOperatorStateStore().getListState(new ListStateDescriptor<Integer>(RUNNING_COUNT_STATE_NAME, Integer.class));
    List<Integer> runningCount = (List<Integer>) runningCountState.get();
    if (CollectionUtils.isNotEmpty(runningCount)) {
        // The job was already processing events before the restart, so keep processing
        // even if fewer rules than the minimum remain
        canProcessElements1 = Boolean.TRUE;
    }
}
```
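The `(List<...>) ...get()` casts above rely on the list state handing back a `List`. The `ListState` interface itself only promises an `Iterable` from `get()`. A defensive alternative is to copy the elements into a fresh list, treating a `null` result as empty just in case. The sketch below is plain Java with a hypothetical helper `StateLists.copyToList`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: copy whatever Iterable a list state returns into a new ArrayList
// instead of casting it to List and relying on the implementation type.
final class StateLists {
    private StateLists() {}

    static <T> List<T> copyToList(Iterable<T> values) {
        List<T> result = new ArrayList<>();
        if (values == null) {
            return result; // defensive: treat "no value" as an empty list
        }
        for (T value : values) {
            result.add(value);
        }
        return result;
    }
}
```

Inside `initializeState` this would read, for example, `List<Integer> runningCount = StateLists.copyToList(runningCountState.get());`.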
Test code
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L);
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
// Port 8888: events as "user,name,timestamp"; port 6666: rule configurations as JSON
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);
SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
    @Override
    public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
        String[] split = s.split(",");
        if (split.length == 3) {
            Long timestamp = Long.valueOf(split[2]);
            EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
            collector.collect(event);
        }
    }
});
// Event-time watermarks tolerating 2 seconds of out-of-orderness
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
        if (null != dynamicConfiguration) {
            collector.collect(dynamicConfiguration);
        }
    } catch (Exception e) {
        // Malformed rule messages (e.g. invalid JSON) are silently dropped
    }
}, TypeInformation.of(DynamicConfiguration.class));
KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
        DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
                // Matching only starts after at least 3 rules have been initialized
                .minPatternInitialized(3)
                .objectProcess();
dynamicMatchDataDataStream.print("dynamic");
env.execute();
```
Test procedure
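1. Data preparation
The code above requires at least 3 rules to be initialized before matching starts, so we first initialize only two rules, rule 1 and rule 10086, and feed in some events. Nothing is matched yet: initialization is not complete, so computation has not started.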
```shell
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
{"id":10086,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.processor.ObjectDynamicPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
```
```shell
nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
```
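2. Restoring the job from a checkpoint taken before rule initialization completed
Once the data above has been prepared and a checkpoint has completed, stop the job and start it again from that checkpoint. Stepping through in a debugger shows that the data prepared earlier has been restored from the checkpoint.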
```java
Configuration configuration = new Configuration();
// Checkpoint of the previous run to restore from
configuration.setString("execution.savepoint.path","file:///rocksdba1d31ed55a64005fefb630fe56c2e9601/chk-2");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
```
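The restore above hard-codes `chk-2`. When you simply want the most recent checkpoint, the directory could be located programmatically. This is a minimal sketch. It assumes checkpoints are kept on a local file system in the `<checkpoint dir>/<job id>/chk-N` layout these paths suggest. `LatestCheckpoint.find` is a hypothetical helper. It compares N numerically, so `chk-10` wins over `chk-6`. It does not check whether a checkpoint is complete, for example by looking for its `_metadata` file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper: returns the chk-N sub-directory with the largest N, compared numerically
final class LatestCheckpoint {
    private static final Pattern CHECKPOINT_DIR = Pattern.compile("chk-(\\d+)");

    private LatestCheckpoint() {}

    static Optional<Path> find(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> CHECKPOINT_DIR.matcher(p.getFileName().toString()).matches())
                    .max(Comparator.comparingLong(LatestCheckpoint::checkpointNumber));
        }
    }

    private static long checkpointNumber(Path dir) {
        Matcher m = CHECKPOINT_DIR.matcher(dir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + dir);
        }
        return Long.parseLong(m.group(1));
    }
}
```

The resulting path would then be passed to `execution.savepoint.path` as a `file://` URI, exactly like the hard-coded value above.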
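If we keep feeding events now, computation is still not triggered, because rule initialization is still incomplete.
```shell
2,end,4003
```
Next we add rule 2. Rule initialization is now complete and the job starts computing normally. As expected, each of the three rules emits matches.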
```shell
{"id":2,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}
```
```shell
2,end,4004
```
```shell
dynamic> DynamicMatchData(id=10086, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=10086, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
```
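Why do only the end events 1003 and 2003 show up after sending `2,end,4004`? The test uses `forBoundedOutOfOrderness(Duration.ofSeconds(2))`, whose watermark is the largest timestamp seen minus 2000 ms minus 1. An event-time CEP operator only hands buffered events to the patterns once the watermark has reached them. The plain-Java model below (`WatermarkModel` is hypothetical) reproduces that arithmetic. It assumes a single input channel. With several parallel upstream subtasks the operator uses the minimum watermark across its inputs, so results can lag further.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of forBoundedOutOfOrderness(...) feeding an event-time CEP operator
// (single input channel assumed).
final class WatermarkModel {
    private WatermarkModel() {}

    // Bounded out-of-orderness watermark: largest timestamp seen - bound - 1
    static long watermark(long maxTimestamp, long outOfOrdernessMillis) {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Buffered events are released to the patterns, in timestamp order, once the watermark reaches them
    static List<Long> released(List<Long> buffered, long watermark) {
        return buffered.stream()
                .filter(ts -> ts <= watermark)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

For example, `WatermarkModel.watermark(4004, 2000)` is 2003. That releases 1001, 1002, 1003 and 2003 but not 3003 or 4003, which matches the two results per rule above.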
Now we delete rule 10086. After the deletion there are 2 running rules, fewer than the configured minimum of 3. We expect the job to keep computing normally, with no further results from rule 10086.
```shell
{"id":10086,"version":1,"operate":"DELETE"}
```
```shell
2,end,5004
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
```
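The delete message only carries `id`, `version` and `operate`. Below is a minimal sketch of how such messages could be dispatched, assuming a message whose `operate` is `DELETE` removes the rule and any other message (re)installs the rule with fresh state. That second assumption is consistent with the rule 1 update later in this post, which clears rule 1's previous state. `RuleMessage` and `RuleTable` are hypothetical plain-Java classes, not part of the modified FlinkCEP code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical rule control message with the id/version/operate fields of the JSON above
final class RuleMessage {
    final long id;
    final int version;
    final String operate; // "DELETE" for deletions, null for add/update messages

    RuleMessage(long id, int version, String operate) {
        this.id = id;
        this.version = version;
        this.operate = operate;
    }
}

// Hypothetical dispatcher: DELETE drops the rule, anything else (re)installs it with fresh state
final class RuleTable {
    private final Map<Long, Integer> versions = new HashMap<>();
    private int stateResets; // how often a rule's CEP state was (re)built from scratch

    void apply(RuleMessage message) {
        if ("DELETE".equals(message.operate)) {
            versions.remove(message.id);
            return;
        }
        versions.put(message.id, message.version);
        stateResets++; // a new or changed rule starts from empty CEP state
    }

    boolean isActive(long id) { return versions.containsKey(id); }
    int runningRules() { return versions.size(); }
    Integer versionOf(long id) { return versions.get(id); }
    int stateResets() { return stateResets; }
}
```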
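3. Restoring from state taken after rule initialization completed, with fewer rules than the minimum
Now stop the job and restore it from the latest checkpoint. As we keep feeding events, the number of running rules is still 2 and the job still computes normally.
```java
configuration.setString("execution.savepoint.path","file:///rocksdb/0e08999f4b06cc4f50c22c6a752e4b4b/chk-6");
```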
```shell
2,end,6004
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
```
```shell
2,end,7003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4004)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4004)]})
```
```shell
2,middle,7002
2,start,7001
2,end,8003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5004)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5004)]})
```
As we continue feeding events, rule 2's 5-second window has been exceeded, so rule 2 no longer matches and only rule 1 produces a result.
```shell
2,end,9003
```
```shell
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=6004)]})
```
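Rule 2 carries `"within":["SECONDS",5]`, so a match only counts if the end event comes less than 5 seconds after the start event. The helper below is a hypothetical simplification (`WithinCheck` is not Flink API). It treats a partial match as expired once end minus start reaches the window length. The test data never hits exactly 5000 ms, so these results don't show which side of the boundary Flink's NFA uses.

```java
// Hypothetical simplification of the 5-second "within" window on rule 2:
// a match is kept only while end - start stays below the window length.
final class WithinCheck {
    private WithinCheck() {}

    static boolean withinWindow(long startTime, long endTime, long windowMillis) {
        return endTime - startTime < windowMillis;
    }
}
```

With the events used here, start 1001 and end 5004 (4003 ms) still match, while start 1001 and end 6004 (5003 ms) do not. That is why rule 2 is missing from the last result.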
```shell
2,end,10003
```
```shell
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=7003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
```
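Rule 1 now emits three matches for the single end event 7003. Both `middle` and `end` use `followedByAny`, so every earlier start/middle combination can pair with the end event. The sketch below enumerates those combinations in plain Java. `FollowedByAnyModel` is hypothetical. It ignores `within` and after-match skip strategies, so it only models rule 1, which has no window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of start -> followedByAny middle -> followedByAny end:
// every earlier (start, middle) pair combines with the end event; no window, no skip strategy.
final class FollowedByAnyModel {
    private FollowedByAnyModel() {}

    static List<long[]> matchesFor(long endTime, List<Long> starts, List<Long> middles) {
        List<long[]> result = new ArrayList<>();
        for (long start : starts) {
            for (long middle : middles) {
                if (start < middle && middle < endTime) {
                    result.add(new long[] {start, middle, endTime});
                }
            }
        }
        return result;
    }
}
```

With starts {1001, 7001} and middles {1002, 7002}, the end event 7003 yields (1001, 1002), (1001, 7002) and (7001, 7002), while the earlier end event 6004 only yields (1001, 1002). This matches the outputs above.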
As we continue feeding events, both rule 1 and rule 2 produce matches again.
```shell
2,end,11003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
```
```shell
2,end,12003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
```
```shell
2,end,13003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
```
```shell
2,end,14003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
```
As we continue feeding events, rule 2's 5-second window has been exceeded again, so rule 2 no longer matches and only rule 1 produces results.
```shell
2,end,15003
```
```shell
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=12003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=12003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=12003)]})
```
Next we send a rule message that updates rule 1. Rule 1's previous state is cleared, and as we continue feeding events the matches are as expected. The updated rule 1 uses EventPatternProcessFunction, which is why its match field starts with "这是EventPatternProcessFunction matchs" ("This is EventPatternProcessFunction matchs").
```shell
{"id":1,"version":2,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.dynamic.EventPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
```
```shell
2,end,16003
2,middle,16002
2,start,16001
2,end,17003
2,end,18003
2,end,19003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=16003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=16003)]})
```
```shell
2,end,20003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=17003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=17003)]})
```
```shell
2,end,21003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=18003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=18003)]})
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=19003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=19003)]})
```
```shell
2,end,23003
```
```shell
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=20003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=20003)]})
```
As we continue feeding events, rule 2's 5-second window has been exceeded once more, so rule 2 no longer matches and only rule 1 produces results.
```shell
2,end,25004
```
```shell
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=21003)]})
```
```shell
2,end,25003
```
```shell
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=22003)]})
```
END
This completes the Flink dynamic CEP test demo. The dynamic CEP built by modifying the FlinkCEP source code is now essentially in shape.