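FlinkCEP is the complex event processing library implemented on top of Flink. It provides a rich API that lets you detect event patterns in an unbounded stream of events and act on the complex events you find. Pattern matching is a core building block of complex event processing. Use cases include business processes driven by a series of events, such as detecting anomalous behavior within normal network activity, or finding patterns in prices, trading volume, and other behavior in financial applications.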
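Characteristics:
- Complexity: joins across multiple streams, window aggregation, and detection of event sequences or patterns
- Low latency: seconds or milliseconds, for example in credit card fraud detection or attack detection
- High throughput: tens of thousands of messages per second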
In this post we walk through an example to explain how to use Flink CEP. The example comes from the official Flink blog: https://flink.apache.org/news/2016/04/06/cep-monitoring.html
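The input event stream consists of temperature and power events from a set of racks. The goal is to detect when a rack is overheating and to emit warnings and alerts.
We use a custom source to simulate the rack temperatures, and then define the following rules to generate warnings and alerts:
- Warning: a rack reports two consecutive temperatures above the threshold within 10 seconds;
- Alert: a rack triggers two consecutive warnings within 20 seconds.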
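Before turning to the Flink API, the warning rule can be sketched in plain Java for a single rack. This is only an illustration of the rule, not how FlinkCEP evaluates it: the `Reading` class, the `findWarnings` helper, the threshold of 100.0, and the reading of "within 10 seconds" as "less than 10,000 ms apart" are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the warning rule for one rack; it does not use Flink. */
final class WarningRuleSketch {

    /** Hypothetical reading: a timestamp in milliseconds and a temperature. */
    static final class Reading {
        final long timestampMillis;
        final double temperature;

        Reading(long timestampMillis, double temperature) {
            this.timestampMillis = timestampMillis;
            this.temperature = temperature;
        }
    }

    /**
     * Returns the average temperature of every pair of adjacent readings that are
     * both at or above the threshold and less than windowMillis apart.
     */
    static List<Double> findWarnings(List<Reading> readings, double threshold, long windowMillis) {
        List<Double> warnings = new ArrayList<>();
        for (int i = 1; i < readings.size(); i++) {
            Reading first = readings.get(i - 1);
            Reading second = readings.get(i);
            boolean bothHot = first.temperature >= threshold && second.temperature >= threshold;
            boolean inWindow = second.timestampMillis - first.timestampMillis < windowMillis;
            if (bothHot && inWindow) {
                warnings.add((first.temperature + second.temperature) / 2);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        readings.add(new Reading(0L, 95.0));        // below the threshold
        readings.add(new Reading(2_000L, 105.0));   // hot, but the previous reading was not
        readings.add(new Reading(4_000L, 110.0));   // 105 and 110 are 2 s apart: warning
        readings.add(new Reading(20_000L, 120.0));  // hot, but 16 s after the previous reading
        System.out.println(findWarnings(readings, 100.0, 10_000L)); // prints [107.5]
    }
}
```

The alert rule has the same shape, applied to the resulting warnings with a 20-second window.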
First we define a monitoring event.
Note that the class must override the hashCode() and equals() methods.
From the official documentation: The events in the DataStream to which you want to apply pattern matching must implement proper equals() and hashCode() methods because FlinkCEP uses them for comparing and matching events.
public abstract class MonitoringEvent {
    private int rackID;

    public MonitoringEvent(int rackID) {
        this.rackID = rackID;
    }

    public int getRackID() {
        return rackID;
    }

    public void setRackID(int rackID) {
        this.rackID = rackID;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MonitoringEvent) {
            MonitoringEvent monitoringEvent = (MonitoringEvent) obj;
            return monitoringEvent.canEquals(this) && rackID == monitoringEvent.rackID;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return rackID;
    }

    public boolean canEquals(Object obj) {
        return obj instanceof MonitoringEvent;
    }
}

public class TemperatureEvent extends MonitoringEvent {
    private double temperature;
    ...
}

public class PowerEvent extends MonitoringEvent {
    private double voltage;
    ...
}
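The subclass bodies are elided above, so the following is only a sketch of one way a subclass could keep equals() and hashCode() consistent with the canEquals() convention of the base class. The class names `EventEqualitySketch`, `Event`, and the nested `TemperatureEvent` and `PowerEvent` are stand-ins invented for this illustration, not the code from the original example.

```java
/** Sketch of equals()/hashCode() across an event hierarchy; all names are illustrative. */
final class EventEqualitySketch {

    abstract static class Event {
        private final int rackID;

        Event(int rackID) {
            this.rackID = rackID;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Event)) {
                return false;
            }
            Event other = (Event) obj;
            // canEquals lets a subclass refuse equality with a different event type.
            return other.canEquals(this) && rackID == other.rackID;
        }

        @Override
        public int hashCode() {
            return rackID;
        }

        boolean canEquals(Object obj) {
            return obj instanceof Event;
        }
    }

    static final class TemperatureEvent extends Event {
        private final double temperature;

        TemperatureEvent(int rackID, double temperature) {
            super(rackID);
            this.temperature = temperature;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof TemperatureEvent)) {
                return false;
            }
            TemperatureEvent other = (TemperatureEvent) obj;
            return super.equals(other) && Double.compare(temperature, other.temperature) == 0;
        }

        @Override
        public int hashCode() {
            // Combines the same fields that equals() compares.
            return 31 * super.hashCode() + Double.hashCode(temperature);
        }

        @Override
        boolean canEquals(Object obj) {
            return obj instanceof TemperatureEvent;
        }
    }

    static final class PowerEvent extends Event {
        private final double voltage;

        PowerEvent(int rackID, double voltage) {
            super(rackID);
            this.voltage = voltage;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof PowerEvent)) {
                return false;
            }
            PowerEvent other = (PowerEvent) obj;
            return super.equals(other) && Double.compare(voltage, other.voltage) == 0;
        }

        @Override
        public int hashCode() {
            return 31 * super.hashCode() + Double.hashCode(voltage);
        }

        @Override
        boolean canEquals(Object obj) {
            return obj instanceof PowerEvent;
        }
    }

    public static void main(String[] args) {
        TemperatureEvent a = new TemperatureEvent(1, 99.5);
        TemperatureEvent b = new TemperatureEvent(1, 99.5);
        PowerEvent p = new PowerEvent(1, 230.0);
        System.out.println(a.equals(b)); // prints true
        System.out.println(a.equals(p)); // prints false
        System.out.println(p.equals(a)); // prints false
    }
}
```

The point of the sketch is that equal events produce equal hash codes, and that a temperature event and a power event from the same rack never compare equal in either direction.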
We use a custom source to generate simulated MonitoringEvent data.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Input stream of monitoring events
DataStream<MonitoringEvent> inputEventStream = env
    .addSource(new MonitoringEventSource(
        MAX_RACK_ID,
        PAUSE,
        TEMPERATURE_RATIO,
        POWER_STD,
        POWER_MEAN,
        TEMP_STD,
        TEMP_MEAN))
    .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
Next we define the pattern: two consecutive events whose temperature exceeds the threshold, occurring within 10 seconds.
// Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
// appearing within a time interval of 10 seconds
Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
    .subtype(TemperatureEvent.class)
    .where(new IterativeCondition<TemperatureEvent>() {
        private static final long serialVersionUID = -6301755149429716724L;

        @Override
        public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
            return value.getTemperature() >= TEMPERATURE_THRESHOLD;
        }
    })
    .next("second") // must directly follow the previous event
    .subtype(TemperatureEvent.class)
    .where(new IterativeCondition<TemperatureEvent>() {
        private static final long serialVersionUID = 2392863109523984059L;

        @Override
        public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
            return value.getTemperature() >= TEMPERATURE_THRESHOLD;
        }
    })
    .within(Time.seconds(10));
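We then create a pattern stream from the warning pattern and the input stream, keyed by rack ID so that the pattern is matched separately for each rack.
// Create a pattern stream from our warning pattern
PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
    inputEventStream.keyBy("rackID"),
    warningPattern);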
We use the select method to generate a warning for each match of the warning pattern. The function passed to select receives a map whose keys are the pattern names we defined and whose values are the lists of matched events.
// Generate temperature warnings for each matched warning pattern
DataStream<TemperatureWarning> warnings = tempPatternStream.select(
    (Map<String, List<MonitoringEvent>> pattern) -> {
        TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
        TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);
        return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
    }
);
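To make the shape of that map concrete, here is a small plain-Java sketch that builds one by hand and averages the two matched temperatures. It does not use Flink: the values are bare `Double`s rather than event objects, and `MatchMapSketch` and `averageOfMatch` are names assumed for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the map shape handed to a select function; no Flink involved. */
final class MatchMapSketch {

    /** Averages the first temperature stored under each of the two pattern names. */
    static double averageOfMatch(Map<String, List<Double>> match) {
        double first = match.get("first").get(0);
        double second = match.get("second").get(0);
        return (first + second) / 2;
    }

    public static void main(String[] args) {
        // Keys are the names given to begin(...) and next(...); each value is a list
        // because a single pattern name can, in general, match more than one event.
        Map<String, List<Double>> match = new HashMap<>();
        match.put("first", Collections.singletonList(102.0));
        match.put("second", Collections.singletonList(108.0));
        System.out.println(averageOfMatch(match)); // prints 105.0
    }
}
```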
The steps above produce warnings, a DataStream of TemperatureWarning. Next we use this warning stream to build the alert stream, that is, two consecutive warnings within 20 seconds.
// Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
    .next("second")
    .within(Time.seconds(20));
Then we combine the alert pattern alertPattern with the warning stream warnings to create the pattern stream alertPatternStream.
// Create a pattern stream from our alert pattern
PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
    warnings.keyBy("rackID"),
    alertPattern);
Finally, of the two matched warnings, an alert is generated only when the average temperature of the first warning is lower than that of the second. The result is wrapped in a TemperatureAlert and returned.
// Generate a temperature alert only if the second temperature warning's average temperature is higher than
// first warning's temperature
DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
    (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
        TemperatureWarning first = pattern.get("first").get(0);
        TemperatureWarning second = pattern.get("second").get(0);
        if (first.getAverageTemperature() < second.getAverageTemperature()) {
            out.collect(new TemperatureAlert(first.getRackID()));
        }
    },
    TypeInformation.of(TemperatureAlert.class));
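The alert condition can also be sketched without Flink, as a check over the warnings of a single rack. The `Warning` class and `findAlerts` helper are assumptions made for this sketch, as is the reading of "within 20 seconds" as "less than 20,000 ms apart"; the sketch shows the rule only and not how FlinkCEP evaluates it.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the alert rule for the warnings of one rack; no Flink involved. */
final class AlertRuleSketch {

    /** Hypothetical warning: rack, timestamp in milliseconds, and average temperature. */
    static final class Warning {
        final int rackID;
        final long timestampMillis;
        final double averageTemperature;

        Warning(int rackID, long timestampMillis, double averageTemperature) {
            this.rackID = rackID;
            this.timestampMillis = timestampMillis;
            this.averageTemperature = averageTemperature;
        }
    }

    /**
     * Returns one rack ID per pair of adjacent warnings that are less than
     * windowMillis apart and whose average temperature is rising.
     */
    static List<Integer> findAlerts(List<Warning> warnings, long windowMillis) {
        List<Integer> alerts = new ArrayList<>();
        for (int i = 1; i < warnings.size(); i++) {
            Warning first = warnings.get(i - 1);
            Warning second = warnings.get(i);
            boolean inWindow = second.timestampMillis - first.timestampMillis < windowMillis;
            boolean rising = first.averageTemperature < second.averageTemperature;
            if (inWindow && rising) {
                alerts.add(first.rackID);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Warning> warnings = new ArrayList<>();
        warnings.add(new Warning(7, 0L, 101.0));
        warnings.add(new Warning(7, 5_000L, 104.0));   // rising, 5 s apart: alert
        warnings.add(new Warning(7, 12_000L, 103.0));  // within the window, but not rising
        warnings.add(new Warning(7, 40_000L, 110.0));  // rising, but 28 s apart
        System.out.println(findAlerts(warnings, 20_000L)); // prints [7]
    }
}
```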
Finally we print the alert stream and the warning stream. We could of course do other things with these two streams, such as sending them to an alerting system.
// Print the warning and alert events to stdout
warnings.print();
alerts.print();
References:
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html
[2] https://flink.apache.org/news/2016/04/06/cep-monitoring.html