Flink Dynamic CEP: Streaming Rule Ingestion and Dynamic Rule Updates

2023-05-22 00:38:28

Flink job code

Create a Flink job that reads both events and rules from sockets: events come from localhost:8888 and rules come from localhost:6666.

Note that dynamically updating rules requires RocksDB as the state backend.

Code language: java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// To update rules dynamically, the state backend must be RocksDB
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);
SingleOutputStreamOperator<Event> eventStream = socketTextStream.flatMap((FlatMapFunction<String, Event>) (s, collector) -> {
	String[] split = s.split(",");
	if (split.length == 3) {
		Integer id = Integer.valueOf(split[0]);
		Double price = Double.valueOf(split[2]);

		Event event = new Event(id, split[1], price);
		collector.collect(event);
	}
},TypeInformation.of(Event.class));

SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
	ObjectMapper objectMapper = new ObjectMapper();
	try {
		DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
		if(null != dynamicConfiguration){
			collector.collect(dynamicConfiguration);
		}
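	} catch (Exception e) {
		// Report unparseable rule lines instead of dropping them silently
		System.err.println("Skipping unparseable rule: " + s + " (" + e.getMessage() + ")");
	}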

}, TypeInformation.of(DynamicConfiguration.class));

KeyedStream<Event, Integer> eventIdKeyedStream = eventStream.keyBy(event -> event.getId());
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
		DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
				.inProcessingTime()
				.minPatternInitialized(1)
				.objectProcess();
dynamicMatchDataDataStream.print("dynamic");

env.execute();
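
The job above depends on an Event class that the article does not show. The following is a minimal sketch of what it might look like, together with the same "id,name,price" parsing done in the first flatMap. The field names, the toString format (modelled on the printed output later in the article), and the EventLineParser helper are all assumptions made for illustration. Unlike the job code, this sketch also skips lines whose id or price is not numeric instead of throwing.

```java
import java.util.Optional;

// Hypothetical stand-in for the article's Event class; field names are assumptions.
final class Event {
    private final Integer id;
    private final String name;
    private final Double price;

    Event(Integer id, String name, Double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    Integer getId() { return id; }
    String getName() { return name; }
    Double getPrice() { return price; }

    @Override
    public String toString() {
        return "Event(" + id + ", " + name + ", " + price + ")";
    }
}

// Hypothetical helper mirroring the parsing logic of the first flatMap.
final class EventLineParser {
    // Parses "id,name,price"; returns empty for lines that do not fit that shape.
    static Optional<Event> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            Integer id = Integer.valueOf(parts[0].trim());
            Double price = Double.valueOf(parts[2].trim());
            return Optional.of(new Event(id, parts[1].trim(), price));
        } catch (NumberFormatException e) {
            // Non-numeric id or price: treat the line as invalid
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1,start,2.0").orElse(null));
        System.out.println(parse("not-an-event").isPresent());
    }
}
```

If Flink should serialize such a class as a POJO rather than fall back to a generic serializer, it would additionally need to be public with a public no-argument constructor and accessible fields or getters and setters.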

Test procedure

1. Initialization

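Enter the following lines into the event stream. The console shows no output at this point; in fact the events have not been processed at all, because no rule has been initialized yet.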

Code language: shell
[root@sixjo ~]# nc -lk 8888
1,start,2.0
1,foobar,3.0
1,middle,9.0
2,middle,2.0
1,end,7.0

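Now enter the rule JSON into the rule stream. This satisfies the configured minimum of one initialized pattern (minPatternInitialized(1)), which triggers computation over the events that have not been processed yet.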

Code language: shell
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}

The computed result is printed as follows:

Code language: shell
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
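
The behaviour in this step (events wait until the minimum number of rules exists, then are evaluated in one go) can be pictured with a small buffering gate. The sketch below is a simplified model under stated assumptions, not the library's implementation: the RuleGate class and its method names are hypothetical, and events are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not the library's code) of the
// minPatternInitialized gate: events wait until enough rules are registered.
final class RuleGate {
    private final int minPatterns;
    private int registeredRules = 0;
    private final List<String> pending = new ArrayList<>();

    RuleGate(int minPatterns) {
        this.minPatterns = minPatterns;
    }

    // Returns the events that can be evaluated right after this event arrives.
    List<String> onEvent(String event) {
        List<String> ready = new ArrayList<>();
        if (registeredRules < minPatterns) {
            pending.add(event); // not enough rules yet: hold the event back
        } else {
            ready.add(event);
        }
        return ready;
    }

    // Returns the buffered events released by this rule registration, in arrival order.
    List<String> onRule() {
        registeredRules++;
        List<String> released = new ArrayList<>();
        if (registeredRules >= minPatterns) {
            released.addAll(pending);
            pending.clear();
        }
        return released;
    }

    public static void main(String[] args) {
        RuleGate gate = new RuleGate(1);
        System.out.println(gate.onEvent("1,start,2.0")); // nothing is ready yet
        System.out.println(gate.onEvent("1,end,7.0"));   // still buffered
        System.out.println(gate.onRule());               // both buffered events are released
    }
}
```

With a threshold of 1, the first rule releases everything that was buffered, which corresponds to the single match printed in this step once rule 1 arrives.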

2. Adding a rule

Add rule 10086 to the rule stream. Nothing is output at this point, because the earlier events have already been processed.

Code language: shell
{"id":10086,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.processor.ObjectDynamicPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
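
Each rule line carries the pattern list inside the string-valued dynamicPattern field. Assuming that field is read as an ordinary JSON string, the quotes inside it have to be escaped as \" for the line to parse. The sketch below builds such a line with plain string handling; the RuleJson helper is hypothetical, only the fields that vary in this walkthrough are parameters, and the escaping covers just backslashes and double quotes, which is enough for a single-line pattern.

```java
// Hypothetical helper for composing a one-line rule message by hand.
final class RuleJson {
    // Escapes backslashes and double quotes so the text can sit inside a JSON string.
    static String escape(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the rule line; the remaining fields use the fixed values from this walkthrough.
    static String build(int id, int version, String processFunctionClass, String patternJson) {
        String fn = processFunctionClass == null
                ? "null"
                : "\"" + escape(processFunctionClass) + "\"";
        return "{\"id\":" + id
                + ",\"version\":" + version
                + ",\"dynamicName\":\"demo2\",\"dataType\":\"bean\",\"dataClassName\":\"\""
                + ",\"dynamicPatternProcessFunctionClassName\":" + fn
                + ",\"dynamicPattern\":\"" + escape(patternJson) + "\"}";
    }

    public static void main(String[] args) {
        // A single-stage pattern, shortened from the three-stage rule for readability
        String pattern = "[{\"name\":\"start\",\"matchPosition\":\"begin\","
                + "\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\","
                + "\"dynamicCondition\":{\"conditionType\":\"condition\","
                + "\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},"
                + "\"properties\":null}]";
        System.out.println(build(1, 1, null, pattern));
    }
}
```

In a real job it would usually be simpler to let a JSON library serialize the rule object, which handles the escaping automatically.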

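Now enter an end event into the event stream. Only rule 1 produces a match; rule 10086 only starts evaluating from this event onward, so it has no match yet.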

Code language: shell
1,end,7.0

Code language: shell
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})

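Now re-enter the full matching sequence. Rule 1 should output 3 new matches and rule 10086 should output 1. Rule 1 still holds the start and middle events from step 1, and followedByAny allows them to combine with the new ones, so the new end event completes (old start, old middle), (old start, new middle), and (new start, new middle). Rule 10086 has only seen the new sequence. The printed matches look identical because the re-entered events carry the same field values.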

Code language: shell
1,start,2.0
1,foobar,3.0
1,middle,9.0
1,end,7.0

Code language: shell
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
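
The match counts in this step can be checked with a small counting model of the start, followedByAny middle, followedByAny end pattern. This is only a sketch under stated assumptions: it tracks a single key, assumes no time window and no after-match skip strategy, and counts matches instead of running the library's matching engine. The FollowedByAnyModel class is hypothetical.

```java
// Counting model of: start -> followedByAny middle -> followedByAny end.
// Assumptions: one key, no time window, no after-match skipping.
final class FollowedByAnyModel {
    private long starts = 0;           // start events seen so far
    private long startMiddlePairs = 0; // (start, later middle) combinations seen so far

    // Feeds one event name; returns how many matches this event completes.
    long onEvent(String name) {
        switch (name) {
            case "start":
                starts++;
                return 0;
            case "middle":
                // A middle can follow any earlier start
                startMiddlePairs += starts;
                return 0;
            case "end":
                // An end completes every (start, middle) combination seen so far
                return startMiddlePairs;
            default:
                return 0; // e.g. "foobar" matches no stage and is skipped
        }
    }

    public static void main(String[] args) {
        String[][] batches = {
            {"start", "foobar", "middle", "end"}, // step 1: initial events for key 1
            {"end"},                              // step 2: single end event
            {"start", "foobar", "middle", "end"}  // step 2: full sequence re-entered
        };
        FollowedByAnyModel rule1 = new FollowedByAnyModel();
        FollowedByAnyModel rule10086 = new FollowedByAnyModel();
        for (int i = 0; i < batches.length; i++) {
            long m1 = 0;
            long m2 = 0;
            for (String name : batches[i]) {
                m1 += rule1.onEvent(name);
                if (i >= 1) {
                    m2 += rule10086.onEvent(name); // rule 10086 was added after the first batch
                }
            }
            System.out.println("batch " + (i + 1) + ": rule 1 -> " + m1 + ", rule 10086 -> " + m2);
        }
    }
}
```

Running the model reports 1, 1, and 3 matches for rule 1 across the three batches and 0, 0, and 1 for rule 10086, which agrees with the output shown so far.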

3. Dynamically updating a rule

Now re-enter rule 1 into the rule stream with version set to 2. This updates rule 1: its existing state is cleared and the rule is re-initialized.

Code language: shell
{"id":1,"version":2,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.dynamic.EventPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
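
The update rule described here (same id, new version, state cleared) can be modelled with a small registry. This is a sketch under explicit assumptions rather than the library's code: the VersionedRules class is hypothetical, per-rule state is reduced to a list of remembered events, and the choice to ignore a version that is not strictly higher is a guess, since the article only demonstrates going from version 1 to version 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry: one version and one piece of partial-match state per rule id.
final class VersionedRules {
    enum Change { ADDED, UPDATED, IGNORED }

    private final Map<Integer, Integer> versionById = new HashMap<>();
    private final Map<Integer, List<String>> stateById = new HashMap<>();

    // Assumption: only a strictly higher version replaces an existing rule.
    Change upsert(int id, int version) {
        Integer current = versionById.get(id);
        if (current != null && version <= current) {
            return Change.IGNORED;
        }
        versionById.put(id, version);
        stateById.put(id, new ArrayList<>()); // fresh state: earlier partial matches are gone
        return current == null ? Change.ADDED : Change.UPDATED;
    }

    // Every registered rule remembers the event in its own state.
    void remember(String event) {
        for (List<String> state : stateById.values()) {
            state.add(event);
        }
    }

    int stateSize(int id) {
        List<String> state = stateById.get(id);
        return state == null ? 0 : state.size();
    }

    public static void main(String[] args) {
        VersionedRules rules = new VersionedRules();
        rules.upsert(1, 1);
        rules.remember("start");
        rules.upsert(10086, 1);
        rules.remember("middle");
        System.out.println(rules.upsert(1, 2));     // rule 1 is replaced
        System.out.println(rules.stateSize(1));     // its state starts over
        System.out.println(rules.stateSize(10086)); // rule 10086 keeps what it has seen
    }
}
```

Keeping state per rule id is what lets rule 10086 continue from its accumulated events while rule 1 starts from scratch.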

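Now enter another end event into the event stream. Only rule 10086 outputs a match, because rule 1's earlier state was cleared by the update.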

Code language: shell
1,end,7.0

Code language: shell
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})

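Now re-enter the complete matching sequence into the event stream. Rule 1 should output 1 new match and rule 10086 should output 3. Version 2 of rule 1 names EventPatternProcessFunction as its process function, which appears to be why its match field now carries an extra prefix.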

Code language: shell
1,start,2.0
1,middle,9.0
1,end,7.0

Code language: shell
dynamic:14> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})

END

This completes the walkthrough of dynamically adding and updating rules in Flink CEP.
