Flink job code
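Create a Flink job that reads both events and rules from sockets: events from localhost:8888 and rules from localhost:6666.
If rules are to be updated dynamically, the state backend must be RocksDB.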
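```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.fasterxml.jackson.databind.ObjectMapper; // assumes plain Jackson on the classpath
// Event, DynamicConfiguration, DynamicMatchData and DynamicCEP come from the author's
// dynamic CEP extension; their packages are not shown in this article.

// Run inside a method declared "throws Exception": the RocksDBStateBackend constructor and env.execute() throw checked exceptions
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Dynamic rule updates require the RocksDB state backend
// (RocksDBStateBackend is deprecated since Flink 1.13 in favor of EmbeddedRocksDBStateBackend)
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));
// Events arrive on port 8888, rules on port 6666
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);
// Parse "id,name,price" lines into Event; lines without exactly 3 fields are dropped,
// but a non-numeric id or price throws NumberFormatException and fails the job
SingleOutputStreamOperator<Event> eventStream = socketTextStream.flatMap((FlatMapFunction<String, Event>) (s, collector) -> {
    String[] split = s.split(",");
    if (split.length == 3) {
        Integer id = Integer.valueOf(split[0]);
        Double price = Double.valueOf(split[2]);
        Event event = new Event(id, split[1], price);
        collector.collect(event);
    }
}, TypeInformation.of(Event.class));
// Parse each rule line (JSON) into a DynamicConfiguration; invalid JSON is silently skipped
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
        if (null != dynamicConfiguration) {
            collector.collect(dynamicConfiguration);
        }
    } catch (Exception e) {
        // not a valid rule: ignore the line
    }
}, TypeInformation.of(DynamicConfiguration.class));
// Key events by id so that each id is matched independently
KeyedStream<Event, Integer> eventIdKeyedStream = eventStream.keyBy(event -> event.getId());
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
        DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
                .inProcessingTime()
                .minPatternInitialized(1)   // hold events until at least 1 pattern is initialized
                .objectProcess();
dynamicMatchDataDataStream.print("dynamic");
env.execute();
```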
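The event flatMap in the job converts fields with Integer.valueOf and Double.valueOf, which throw NumberFormatException on a line such as "x,start,2.0"; an uncaught exception in a user function fails the task. The following is a minimal, stdlib-only sketch of a more defensive parse that skips such lines instead. The class EventLineParser and the record ParsedEvent are hypothetical stand-ins for the article's Event class, and the sketch assumes Java 16+ for records.

```java
import java.util.Optional;

/**
 * Sketch of defensive "id,name,price" parsing. ParsedEvent is a hypothetical
 * stand-in for the article's Event(id, name, price) class.
 */
public class EventLineParser {

    record ParsedEvent(int id, String name, double price) {}

    /** Returns the parsed event, or empty if the line is malformed (instead of throwing). */
    static Optional<ParsedEvent> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] split = line.trim().split(",");
        if (split.length != 3) {
            return Optional.empty(); // wrong number of fields, same rule as the original flatMap
        }
        try {
            int id = Integer.parseInt(split[0].trim());
            double price = Double.parseDouble(split[2].trim());
            return Optional.of(new ParsedEvent(id, split[1].trim(), price));
        } catch (NumberFormatException e) {
            // Skip the bad line rather than failing the whole job
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1,start,2.0")); // Optional[ParsedEvent[id=1, name=start, price=2.0]]
        System.out.println(parse("x,start,2.0")); // Optional.empty
    }
}
```

Inside the Flink job, the flatMap body would then call parse(s).ifPresent(...) and collect the result; converting ParsedEvent to the real Event class is left out because that class is not shown.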
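Test procedure
1. Initialization
Enter the following into the event stream. Nothing is printed to the console; in fact, the events are not processed at all yet, because no rule has been initialized.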
```shell
[root@sixjo ~]# nc -lk 8888
1,start,2.0
1,foobar,3.0
1,middle,9.0
2,middle,2.0
1,end,7.0
```
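Now enter the rule JSON into the rule stream. This meets the configured minimum of one initialized pattern (minPatternInitialized(1)), which triggers processing of the events that have been waiting.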
Each rule is entered as a single line. The dynamicPattern field is itself a JSON string, so its inner quotes are escaped.
```shell
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
```
The output is:
```shell
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
```
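The behavior in this step (no output until a rule arrives, then the backlog is processed) can be modelled with a small buffer. This is a toy sketch, not DynamicCEP's actual implementation, which the article does not show: the class BufferUntilRulesReady and its methods are hypothetical, and in a real Flink operator the buffer would live in keyed state rather than in a plain list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of minPatternInitialized(n): events are buffered until at least n rules
 * are known, then the backlog is released for matching. Hypothetical, for illustration only.
 */
public class BufferUntilRulesReady {
    private final int minPatterns;
    private final Map<Integer, String> rules = new LinkedHashMap<>(); // rule id -> rule definition
    private final List<String> buffer = new ArrayList<>();             // events waiting for rules
    private final List<String> processed = new ArrayList<>();          // events handed to matching

    BufferUntilRulesReady(int minPatterns) {
        this.minPatterns = minPatterns;
    }

    void onEvent(String event) {
        if (rules.size() < minPatterns) {
            buffer.add(event);     // not enough rules yet: hold the event
        } else {
            processed.add(event);  // rules ready: process immediately
        }
    }

    void onRule(int id, String definition) {
        rules.put(id, definition);
        if (rules.size() >= minPatterns && !buffer.isEmpty()) {
            processed.addAll(buffer); // threshold reached: replay the backlog in arrival order
            buffer.clear();
        }
    }

    List<String> processed() {
        return processed;
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BufferUntilRulesReady op = new BufferUntilRulesReady(1);
        for (String e : List.of("1,start,2.0", "1,foobar,3.0", "1,middle,9.0", "2,middle,2.0", "1,end,7.0")) {
            op.onEvent(e);
        }
        System.out.println(op.processed().size() + " processed, " + op.buffered() + " buffered"); // 0 processed, 5 buffered
        op.onRule(1, "rule-1-definition");
        System.out.println(op.processed().size() + " processed, " + op.buffered() + " buffered"); // 5 processed, 0 buffered
    }
}
```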
2. Adding a rule
Add rule 10086 to the rule stream. Nothing is printed, because the earlier events have already been processed.
```shell
{"id":10086,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.processor.ObjectDynamicPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
```
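Now enter an end event into the event stream. Only rule 1 produces a match, because it still holds the earlier start and middle events as a partial match; rule 10086 only evaluates events from this point on, so it has no match yet.
```shell
1,end,7.0
```
Output:
```shell
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
```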
Now enter a full matching sequence again. Rule 1 should produce 3 new matches and rule 10086 should produce 1. With followedByAny, rule 1 still holds the first round's start and middle events, so the new end event completes three combinations (old start + old middle, old start + new middle, new start + new middle), whereas rule 10086 has seen only the new start and middle.
```shell
1,start,2.0
1,foobar,3.0
1,middle,9.0
1,end,7.0
```
Output:
```shell
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
```
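The match counts in this step can be checked by hand. A minimal sketch, assuming the rule JSON behaves like Flink CEP's begin("start").followedByAny("middle").followedByAny("end") with the default NO_SKIP after-match strategy and no within() window, so partial matches are never discarded. The class FollowedByAnyCounter is hypothetical: it only counts matches per key and does not run Flink's NFA.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Counts matches of start -> followedByAny(middle) -> followedByAny(end) per key,
 * assuming NO_SKIP and no time window. Hypothetical model, not Flink's implementation.
 */
public class FollowedByAnyCounter {
    // Per key (event id): number of "start" events waiting for a "middle"
    private final Map<Integer, Integer> starts = new HashMap<>();
    // Per key: number of start/middle pairs waiting for an "end"
    private final Map<Integer, Integer> pairs = new HashMap<>();

    /** Feeds one "id,name,price" line and returns how many matches it completes. */
    int onEvent(String line) {
        String[] f = line.split(",");
        int key = Integer.parseInt(f[0]);
        switch (f[1]) {
            case "start":
                starts.merge(key, 1, Integer::sum);
                return 0;
            case "middle":
                // every earlier start of this key can be extended by this middle
                pairs.merge(key, starts.getOrDefault(key, 0), Integer::sum);
                return 0;
            case "end":
                // every pair completes; the pairs stay alive for later "end" events
                return pairs.getOrDefault(key, 0);
            default:
                return 0; // e.g. "foobar" is skipped under relaxed contiguity
        }
    }

    /** Feeds several lines and returns the total number of matches emitted. */
    int feed(String... lines) {
        int total = 0;
        for (String line : lines) {
            total += onEvent(line);
        }
        return total;
    }

    public static void main(String[] args) {
        FollowedByAnyCounter rule1 = new FollowedByAnyCounter();
        FollowedByAnyCounter rule10086 = new FollowedByAnyCounter();
        // Step 1: only rule 1 exists
        System.out.println(rule1.feed("1,start,2.0", "1,foobar,3.0", "1,middle,9.0", "2,middle,2.0", "1,end,7.0")); // 1
        // Step 2: rule 10086 is added, then a lone end event arrives
        System.out.println(rule1.feed("1,end,7.0") + " / " + rule10086.feed("1,end,7.0")); // 1 / 0
        // Full sequence again
        String[] round = {"1,start,2.0", "1,foobar,3.0", "1,middle,9.0", "1,end,7.0"};
        System.out.println(rule1.feed(round) + " / " + rule10086.feed(round)); // 3 / 1
    }
}
```

Because each rule is modelled by its own counter, a rule added later simply starts from empty state, which is why rule 10086 sees fewer combinations than rule 1.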
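3. Updating a rule dynamically
Now enter rule 1 again in the rule stream, this time with version 2. This updates rule 1: its existing state is cleared and the rule is re-initialized.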
```shell
{"id":1,"version":2,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":"org.apache.flink.cep.dynamic.EventPatternProcessFunction","dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
```
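The update semantics described for version 2 (same rule id, new version, old state cleared) can be sketched as a versioned registry. This is a toy model under stated assumptions: VersionedRuleRegistry and RuleState are hypothetical names, and ignoring an incoming rule whose version is equal to or lower than the current one is an assumption, not something the article states.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of versioned rule updates: a rule with a higher version replaces the
 * old one together with its partial-match state. Hypothetical, for illustration only.
 */
public class VersionedRuleRegistry {

    /** Hypothetical per-rule state: the rule version and its pending partial matches. */
    static final class RuleState {
        final int version;
        int pendingPartialMatches;

        RuleState(int version) {
            this.version = version;
        }
    }

    private final Map<Integer, RuleState> rules = new HashMap<>();

    /** Adds or replaces a rule; returns true if the registry changed. */
    boolean upsert(int id, int version) {
        RuleState current = rules.get(id);
        if (current != null && version <= current.version) {
            return false; // assumption: same or older version keeps the existing rule and its state
        }
        // A fresh RuleState means the old rule's partial matches are discarded
        rules.put(id, new RuleState(version));
        return true;
    }

    RuleState get(int id) {
        return rules.get(id);
    }

    public static void main(String[] args) {
        VersionedRuleRegistry registry = new VersionedRuleRegistry();
        registry.upsert(1, 1);
        registry.upsert(10086, 1);
        registry.get(1).pendingPartialMatches = 3;      // pretend both rules hold partial matches
        registry.get(10086).pendingPartialMatches = 3;

        registry.upsert(1, 2);                           // update rule 1 to version 2
        System.out.println(registry.get(1).version + " " + registry.get(1).pendingPartialMatches);         // 2 0
        System.out.println(registry.get(10086).version + " " + registry.get(10086).pendingPartialMatches); // 1 3
    }
}
```

Only the updated rule loses its state; other rules, such as 10086 here, keep matching from where they were.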
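Enter an end event into the event stream again. Only rule 10086 produces a match: the update discarded rule 1's partial matches, while rule 10086 still holds the start and middle events from the previous round.
```shell
1,end,7.0
```
Output:
```shell
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
```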
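Now enter a full matching sequence again. Rule 1 should produce 1 new match (only the new start, middle and end), and rule 10086 should produce 3, because its start and middle events from the previous round combine with the new ones.
```shell
1,start,2.0
1,middle,9.0
1,end,7.0
```
Output:
```shell
dynamic:14> DynamicMatchData(id=1, version=2, match=这是EventPatternProcessFunction matchs{start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
dynamic:14> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[Event(1, middle, 9.0)], end=[Event(1, end, 7.0)]})
```
Rule 1's match now carries the prefix "这是EventPatternProcessFunction matchs" ("This is EventPatternProcessFunction matchs"), which comes from org.apache.flink.cep.dynamic.EventPatternProcessFunction, the process function named in version 2 of the rule.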
This completes dynamically adding and updating rules in Flink CEP.