Flink Dynamic CEP: Rule Deactivation and Job Recovery

2023-05-26 22:21:23

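The previous posts implemented adding and updating rules in Flink dynamic CEP. Once rules can be added and updated, they also need to be deleted. Deleting rules means the number of rules in use may fall below the configured minimum. In that case the job should still run normally, so this post covers rule deactivation and job recovery together.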

How it works

Rule deactivation:

Using the rule id, clear that rule's state and everything else associated with it.
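The following is a minimal in-memory sketch that makes "clear everything tied to the rule id" concrete. It assumes per-rule state is kept in maps keyed by rule id. The class and field names are hypothetical. In the real operator these would be Flink state objects, not HashMaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of per-rule state; the real operator keeps this in Flink state.
public class RuleDeactivationSketch {

    // Rule id -> rule configuration (reduced to the pattern text here)
    private final Map<Long, String> configurations = new HashMap<>();
    // Rule id -> partial matches of that rule
    private final Map<Long, List<String>> partialMatches = new HashMap<>();
    // Rule id -> events buffered for that rule
    private final Map<Long, List<String>> elementQueues = new HashMap<>();

    public void addRule(long id, String pattern) {
        configurations.put(id, pattern);
        partialMatches.put(id, new ArrayList<>());
        elementQueues.put(id, new ArrayList<>());
    }

    // Deactivate a rule: drop its configuration and every piece of state keyed by its id.
    public void deactivate(long id) {
        configurations.remove(id);
        partialMatches.remove(id);
        elementQueues.remove(id);
    }

    public int activeRuleCount() {
        return configurations.size();
    }

    public boolean hasStateFor(long id) {
        return configurations.containsKey(id)
                || partialMatches.containsKey(id)
                || elementQueues.containsKey(id);
    }

    public static void main(String[] args) {
        RuleDeactivationSketch rules = new RuleDeactivationSketch();
        rules.addRule(1L, "start -> middle -> end");
        rules.addRule(10086L, "start -> middle -> end");
        rules.deactivate(10086L);
        System.out.println(rules.activeRuleCount());    // 1
        System.out.println(rules.hasStateFor(10086L));  // false
    }
}
```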

Job recovery:

Add a runningCountState that records the number of rules currently running.

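When the job starts without state, runningCountState is empty. Once the number of initialized rules reaches the configured minimum, the job starts processing data: canProcessElements1 is set to true and the number of running rules is written to runningCountState. From then on the job processes the event stream normally. Whenever the rules change, the running rule count in runningCountState is updated.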
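The start-up rule above can be modeled as a small gate. The sketch below is a plain-Java illustration, not the author's operator. Buffering early events mirrors the beforePatternInitOverElements state that appears in initializeState. The class, its methods, and the use of strings as events are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "start once the minimum rule count is reached" gate.
public class ProcessingGateSketch {

    private final int minPatternInitialized;
    private int ruleCount = 0;
    private boolean canProcessElements = false;   // plays the role of canProcessElements1
    private Integer runningCount = null;           // plays the role of runningCountState; null = never started
    private final List<String> bufferedEvents = new ArrayList<>();

    public ProcessingGateSketch(int minPatternInitialized) {
        this.minPatternInitialized = minPatternInitialized;
    }

    // Registers a new rule and returns the buffered events released by it (possibly none).
    public List<String> onRuleAdded() {
        ruleCount++;
        List<String> released = new ArrayList<>();
        if (!canProcessElements && ruleCount >= minPatternInitialized) {
            canProcessElements = true;
            released.addAll(bufferedEvents);
            bufferedEvents.clear();
        }
        if (canProcessElements) {
            runningCount = ruleCount;
        }
        return released;
    }

    // Once processing has started it never stops; deleting a rule only updates the count.
    public void onRuleDeleted() {
        ruleCount--;
        if (canProcessElements) {
            runningCount = ruleCount;
        }
    }

    // Returns true if the event can be processed now, false if it was buffered.
    public boolean onEvent(String event) {
        if (canProcessElements) {
            return true;
        }
        bufferedEvents.add(event);
        return false;
    }

    public boolean canProcess() {
        return canProcessElements;
    }

    public Integer runningCount() {
        return runningCount;
    }

    public static void main(String[] args) {
        ProcessingGateSketch gate = new ProcessingGateSketch(3);
        gate.onRuleAdded();                                  // rule 1
        gate.onRuleAdded();                                  // rule 10086
        System.out.println(gate.onEvent("2,end,1003"));      // false (buffered)
        System.out.println(gate.onRuleAdded());              // [2,end,1003] (minimum of 3 reached)
        gate.onRuleDeleted();                                // delete rule 10086
        System.out.println(gate.canProcess() + " " + gate.runningCount()); // true 2
    }
}
```

The key design point is that the flag only ever goes from false to true. After a deletion the count changes but processing continues, which is the behavior the test below expects.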

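When the job restarts with state, the initializeState method restores the state, including the rule configurations, runningCountState, and the events not yet processed. It then uses the rules saved in state to initialize each rule's CEP-related state (computationStates, elementQueueState, partialMatches, and so on). Finally it checks runningCountState to decide whether the job may process events.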

After that, the open method initializes each rule's state machine (NFA) from the rule configurations restored from the checkpoint/savepoint.

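```java
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.initializationContext = context;

    // Events that arrived before the minimum number of rules was initialized
    this.beforePatternInitOverElements = context.getOperatorStateStore().getListState(new ListStateDescriptor<>(
            BEFORE_PATTERN_INIT_OVER_EVENT_QUEUE_STATE_NAME,
            TypeInformation.<StreamRecord<IN>>of(new TypeHint<StreamRecord<IN>>() {
            })));

    // Rule configurations received so far
    this.dynamicConfigurationListState =
            context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>(DYNAMIC_CONFIGURATION_LIST_STATE_NAME, DynamicConfiguration.class));

    // ListState#get() returns an Iterable; copy it into a list instead of casting
    List<DynamicConfiguration> dynamicConfigurations = new ArrayList<>();
    for (DynamicConfiguration configuration : dynamicConfigurationListState.get()) {
        dynamicConfigurations.add(configuration);
    }
    // Rebuild the CEP state (computationStates, elementQueueState, partialMatches, ...) of every restored rule
    for (DynamicConfiguration dynamicConfiguration : dynamicConfigurations) {
        initCEPState(dynamicConfiguration, dynamicConfigurations.size());
    }

    // Number of running rules; empty if the job never reached the minimum rule count
    this.runningCountState =
            context.getOperatorStateStore().getListState(new ListStateDescriptor<>(RUNNING_COUNT_STATE_NAME, Integer.class));
    // Non-empty means the job was already processing before the restart, so it resumes
    // processing even if deletions left fewer rules than the minimum
    Iterable<Integer> runningCount = runningCountState.get();
    if (runningCount != null && runningCount.iterator().hasNext()) {
        canProcessElements1 = Boolean.TRUE;
    }
}
```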
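The restore decision at the end of initializeState can be exercised in isolation. The sketch below is a plain-Java model, not Flink code, and its class name and constructor are hypothetical. It uses the same test as above: any saved running count resumes processing immediately. It also assumes that a rule arriving after the restore counts towards the minimum just as on a fresh start. The two scenarios in main correspond to steps 2 and 3 of the test procedure.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the restore decision made at the end of initializeState.
public class RestoreDecisionSketch {

    private final int minPatternInitialized;
    private int ruleCount;
    private boolean canProcessElements;

    public RestoreDecisionSketch(Iterable<Integer> restoredRunningCount,
                                 int restoredRuleCount,
                                 int minPatternInitialized) {
        this.minPatternInitialized = minPatternInitialized;
        this.ruleCount = restoredRuleCount;
        // Any saved running count means the job had already started before the checkpoint.
        this.canProcessElements = restoredRunningCount != null && restoredRunningCount.iterator().hasNext();
    }

    // Assumption: a rule arriving after the restore counts towards the minimum, as on a fresh start.
    public void onRuleAdded() {
        ruleCount++;
        if (!canProcessElements && ruleCount >= minPatternInitialized) {
            canProcessElements = true;
        }
    }

    public boolean canProcess() {
        return canProcessElements;
    }

    public static void main(String[] args) {
        // Step 2: rules 1 and 10086 restored, runningCountState empty.
        RestoreDecisionSketch step2 = new RestoreDecisionSketch(Collections.emptyList(), 2, 3);
        System.out.println(step2.canProcess());   // false
        step2.onRuleAdded();                      // rule 2 arrives
        System.out.println(step2.canProcess());   // true

        // Step 3: rules 1 and 2 restored, runningCountState holds 2.
        RestoreDecisionSketch step3 = new RestoreDecisionSketch(List.of(2), 2, 3);
        System.out.println(step3.canProcess());   // true
    }
}
```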

Test code

```java
// The job used for the test. DynamicCEP, DynamicConfiguration, DynamicMatchData and EventWithTime
// come from the author's modified FlinkCEP, not from the stock Flink API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 s into a RocksDB state backend on the local file system
env.enableCheckpointing(60000L);
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));

// Port 8888 carries events ("user,name,timestamp"), port 6666 carries rule configurations (JSON)
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);
SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
    @Override
    public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
        String[] split = s.split(",");
        // Skip lines that do not have exactly three fields
        if (split.length == 3) {
            Long timestamp = Long.valueOf(split[2]);
            EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
            collector.collect(event);
        }
    }
});
// Event time comes from the third field; up to 2 s of out-of-order data is tolerated
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);

SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
        if (null != dynamicConfiguration) {
            collector.collect(dynamicConfiguration);
        }
    } catch (Exception e) {
        // Ignore messages that are not valid rule JSON
    }
}, TypeInformation.of(DynamicConfiguration.class));

// Key events by user and combine them with the rule stream;
// matching only starts after at least 3 rules have been initialized
KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
        DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
                .minPatternInitialized(3)
                .objectProcess();
dynamicMatchDataDataStream.print("dynamic");
env.execute();
```

Test procedure

1. Data preparation

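The code above requires at least 3 rules to be initialized before computation starts. We therefore first initialize only two rules, rule 1 and rule 10086, and then input some events. No matches are output yet: initialization has not completed, so computation has not started.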

Rules (port 6666):
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
{"id":10086,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.processor.ObjectDynamicPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
Events (port 8888):
nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
2. Restoring the job from a checkpoint taken before rule initialization completed

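After the data above has been prepared and a checkpoint has completed, stop the job and start it again from that checkpoint. Stepping through with a debugger shows that the data prepared above has been restored from the checkpoint.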

```java
// Start the job from the checkpoint written by the previous run
Configuration configuration = new Configuration();
configuration.setString("execution.savepoint.path","file:///rocksdba1d31ed55a64005fefb630fe56c2e9601/chk-2");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
```

If we keep inputting events now, computation is still not triggered, because rule initialization has not yet completed.

Events (port 8888):
2,end,4003

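Then we input rule 2. Rule initialization is now complete and the job starts computing. As expected, all three rules produce matches: each rule matches the start and middle events with the end events at 1003 and 2003.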

Rules (port 6666):
{"id":2,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}
Events (port 8888):
2,end,4004
Output:
dynamic> DynamicMatchData(id=10086, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=10086, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
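Events up to 4004 had been sent, yet only the end events at 1003 and 2003 matched. The test job uses forBoundedOutOfOrderness(Duration.ofSeconds(2)). Flink's bounded-out-of-orderness generator emits a watermark equal to the largest timestamp seen, minus the bound, minus 1 ms. An event-time CEP operator only processes buffered events whose timestamps are at or below the watermark. The sketch below reproduces that arithmetic in plain Java. It assumes parallelism 1, so a single watermark generator sees every event, and that the modified operator keeps FlinkCEP's event-time behavior. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of which buffered events a watermark releases.
public class WatermarkSketch {

    static final long OUT_OF_ORDERNESS_MS = 2000L;   // Duration.ofSeconds(2) in the test job

    // Bounded-out-of-orderness watermark: max timestamp seen - bound - 1
    static long watermark(long maxTimestamp) {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    // Timestamps of buffered events that are released once the watermark reaches the given value
    static List<Long> released(List<Long> bufferedTimestamps, long watermark) {
        List<Long> out = new ArrayList<>();
        for (long ts : bufferedTimestamps) {
            if (ts <= watermark) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "end" events sent so far (4003 was sent twice), plus the new 4004
        List<Long> ends = List.of(1003L, 2003L, 3003L, 4003L, 4003L, 4004L);
        long wm = watermark(4004L);
        System.out.println(wm);                  // 2003
        System.out.println(released(ends, wm));  // [1003, 2003]
    }
}
```

The same arithmetic explains the later steps. For example, sending 2,end,5004 moves the watermark to 3003 and releases the end event at 3003.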

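Next we delete rule 10086. After the deletion the number of running rules drops to 2, below the configured minimum of 3. We expect the job to keep computing normally and rule 10086 to produce no further matches.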

Rules (port 6666):
{"id":10086,"version":1,"operate":"DELETE"}
Events (port 8888):
2,end,5004
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
3. Restoring from state where rule initialization had completed, but fewer rules than the minimum remain

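Now we stop the job, restore it from the latest checkpoint, and continue inputting events. The number of running rules is still 2, and the job still computes normally. The event 2,end,4003 was sent twice (in steps 1 and 2), so each rule emits two matches for it.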

```java
// Restore from the latest checkpoint instead (chk-6)
configuration.setString("execution.savepoint.path","file:///rocksdb/0e08999f4b06cc4f50c22c6a752e4b4b/chk-6");
```
Events (port 8888):
2,end,6004
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
Events (port 8888):
2,end,7003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4004)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4004)]})
Events (port 8888):
2,middle,7002
2,start,7001
2,end,8003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5004)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5004)]})

We keep inputting events. Rule 2's 5 s window has now been exceeded, so rule 2 no longer produces matches; only rule 1 does.

Events (port 8888):
2,end,9003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=6004)]})
Events (port 8888):
2,end,10003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=7003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
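The cut-off for rule 2 comes from its "within":["SECONDS",5] property. Below is a sketch of the check in plain Java. It assumes, as we understand FlinkCEP's NFA to behave, two things: the window is measured from the timestamp of the match's start event, and a partial match expires once the elapsed time reaches the window. The class and method names are hypothetical.

```java
// Hypothetical illustration of the 5 s "within" check applied to rule 2.
public class WithinWindowSketch {

    static final long WINDOW_MS = 5000L;   // "within":["SECONDS",5]

    // A match is kept only while end - start stays below the window.
    static boolean withinWindow(long startTs, long endTs) {
        return endTs - startTs < WINDOW_MS;
    }

    public static void main(String[] args) {
        System.out.println(withinWindow(1001L, 5004L));   // true  (4003 ms)
        System.out.println(withinWindow(1001L, 6004L));   // false (5003 ms)
        System.out.println(withinWindow(7001L, 11003L));  // true  (4002 ms)
        System.out.println(withinWindow(7001L, 12003L));  // false (5002 ms)
    }
}
```

These four cases line up with the outputs in this step and the next. Rule 2 matches end 5004 but not 6004 (start 1001). Once the start event at 7001 exists, it matches end events up to 11003 but not 12003.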

We keep inputting events; now both rule 1 and rule 2 produce matches.

Events (port 8888):
2,end,11003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
Events (port 8888):
2,end,12003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
Events (port 8888):
2,end,13003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
Events (port 8888):
2,end,14003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})

We keep inputting events. Rule 2's 5 s window has been exceeded again, so rule 2 no longer produces matches; only rule 1 does.

Events (port 8888):
2,end,15003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=12003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=12003)]})
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=12003)]})

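Next we send an updated rule 1 (version 2, with a different dynamicPatternProcessFunctionClassName). Rule 1's previous state is cleared. As we continue inputting events, the matching results are as expected: rule 1 now only matches from the new start event at 16001.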

Rules (port 6666):
{"id":1,"version":2,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.dynamic.EventPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
Events (port 8888):
2,end,16003
2,middle,16002
2,start,16001
2,end,17003
2,end,18003
2,end,19003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=16003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=16003)]})
Events (port 8888):
2,end,20003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=17003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=17003)]})
Events (port 8888):
2,end,21003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=18003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=18003)]})
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=19003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=19003)]})
Events (port 8888):
2,end,23003
Output:
dynamic> DynamicMatchData(id=2, version=1, match={start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=20003)]})
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=20003)]})
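In the outputs above, rule 1 version 2 only matches from the start event at 16001, and its old partial matches from 1001 and 7001 are gone. In effect, a rule's state is reset whenever a new version arrives. The sketch below is hypothetical and reduces a rule's state to the start timestamps of its open partial matches. Ignoring messages whose version is not newer is our own assumption, not something the article states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "a new rule version starts from empty state".
public class RuleVersionSketch {

    private final Map<Long, Integer> versions = new HashMap<>();
    // Rule id -> start timestamps of open partial matches (simplified rule state)
    private final Map<Long, List<Long>> openStarts = new HashMap<>();

    // Returns true if an existing rule was replaced by a newer version and its state reset.
    public boolean upsert(long id, int version) {
        Integer current = versions.get(id);
        if (current != null && version <= current) {
            return false;                        // assumption: stale or duplicate versions are ignored
        }
        versions.put(id, version);
        openStarts.put(id, new ArrayList<>());   // new or changed pattern starts from empty state
        return current != null;
    }

    public void recordStart(long id, long ts) {
        openStarts.get(id).add(ts);
    }

    public List<Long> openStarts(long id) {
        return openStarts.get(id);
    }

    public static void main(String[] args) {
        RuleVersionSketch rules = new RuleVersionSketch();
        rules.upsert(1L, 1);
        rules.recordStart(1L, 1001L);
        rules.recordStart(1L, 7001L);
        System.out.println(rules.upsert(1L, 2));     // true: version 2 replaces version 1
        System.out.println(rules.openStarts(1L));    // []
        rules.recordStart(1L, 16001L);
        System.out.println(rules.openStarts(1L));    // [16001]
    }
}
```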

We keep inputting events. Rule 2's 5 s window has been exceeded again, so rule 2 no longer produces matches; only rule 1 does.

Events (port 8888):
2,end,25004
Output:
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=21003)]})
Events (port 8888):
2,end,25003
Output:
dynamic> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[EventWithTime(user=2, name=start, time=16001)], middle=[EventWithTime(user=2, name=middle, time=16002)], end=[EventWithTime(user=2, name=end, time=22003)]})
END

This completes the Flink dynamic CEP test demo. The dynamic CEP built by modifying the FlinkCEP source code is now essentially in place.
