Code
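Because the test data set is very small, the parallelism is temporarily set to 1. With several parallel tasks, a downstream operator only advances to the minimum watermark across all of its inputs, so a task that receives little or no data holds the watermark back and results are delayed for a long time.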
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parallelism 1 so that a single watermark drives the whole pipeline
env.setParallelism(1);
// RocksDB keyed state, checkpoints under file:///rocksdb/
// (RocksDBStateBackend is deprecated in newer Flink versions in favour of EmbeddedRocksDBStateBackend)
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));

// Event stream: "user,name,timestamp" lines on port 8888
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
// Rule stream: one DynamicConfiguration JSON document per line on port 6666
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);

SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
    @Override
    public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
        String[] split = s.split(",");
        // Lines without exactly three fields are dropped
        if (split.length == 3) {
            Long timestamp = Long.valueOf(split[2].trim());
            EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
            collector.collect(event);
        }
    }
});

// Event time comes from the third field; up to 2 s of out-of-orderness is tolerated
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);

// The output type is passed explicitly because the lambda's generic type is erased
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
        if (null != dynamicConfiguration) {
            collector.collect(dynamicConfiguration);
        }
    } catch (Exception e) {
        // Malformed rule JSON is silently ignored; log it in real code so bad rules are visible
    }
}, TypeInformation.of(DynamicConfiguration.class));

// CEP state is kept per user
KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
// DynamicCEP is the dynamic-rule CEP extension used in this article
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
        DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
                .minPatternInitialized(1)
                .objectProcess();
dynamicMatchDataDataStream.print("dynamic");
env.execute();
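The reason for parallelism 1 can be shown with a small model. An operator advances its event-time clock to the minimum of the latest watermarks received on its input channels, so a parallel instance that never sees an event keeps its channel at Long.MIN_VALUE and holds everything back. The sketch below is plain Java, not Flink code, and the channel values are assumptions chosen to fit this test's data. In real jobs, `WatermarkStrategy#withIdleness(Duration)` is Flink's mechanism for excluding idle inputs instead of forcing parallelism to 1.

```java
import java.util.Arrays;

// Plain-Java model (not Flink code): an operator's event-time clock is the
// minimum of the latest watermark received on each of its input channels.
final class MinWatermarkSketch {

    static long combined(long[] channelWatermarks) {
        // With no channels at all, fall back to "no progress".
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Channel 0 has seen events up to 5003 with a 2 s bound: 5003 - 2000 - 1 = 3002.
        // Channel 1 (assumed) has received no events and is still at Long.MIN_VALUE.
        long[] twoChannels = {3002L, Long.MIN_VALUE};
        long[] oneChannel = {3002L};
        System.out.println("parallelism 2: " + combined(twoChannels)); // Long.MIN_VALUE, nothing fires
        System.out.println("parallelism 1: " + combined(oneChannel));  // 3002
    }
}
```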
Test procedure
1. FlinkCEP out-of-order handling based on event time
Rule stream
[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}
Event stream
[root@sixjo ~]# nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
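At this point nothing has been output. With a 2 s out-of-orderness bound, the watermark is the largest timestamp seen minus 2000 ms minus 1, so after 3003 it is only 1002 and the end event at 1003 has not yet been handed to the CEP operator. Although the three events arrived as end, middle, start, the CEP operator buffers them and processes them in timestamp order once the watermark passes them, so the pattern still matches. Keep feeding the event stream and matches are output: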
2,end,4003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
2,end,5003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
2,end,6003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
2,end,7003
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
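The timing of these outputs follows from the `forBoundedOutOfOrderness` formula, watermark = largest timestamp seen - 2000 ms - 1. The sketch below replays the test-1 input and reports which buffered events become eligible for the CEP operator after each line. It assumes a watermark is computed after every input line (Flink emits them periodically, by default every 200 ms, which amounts to the same thing when typing into nc by hand) and that timestamps are distinct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Replays the test-1 input through watermark = maxTimestamp - 2000 - 1
// (the forBoundedOutOfOrderness formula) and reports which buffered events
// a CEP operator could release, in timestamp order, after each input line.
final class OutOfOrdernessReplay {

    static final long BOUND_MS = 2000L;

    static long watermark(long maxTimestamp) {
        return maxTimestamp - BOUND_MS - 1;
    }

    static List<List<Long>> replay(long[] arrivals) {
        TreeSet<Long> buffer = new TreeSet<>(); // assumes distinct timestamps
        List<List<Long>> released = new ArrayList<>();
        long max = Long.MIN_VALUE;
        for (long ts : arrivals) {
            buffer.add(ts);
            max = Math.max(max, ts);
            long wm = watermark(max);
            // Everything at or below the watermark is complete and can be processed.
            List<Long> batch = new ArrayList<>(buffer.headSet(wm, true));
            buffer.removeAll(batch);
            released.add(batch);
            System.out.println("in=" + ts + " watermark=" + wm + " released=" + batch);
        }
        return released;
    }

    public static void main(String[] args) {
        // Arrival order used in test 1: end, middle, start, then further end events.
        replay(new long[] {1003, 1002, 1001, 2003, 3003, 4003, 5003, 6003, 7003});
    }
}
```

In this replay, 1003 is first released on input 4003 (watermark 2002), which is exactly when the first `dynamic>` line appears above.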
2. Flink dynamic CEP window (within) handling
Continue testing with the demo above, which is still running.
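On the rule stream, send rule 1 again with version 3. This updates rule 1 and adds a within window of 5 s (the "within" property on the end step).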
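At this point the state of rule 1 is cleared, and its watermark is reset to the initial value -9223372036854775808 (Long.MIN_VALUE). As a result, even very early data that arrives first can still be processed normally. Computation, however, is still triggered at the task level: a single pattern does not fire early just because its own watermark is ahead.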
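One way to read this behaviour is sketched below. It is a hypothetical model, not the DynamicCEP implementation, and every class and method name in it is illustrative: each rule keeps its own watermark, a new rule version resets it to Long.MIN_VALUE so that even very old events are not late, and the rule's watermark only moves when the task-level watermark advances. Because Flink only forwards a watermark to an operator when it increases, this model is consistent with the test below, where nothing is output until `2,end,8003` raises the task watermark from 5002 to 6002.

```java
import java.util.HashMap;
import java.util.Map;

// HYPOTHETICAL model of the behaviour described in the text; class and method
// names are illustrative only and are not the DynamicCEP library's API.
final class PerRuleWatermarkModel {

    private final Map<Integer, Integer> ruleVersion = new HashMap<>();
    private final Map<Integer, Long> ruleWatermark = new HashMap<>();

    // A newer rule version drops the rule's old state and resets its watermark.
    void onRule(int ruleId, int version) {
        Integer current = ruleVersion.get(ruleId);
        if (current == null || version > current) {
            ruleVersion.put(ruleId, version);
            ruleWatermark.put(ruleId, Long.MIN_VALUE);
        }
    }

    // An event is late for a rule only if it is not newer than that rule's watermark.
    boolean isLate(int ruleId, long timestamp) {
        return timestamp <= ruleWatermark.getOrDefault(ruleId, Long.MIN_VALUE);
    }

    // Called only when the task-level watermark advances; rules catch up to it.
    void onTaskWatermark(long taskWatermark) {
        ruleWatermark.replaceAll((id, wm) -> Math.max(wm, taskWatermark));
    }

    public static void main(String[] args) {
        PerRuleWatermarkModel model = new PerRuleWatermarkModel();
        model.onRule(1, 1);
        model.onTaskWatermark(5002L);                 // after 2,end,7003 in test 1
        System.out.println(model.isLate(1, -1001L));  // true: far behind 5002
        model.onRule(1, 3);                           // rule update with within 5 s
        System.out.println(model.isLate(1, -1001L));  // false: watermark reset
    }
}
```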
{"id":1,"version":3,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}
Then send the following events on the event stream. After all of them have been entered, still no match has been output:
2,end,-1001
2,middle,-1002
2,start,-1003
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
2,end,5003
2,end,6003
2,end,7003
2,middle,7002
2,start,7001
Continue feeding the event stream. The inputs and outputs are as follows:
2,end,8003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=-1001)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5003)]})
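These twelve matches are the combinations that `followedByAny` (relaxed, non-deterministic contiguity) allows among the events whose timestamps are at or below the watermark 6002 reached after `2,end,8003`, limited by the 5 s window. The sketch below enumerates them by brute force. It illustrates the matching semantics only, not how the CEP NFA computes them, and it assumes the window check is end - start < 5000 ms and that timestamps strictly increase along a match.

```java
import java.util.ArrayList;
import java.util.List;

// Brute-force illustration (not the CEP NFA) of start -> middle -> end with
// followedByAny contiguity and within(5 s), assuming a match is kept only if
// end - start < 5000 ms and timestamps strictly increase along the match.
final class FollowedByAnyEnumeration {

    static final class Event {
        final String name;
        final long ts;
        Event(String name, long ts) { this.name = name; this.ts = ts; }
    }

    static final long WINDOW_MS = 5000L;

    // Events from test 2 with timestamp <= 6002 (the watermark after 2,end,8003).
    static List<Event> releasedAt6002() {
        List<Event> events = new ArrayList<>();
        events.add(new Event("start", -1003));
        events.add(new Event("middle", -1002));
        events.add(new Event("end", -1001));
        events.add(new Event("start", 1001));
        events.add(new Event("middle", 1002));
        for (long t = 1003; t <= 5003; t += 1000) {
            events.add(new Event("end", t));
        }
        return events;
    }

    static List<long[]> matches(List<Event> events) {
        List<long[]> out = new ArrayList<>();
        for (Event s : events) {
            if (!s.name.equals("start")) continue;
            for (Event m : events) {
                if (!m.name.equals("middle") || m.ts <= s.ts) continue;
                for (Event e : events) {
                    if (!e.name.equals("end") || e.ts <= m.ts) continue;
                    if (e.ts - s.ts < WINDOW_MS) out.add(new long[] {s.ts, m.ts, e.ts});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> result = matches(releasedAt6002());
        for (long[] r : result) {
            System.out.println("start=" + r[0] + " middle=" + r[1] + " end=" + r[2]);
        }
        System.out.println(result.size() + " matches");
    }
}
```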
2,end,9003
2,end,10003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
2,end,11003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
2,end,12003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
2,end,13003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
2,end,14003
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
2,end,15003
2,end,16003
2,end,17003
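The cut-off in these results is the within window. The outputs are consistent with the rule that a partial match times out once eventTimestamp - startTimestamp >= windowTime, which is the comparison Flink CEP's NFA uses as far as the author of this sketch knows (treat it as an assumption). The check below applies that rule to the start event at 7001: ends 7003 to 11003 qualify, while 12003 (5002 ms later) and everything after it do not.

```java
// Sketch of the within(5 s) check: a partial match that started at startTs
// can still complete with an event at eventTs only if eventTs - startTs < 5000.
// Assumption: this mirrors the ">= windowTime means timed out" test in Flink CEP's NFA.
final class WithinCheck {

    static final long WINDOW_MS = 5000L;

    static boolean withinWindow(long startTs, long eventTs) {
        return eventTs - startTs < WINDOW_MS;
    }

    public static void main(String[] args) {
        long start = 7001L; // the "start" event of the second batch
        for (long end = 7003L; end <= 17003L; end += 1000L) {
            System.out.println("end=" + end + " -> " + (withinWindow(start, end) ? "match" : "timed out"));
        }
    }
}
```

The same rule explains the first batch of test 2: start -1003 pairs with ends up to 3003 (4006 ms later) but not 4003 (5006 ms), and start 1001 pairs with ends up to 5003 but not 6003.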
This essentially concludes the event-time tests of Flink dynamic CEP.