Flink Dynamic CEP with EventTime

2023-05-22 01:59:05

Code

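Because the test data set is small, the parallelism is temporarily set to 1. With several parallel subtasks, a subtask that receives little or no data holds back the downstream watermark, and results may not appear for a long time.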
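
The reason parallelism matters can be sketched in plain Java: an operator with several input channels takes the minimum of their watermarks as its own event-time clock, so one channel that has seen no data keeps the combined watermark at Long.MIN_VALUE. The class and method names below are illustrative only and do not come from Flink or from the project used in this post.

```java
import java.util.Arrays;

class MinWatermarkDemo {
    // The combined watermark of an operator is the minimum over its input channels.
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Parallelism 1: a single upstream channel, its watermark is used directly.
        long[] oneChannel = {2002L};
        // Parallelism 4: two channels have not received any event yet.
        long[] fourChannels = {2002L, Long.MIN_VALUE, 1002L, Long.MIN_VALUE};

        System.out.println(combinedWatermark(oneChannel));   // 2002
        System.out.println(combinedWatermark(fourChannels)); // -9223372036854775808
    }
}
```

With only a handful of hand-typed events, some subtasks would stay empty, which is why this test runs with a single subtask.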

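// Flink and JDK imports used by this snippet. EventWithTime, DynamicConfiguration,
// DynamicMatchData and DynamicCEP come from the dynamic CEP project itself.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
// Assumption: ObjectMapper is Jackson's standard class.
import com.fasterxml.jackson.databind.ObjectMapper;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Parallelism 1 so that a single subtask drives the watermark in this small test.
env.setParallelism(1);
env.setStateBackend(new RocksDBStateBackend("file:///rocksdb/"));

// Event stream: lines of the form "user,name,timestamp" on port 8888.
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);

// Rule stream: one JSON-encoded DynamicConfiguration per line on port 6666.
DataStreamSource<String> socketTextStream2 = env.socketTextStream("localhost", 6666);

// Parse each line into an EventWithTime; lines without exactly three fields are dropped.
SingleOutputStreamOperator<EventWithTime> eventStream = socketTextStream.flatMap(new FlatMapFunction<String, EventWithTime>() {
	@Override
	public void flatMap(String s, Collector<EventWithTime> collector) throws Exception {
		String[] split = s.split(",");
		if (split.length == 3) {
			Long timestamp = Long.valueOf(split[2]);
			EventWithTime event = new EventWithTime(split[0], split[1], timestamp);
			collector.collect(event);
		}
	}
});

// Use the event's own time field as the timestamp and tolerate 2 s of out-of-orderness.
SingleOutputStreamOperator<EventWithTime> eventWithWatermark = eventStream.assignTimestampsAndWatermarks(
		WatermarkStrategy
				.<EventWithTime>forBoundedOutOfOrderness(Duration.ofSeconds(2))
				.withTimestampAssigner((SerializableTimestampAssigner<EventWithTime>) (eventWithTime, l) -> eventWithTime.getTime())
);

// Deserialize each rule line into a DynamicConfiguration.
SingleOutputStreamOperator<DynamicConfiguration> dynamicConfigurationStream = socketTextStream2.flatMap((FlatMapFunction<String, DynamicConfiguration>) (s, collector) -> {
	ObjectMapper objectMapper = new ObjectMapper();
	try {
		DynamicConfiguration dynamicConfiguration = objectMapper.readValue(s, DynamicConfiguration.class);
		if (null != dynamicConfiguration) {
			collector.collect(dynamicConfiguration);
		}
	} catch (Exception e) {
		// Malformed rule JSON is ignored so that one bad line does not fail the job.
	}
}, TypeInformation.of(DynamicConfiguration.class));

// Key events by user and match them against the dynamically supplied patterns.
KeyedStream<EventWithTime, String> eventIdKeyedStream = eventWithWatermark.keyBy(EventWithTime::getUser);
DataStream<DynamicMatchData<Object>> dynamicMatchDataDataStream =
		DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationStream)
				.minPatternInitialized(1)
				.objectProcess();
dynamicMatchDataDataStream.print("dynamic");

env.execute();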
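
The EventWithTime class is not shown in this post. A minimal sketch that is consistent with the constructor call, the getUser/getTime accessors and the printed output format used here might look like the following. The field layout, the no-argument constructor and the parse helper are assumptions made for illustration, not the project's actual class.

```java
import java.util.Optional;

class EventWithTime {
    private String user;
    private String name;
    private Long time;

    public EventWithTime() {
    }

    public EventWithTime(String user, String name, Long time) {
        this.user = user;
        this.name = name;
        this.time = time;
    }

    public String getUser() { return user; }
    public String getName() { return name; }
    public Long getTime() { return time; }

    public void setUser(String user) { this.user = user; }
    public void setName(String name) { this.name = name; }
    public void setTime(Long time) { this.time = time; }

    // Hypothetical helper mirroring the flatMap above: "user,name,timestamp" -> event.
    static Optional<EventWithTime> parse(String line) {
        String[] split = line.split(",");
        if (split.length != 3) {
            return Optional.empty();
        }
        return Optional.of(new EventWithTime(split[0], split[1], Long.valueOf(split[2])));
    }

    @Override
    public String toString() {
        return "EventWithTime(user=" + user + ", name=" + name + ", time=" + time + ")";
    }

    public static void main(String[] args) {
        // Prints: EventWithTime(user=2, name=end, time=1003)
        System.out.println(parse("2,end,1003").get());
    }
}
```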

Test procedure

1. Flink CEP: handling out-of-order events based on event time

Rule stream

[root@sixjo ~]# nc -lk 6666
{"id":1,"version":1,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":null}]"}

Event stream

[root@sixjo ~]# nc -lk 8888
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003

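At this point nothing has been output yet, because the watermark has not advanced far enough to trigger the computation. Keep feeding the event stream and match results start to appear.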

Input:
2,end,4003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
Input:
2,end,5003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
Input:
2,end,6003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
Input:
2,end,7003
Output:
dynamic> DynamicMatchData(id=1, version=1, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
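
The timing of the outputs above follows from the 2 s bounded-out-of-orderness strategy: the watermark is the largest timestamp seen so far minus 2000 ms minus 1 ms, and a buffered event is handed to the pattern only once the watermark has reached its timestamp. The following plain-Java replay is a sketch under one simplifying assumption, namely that a watermark is emitted after every event (Flink actually emits them periodically, which makes no difference for hand-typed input). The class name and structure are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class WatermarkReplay {
    static final long OUT_OF_ORDERNESS_MS = 2000L;

    // Bounded-out-of-orderness watermark: max timestamp seen - bound - 1 ms.
    static long watermarkFor(long maxTimestamp) {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    // For each arriving timestamp, returns the buffered timestamps that the
    // resulting watermark releases, in ascending timestamp order.
    static List<List<Long>> replay(long[] arrivals) {
        PriorityQueue<Long> buffer = new PriorityQueue<>();
        List<List<Long>> released = new ArrayList<>();
        // Start value chosen so that the initial watermark is Long.MIN_VALUE without overflow.
        long maxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1;
        for (long ts : arrivals) {
            buffer.add(ts);
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = watermarkFor(maxTimestamp);
            List<Long> step = new ArrayList<>();
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                step.add(buffer.poll());
            }
            released.add(step);
        }
        return released;
    }

    public static void main(String[] args) {
        long[] arrivals = {1003, 1002, 1001, 2003, 3003, 4003, 5003};
        List<List<Long>> released = replay(arrivals);
        for (int i = 0; i < arrivals.length; i++) {
            System.out.println("arrive " + arrivals[i] + " -> released " + released.get(i));
        }
    }
}
```

In this replay, 3003 releases 1001 and 1002 (start and middle, a partial match only), 4003 releases 1003 and completes the first match, and 5003 releases 2003, which matches the order of the outputs in the test.
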
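
2. Flink dynamic CEP: window handling

Continue testing on the demo that is still running from the previous step.

On the rule stream, send rule 1 again with version 3. This updates rule 1 and adds a within window of 5 s.

At this point the state of rule 1 is cleared, including its watermark, which is reset to the initial value -9223372036854775808 (Long.MIN_VALUE). Very old data that arrives first is therefore still processed normally. Triggering, however, happens at the task level: a single pattern is not evaluated early merely because its own watermark has moved ahead.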

{"id":1,"version":3,"dynamicName":"demo2","dataType":"bean","dataClassName":"","dynamicPatternProcessFunctionClassName":null,"dynamicPattern":"[{\"name\":\"start\",\"matchPosition\":\"begin\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='start'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"middle\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='middle'\",\"dynamicPatternConfigurations\":null},\"properties\":null},{\"name\":\"end\",\"matchPosition\":\"followedByAny\",\"afterMatchSkipStrategy\":null,\"conditionMethod\":\"where\",\"dynamicCondition\":{\"conditionType\":\"condition\",\"script\":\"name=='end'\",\"dynamicPatternConfigurations\":null},\"properties\":{\"within\":[\"SECONDS\",5]}}]"}

Feed the following test data into the event stream. Up to this point no match result is output.

2,end,-1001
2,middle,-1002
2,start,-1003
2,end,1003
2,middle,1002
2,start,1001
2,end,2003
2,end,3003
2,end,4003
2,end,5003
2,end,6003
2,end,7003
2,middle,7002
2,start,7001

Keep feeding the event stream. The test proceeds as follows.

Input:
2,end,8003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=-1001)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=1003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=2003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=-1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=-1003)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=3003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=4003)]})
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=1001)], middle=[EventWithTime(user=2, name=middle, time=1002)], end=[EventWithTime(user=2, name=end, time=5003)]})
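
The 12 matches above can be reproduced by brute force. With followedByAny, every combination of start, middle and end in increasing time order is a candidate, and the within 5 s window keeps only those whose end lies less than 5000 ms after the start. The strict comparison is an assumption about how the window is checked (no boundary case occurs in this data), and the class below is an illustrative sketch rather than the CEP engine's algorithm.

```java
import java.util.ArrayList;
import java.util.List;

class WithinWindowDemo {
    static final long WITHIN_MS = 5000L;

    // Enumerates all (start, middle, end) triples in increasing time order
    // whose total span fits inside the window.
    static List<long[]> matches(long[] starts, long[] middles, long[] ends) {
        List<long[]> result = new ArrayList<>();
        for (long s : starts) {
            for (long m : middles) {
                for (long e : ends) {
                    if (s < m && m < e && e - s < WITHIN_MS) {
                        result.add(new long[]{s, m, e});
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps released by the watermark after "2,end,8003" was entered.
        long[] starts = {-1003, 1001};
        long[] middles = {-1002, 1002};
        long[] ends = {-1001, 1003, 2003, 3003, 4003, 5003};

        List<long[]> found = matches(starts, middles, ends);
        System.out.println(found.size() + " matches"); // 12 matches
        for (long[] m : found) {
            System.out.println("start=" + m[0] + " middle=" + m[1] + " end=" + m[2]);
        }
    }
}
```

For example, start=-1003 with end=4003 spans 5006 ms and is rejected, while start=1001 with end=5003 spans 4002 ms and is kept.
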
Input (no match output):
2,end,9003
Input:
2,end,10003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=7003)]})
Input:
2,end,11003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=8003)]})
Input:
2,end,12003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=9003)]})
Input:
2,end,13003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=10003)]})
Input:
2,end,14003
Output:
dynamic> DynamicMatchData(id=1, version=3, match={start=[EventWithTime(user=2, name=start, time=7001)], middle=[EventWithTime(user=2, name=middle, time=7002)], end=[EventWithTime(user=2, name=end, time=11003)]})
Input (no match output):
2,end,15003
2,end,16003
2,end,17003

END

This essentially concludes the event-time test of Flink dynamic CEP.
