Flink Dynamic CEP Demo

2023-05-18 01:27:00

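Demo 1: Processing data with dynamic CEP

To process data with dynamic CEP, pass in an event stream and a configuration stream. Each configuration dynamically generates a Pattern, and the DynamicOperator builds a state machine from it that waits for incoming events and matches them.

The default return type is DynamicMatchData<Map<String, List<IN>>>.

Code language: java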
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(2, "start", 2.0),
				new Event(3, "foobar", 3.0),
				new SubEvent(4, "foo", 4.0, 1.0),
				new Event(5, "middle", 5.0),
				new SubEvent(6, "middle", 6.0, 2.0),
				new SubEvent(7, "bar", 3.0, 3.0),
				new Event(42, "42", 42.0),
				new Event(8, "end", 1.0));

DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				DynamicConfigurationDemo.dynamicConfiguration2()
		);
DataStream<DynamicMatchData<Map<String, List<Event>>>> returns =
		DynamicCEP.patternStream(input, dynamicConfigurationDataStream)
				.inProcessingTime()
				.defaultProcess();

returns.print("dynamic");

// Collect the results and check that exactly two matches were produced.
List<DynamicMatchData<Map<String, List<Event>>>> resultList = new ArrayList<>();
DataStreamUtils.collect(returns).forEachRemaining(resultList::add);
Assert.assertEquals(2, resultList.size());

The printed output is shown below and matches the expected result: the two matches are the events with ids (2, 5, 8) and (2, 6, 8).

Code language: shell
dynamic:2> DynamicMatchData(id=1, version=1, match={start=[Event(2, start, 2.0)], middle=[Event(5, middle, 5.0)], end=[Event(8, end, 1.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(2, start, 2.0)], middle=[SubEvent(6, middle, 6.0, 2.0)], end=[Event(8, end, 1.0)]})
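
The two matches follow from followedByAny semantics: every "middle" that comes after a "start" opens its own branch, so both middle events (ids 5 and 6) pair with the same start and end. The following is a minimal, self-contained sketch of that behavior in plain Java. It does not use Flink or the dynamic CEP library, it enumerates matches over a finite list instead of running an incremental state machine, and FollowedByAnySketch, Ev, match, and demo1Events are hypothetical names introduced only for illustration. It assumes the rule is "start", followed by any "middle", followed by any "end", as in the pseudocode at the end of this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FollowedByAnySketch {

    // Hypothetical stand-in for the demo's Event: only id and name matter here.
    static final class Ev {
        final int id;
        final String name;

        Ev(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Enumerates every (start, middle, end) triple that appears in stream order.
    static List<List<Integer>> match(List<Ev> events) {
        List<List<Integer>> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!"start".equals(events.get(i).name)) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (!"middle".equals(events.get(j).name)) continue;
                for (int k = j + 1; k < events.size(); k++) {
                    if (!"end".equals(events.get(k).name)) continue;
                    matches.add(Arrays.asList(
                            events.get(i).id, events.get(j).id, events.get(k).id));
                }
            }
        }
        return matches;
    }

    // The nine events of Demo 1, reduced to (id, name).
    static List<Ev> demo1Events() {
        return Arrays.asList(
                new Ev(1, "barfoo"), new Ev(2, "start"), new Ev(3, "foobar"),
                new Ev(4, "foo"), new Ev(5, "middle"), new Ev(6, "middle"),
                new Ev(7, "bar"), new Ev(42, "42"), new Ev(8, "end"));
    }

    public static void main(String[] args) {
        System.out.println(match(demo1Events())); // prints [[2, 5, 8], [2, 6, 8]]
    }
}
```

With a stricter contiguity rule that takes only the first "middle" after the start, the same input would produce only the (2, 5, 8) match.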

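Demo 2: Processing a KeyedStream with dynamic CEP

When the event stream is a KeyedStream, events are ordered within each key after keyBy. The configuration stream in this demo carries two rules, each of which is processed separately.

minPatternInitialized(2) means that DynamicCepOperator starts processing the event stream normally only after at least 2 Patterns have been initialized.

The default is minPatternInitialized=1.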
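
The gating idea behind minPatternInitialized can be pictured with a small, self-contained Java sketch. This is not the DynamicCepOperator implementation: MinPatternGateSketch and its methods are hypothetical names, and the article does not say whether events that arrive before the threshold are buffered or discarded. The sketch assumes they are buffered and replayed in arrival order once enough patterns are registered.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MinPatternGateSketch {
    private final int minPatternInitialized;
    private final Set<Long> initializedPatternIds = new HashSet<>();
    private final List<String> buffered = new ArrayList<>();
    private final List<String> processed = new ArrayList<>();

    MinPatternGateSketch(int minPatternInitialized) {
        this.minPatternInitialized = minPatternInitialized;
    }

    // Called for each element of the configuration stream.
    void onPattern(long patternId) {
        initializedPatternIds.add(patternId);
        if (isOpen()) {
            // Threshold reached: replay everything that was held back.
            processed.addAll(buffered);
            buffered.clear();
        }
    }

    // Called for each element of the event stream.
    void onEvent(String event) {
        if (isOpen()) {
            processed.add(event);
        } else {
            buffered.add(event); // assumption: hold events until the gate opens
        }
    }

    boolean isOpen() {
        return initializedPatternIds.size() >= minPatternInitialized;
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        MinPatternGateSketch gate = new MinPatternGateSketch(2);
        gate.onEvent("start");
        gate.onPattern(1L);      // only one pattern so far, gate stays closed
        gate.onEvent("middle");
        gate.onPattern(10086L);  // second pattern opens the gate
        gate.onEvent("end");
        System.out.println(gate.processed()); // prints [start, middle, end]
    }
}
```

With the default value of 1, the gate in this sketch would open as soon as the first configuration arrives.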

Code language: java
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(1, "start", 2.0),
				new Event(1, "foobar", 3.0),
				new SubEvent(1, "foo", 4.0, 1.0),
				new Event(2, "middle", 9.0),
				new SubEvent(1, "middle", 1.0, 2.0),
				new SubEvent(1, "bar", 5.0, 3.0),
				new Event(1, "42", 6.0),
				new Event(1, "end", 7.0));

DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				dynamicConfiguration,dynamicConfiguration2
		);
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Map<String, List<Event>>>> returns = DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
		.inProcessingTime()
		.minPatternInitialized(2)
		.defaultProcess();

returns.print("dynamic");
List<DynamicMatchData<Map<String, List<Event>>>> resultList = new ArrayList<>();

The printed output is shown below and matches the expected result: rule 1 matches the events with id=1 and prices (2.0, 1.0, 7.0), and rule 10086 matches the same events.

Code language: shell
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
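
Only events with id=1 appear in these matches because a keyed stream is evaluated per key. A rough, self-contained sketch in plain Java of how the Demo 2 input splits by Event id is given here. It does not use Flink, and KeyedPartitionSketch, partitionById, and demo2Partitions are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyedPartitionSketch {

    // Groups event names by id, preserving arrival order inside each key.
    static Map<Integer, List<String>> partitionById(int[] ids, String[] names) {
        Map<Integer, List<String>> byKey = new LinkedHashMap<>();
        for (int i = 0; i < ids.length; i++) {
            byKey.computeIfAbsent(ids[i], k -> new ArrayList<>()).add(names[i]);
        }
        return byKey;
    }

    // The nine events of Demo 2, reduced to (id, name).
    static Map<Integer, List<String>> demo2Partitions() {
        int[] ids = {1, 1, 1, 1, 2, 1, 1, 1, 1};
        String[] names = {
            "barfoo", "start", "foobar", "foo", "middle", "middle", "bar", "42", "end"
        };
        return partitionById(ids, names);
    }

    public static void main(String[] args) {
        // prints {1=[barfoo, start, foobar, foo, middle, bar, 42, end], 2=[middle]}
        System.out.println(demo2Partitions());
    }
}
```

Event(2, "middle", 9.0) ends up alone under key 2, so it can never complete a match that started under key 1. That is why the only "middle" in the output is SubEvent(1, middle, 1.0, 2.0).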

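Demo 3: Dynamic CEP with Object results

The return type is DynamicMatchData<Object>. This variant is recommended because it can receive results of any type, which simplifies downstream processing and avoids type-conversion errors.

ObjectDynamicPatternProcessFunction and DefaultDynamicPatternProcessFunction return the same data.

You can also extend DynamicPatternProcessFunction to implement your own processing.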
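
When the payload is typed as Object, downstream code has to inspect its runtime type before using it, since one rule may return a String from a custom function while another returns the default Map. The sketch below shows one way to do that in plain Java. MatchData is a hypothetical stand-in for DynamicMatchData reduced to the fields visible in the demo output, and ObjectMatchSketch and describe are likewise names invented for this example.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ObjectMatchSketch {

    // Hypothetical stand-in for DynamicMatchData<T>: id, version, and match payload.
    static final class MatchData<T> {
        final long id;
        final int version;
        final T match;

        MatchData(long id, int version, T match) {
            this.id = id;
            this.version = version;
            this.match = match;
        }
    }

    // Branches on the runtime type of the payload instead of casting blindly.
    static String describe(MatchData<Object> data) {
        Object match = data.match;
        if (match instanceof String) {
            return "rule " + data.id + " -> text: " + match;
        }
        if (match instanceof Map) {
            Map<?, ?> stages = (Map<?, ?>) match;
            return "rule " + data.id + " -> stages: " + stages.keySet();
        }
        return "rule " + data.id + " -> unexpected payload";
    }

    public static void main(String[] args) {
        Map<String, List<String>> stages =
                Collections.singletonMap("start", Collections.singletonList("e1"));
        // prints: rule 1 -> text: custom text
        System.out.println(describe(new MatchData<Object>(1L, 1, "custom text")));
        // prints: rule 10086 -> stages: [start]
        System.out.println(describe(new MatchData<Object>(10086L, 1, stages)));
    }
}
```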

Code language: java
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(1, "start", 2.0),
				new Event(1, "foobar", 3.0),
				new SubEvent(1, "foo", 4.0, 1.0),
				new Event(2, "middle", 9.0),
				new SubEvent(1, "middle", 1.0, 2.0),
				new SubEvent(1, "bar", 5.0, 3.0),
				new Event(1, "42", 6.0),
				new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration.setDynamicPatternProcessFunctionClassName(EventPatternProcessFunction.class.getName());
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				dynamicConfiguration,dynamicConfiguration2
		);
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Object>> returns =
		DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
				.inProcessingTime()
				.minPatternInitialized(2)
				.objectProcess();

returns.print("dynamic");
List<DynamicMatchData<Object>> resultList = new ArrayList<>();

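The printed output is shown below and matches the expected result:

For rule 1, match in the returned data is a String; the matched events have id=1 and prices (2.0, 1.0, 7.0).

For rule 10086, the returned data has the default type DynamicMatchData<Map<String, List<IN>>>; the matched events have id=1 and prices (2.0, 1.0, 7.0).

Code language: shell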
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match=这是EventPatternProcessFunction matchs{start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})

Code of EventPatternProcessFunction

Code language: java
public class EventPatternProcessFunction extends DynamicPatternProcessFunction<Event, String> {
    // Wraps the match in a String payload; the literal prefix "这是" means "this is".
    @Override
    public DynamicMatchData<String> transform(Map<String, List<Event>> match, Context ctx) {
        DynamicMatchData<String> objectDynamicMatchData = new DynamicMatchData<>();
        objectDynamicMatchData.setMatch("这是EventPatternProcessFunction matchs" + match.toString());
        return objectDynamicMatchData;
    }
}

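Demo 4: The configured ClassName does not extend DynamicPatternProcessFunction

The class named by dynamicPatternProcessFunctionClassName in the configuration must extend DynamicPatternProcessFunction.

If it does not, the default processing function, DefaultDynamicPatternProcessFunction, is used instead.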
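
One common way to implement this kind of fallback is to load the class by name and check it against the required base type before instantiating it. The following self-contained sketch illustrates that approach under stated assumptions: it is not the library's code, and BaseFunction, DefaultFunction, CustomFunction, Unrelated, and resolve are hypothetical stand-ins for DynamicPatternProcessFunction, DefaultDynamicPatternProcessFunction, and user classes.

```java
class ProcessFunctionResolverSketch {

    // Hypothetical stand-in for the required base class.
    abstract static class BaseFunction {
        abstract String name();
    }

    // Hypothetical stand-in for the default implementation.
    static final class DefaultFunction extends BaseFunction {
        String name() {
            return "default";
        }
    }

    // A user class that extends the base type and is therefore accepted.
    static final class CustomFunction extends BaseFunction {
        String name() {
            return "custom";
        }
    }

    // A user class that does not extend the base type.
    static final class Unrelated {
    }

    // Returns an instance of the named class if it extends BaseFunction,
    // otherwise falls back to DefaultFunction.
    static BaseFunction resolve(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            if (BaseFunction.class.isAssignableFrom(clazz)) {
                return (BaseFunction) clazz.getDeclaredConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            // Unknown or non-instantiable class: fall through to the default.
        }
        return new DefaultFunction();
    }

    public static void main(String[] args) {
        System.out.println(resolve(CustomFunction.class.getName()).name()); // prints custom
        System.out.println(resolve(Unrelated.class.getName()).name());      // prints default
        System.out.println(resolve("no.such.ClassName").name());            // prints default
    }
}
```

In this sketch an unknown class name also falls back to the default; the article only describes the case where the class exists but has the wrong supertype.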

Code language: java
DataStream<Event> input =
		env.fromElements(
				new Event(1, "barfoo", 1.0),
				new Event(1, "start", 2.0),
				new Event(1, "foobar", 3.0),
				new SubEvent(1, "foo", 4.0, 1.0),
				new Event(2, "middle", 9.0),
				new SubEvent(1, "middle", 1.0, 2.0),
				new SubEvent(1, "bar", 5.0, 3.0),
				new Event(1, "42", 6.0),
				new Event(1, "end", 7.0));
DynamicConfiguration dynamicConfiguration = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration.setDynamicPatternProcessFunctionClassName(EventPatternProcessFunction2.class.getName());
DynamicConfiguration dynamicConfiguration2 = DynamicConfigurationDemo.dynamicConfiguration2();
dynamicConfiguration2.setId(10086L);
DataStream<DynamicConfiguration> dynamicConfigurationDataStream =
		env.fromElements(
				dynamicConfiguration,dynamicConfiguration2
		);
KeyedStream<Event, Integer> eventIdKeyedStream = input.keyBy((KeySelector<Event, Integer>) Event::getId);
DataStream<DynamicMatchData<Object>> returns =
		DynamicCEP.patternStream(eventIdKeyedStream, dynamicConfigurationDataStream)
				.inProcessingTime()
				.minPatternInitialized(2)
				.objectProcess();

returns.print("dynamic");

// Collect the results and check that each of the two rules produced one match.
List<DynamicMatchData<Object>> resultList = new ArrayList<>();
DataStreamUtils.collect(returns).forEachRemaining(resultList::add);
Assert.assertEquals(2, resultList.size());

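The printed output matches the expected result.

Rule 1 matches the events with id=1 and prices (2.0, 1.0, 7.0); rule 10086 matches the same events.

For both rule 1 and rule 10086, the returned data is of type DynamicMatchData<Map<String, List<IN>>>.

Code language: shell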
dynamic:3> DynamicMatchData(id=10086, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})
dynamic:3> DynamicMatchData(id=1, version=1, match={start=[Event(1, start, 2.0)], middle=[SubEvent(1, middle, 1.0, 2.0)], end=[Event(1, end, 7.0)]})

Code of EventPatternProcessFunction2

Code language: java
// Extends Flink's PatternProcessFunction directly, not DynamicPatternProcessFunction,
// so the operator falls back to DefaultDynamicPatternProcessFunction for this rule.
public class EventPatternProcessFunction2 extends PatternProcessFunction<Event, DynamicMatchData<String>> {
    public DynamicMatchData<String> transform(Map<String, List<Event>> match, Context ctx) {
        DynamicMatchData<String> objectDynamicMatchData = new DynamicMatchData<>();
        objectDynamicMatchData.setMatch("这是EventPatternProcessFunction2 matchs" + match.toString());
        return objectDynamicMatchData;
    }

    @Override
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<DynamicMatchData<String>> out) throws Exception {
        DynamicMatchData<String> dynamicMatchData = transform(match,ctx);
        if(ctx instanceof DynamicCepOperator.ContextFunctionImpl){
            DynamicCepOperator.ContextFunctionImpl context = (DynamicCepOperator.ContextFunctionImpl) ctx;
            String id = context.getPatternProcessor().getId();
            int version = context.getPatternProcessor().getVersion();
            dynamicMatchData.setId(id);
            dynamicMatchData.setVersion(version);
        }
        out.collect(dynamicMatchData);
    }
}

Pseudocode for the matching rule in dynamicConfiguration2

Code language: java
Pattern.begin("start").where("name='start'")
    .followedByAny("middle").where("name='middle'")
    .followedByAny("end").where("name='end'")
