Flink CEP Case Study: Rack Temperature Monitoring and Alerting

2020-09-15 14:24:19

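FlinkCEP is the complex event processing (CEP) library implemented on top of Flink. It provides a rich API for detecting event patterns in an unbounded stream of events and for reacting to the complex events that match. Pattern matching is a core capability of complex event processing. It applies to business processes driven by sequences of events, such as detecting anomalous behavior within normal network traffic, or finding patterns in prices, trading volumes, and other activity in financial applications.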

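Characteristics:

  • Complexity: joins across multiple streams, window aggregation, and detection of event sequences or patterns
  • Low latency: on the order of seconds or milliseconds, for example for credit card fraud detection or attack detection
  • High throughput: tens of thousands of messages per second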

In this post we walk through an example to show how Flink CEP is used. The example comes from the official Flink blog: https://flink.apache.org/news/2016/04/06/cep-monitoring.html

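The input stream consists of temperature and power events from a set of racks. The goal is to emit warnings and alerts when a rack overheats.

We simulate rack temperatures with a custom source and then define the following rules for generating warnings and alerts:

  • Warning: a rack reports a temperature above the threshold twice in a row within 10 seconds;
  • Alert: a rack triggers two consecutive warnings within 20 seconds;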
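
To make the warning rule concrete before turning to Flink, here is a minimal plain-Java sketch (no Flink involved) that scans the temperature readings of a single rack in order and reports every adjacent pair that both reach the threshold and lie at most 10 seconds apart. The WarningRuleSketch and Reading names, the threshold of 100, and the inclusive handling of the 10-second boundary are illustrative assumptions, not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;

final class WarningRuleSketch {
    // Hypothetical values chosen for illustration only.
    static final double TEMPERATURE_THRESHOLD = 100.0;
    static final long WINDOW_MILLIS = 10_000L;

    /** One temperature reading of a single rack (hypothetical helper type). */
    static final class Reading {
        final long timestampMillis;
        final double temperature;

        Reading(long timestampMillis, double temperature) {
            this.timestampMillis = timestampMillis;
            this.temperature = temperature;
        }
    }

    /** Returns the average temperature of every adjacent pair that satisfies the warning rule. */
    static List<Double> findWarnings(List<Reading> readings) {
        List<Double> warnings = new ArrayList<>();
        for (int i = 1; i < readings.size(); i++) {
            Reading first = readings.get(i - 1);
            Reading second = readings.get(i);
            boolean bothHot = first.temperature >= TEMPERATURE_THRESHOLD
                    && second.temperature >= TEMPERATURE_THRESHOLD;
            boolean closeInTime = second.timestampMillis - first.timestampMillis <= WINDOW_MILLIS;
            if (bothHot && closeInTime) {
                warnings.add((first.temperature + second.temperature) / 2);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading(0L, 101.0),
                new Reading(2_000L, 103.0),   // hot pair within 10 s: warning, average 102.0
                new Reading(4_000L, 90.0),    // below the threshold: breaks the sequence
                new Reading(6_000L, 105.0),
                new Reading(30_000L, 107.0)); // hot, but 24 s after the previous reading
        System.out.println(findWarnings(readings)); // prints [102.0]
    }
}
```

The sketch only sees temperature readings. In the Flink job below, power events travel on the same keyed stream, so a power event arriving between two hot readings would also break a strictly contiguous match.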

First we define a monitoring event.

Note that the class must override the hashCode() and equals() methods.

From the official documentation: The events in the DataStream to which you want to apply pattern matching must implement proper equals() and hashCode() methods because FlinkCEP uses them for comparing and matching events.

public abstract class MonitoringEvent {
    private int rackID;

    public MonitoringEvent(int rackID) {
        this.rackID = rackID;
    }

    public int getRackID() {
        return rackID;
    }

    public void setRackID(int rackID) {
        this.rackID = rackID;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof MonitoringEvent) {
            MonitoringEvent monitoringEvent = (MonitoringEvent) obj;
            return monitoringEvent.canEquals(this) && rackID == monitoringEvent.rackID;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return rackID;
    }

    public boolean canEquals(Object obj) {
        return obj instanceof MonitoringEvent;
    }
}




public class TemperatureEvent extends MonitoringEvent {
    private double temperature;
    ...
}

public class PowerEvent extends MonitoringEvent {
    private double voltage;
    ...
}
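
The subclass bodies are elided above. As a hedged illustration of the equals()/hashCode() requirement, the following self-contained sketch shows one way a temperature event could extend the base class while honoring its canEquals contract. SketchMonitoringEvent and SketchTemperatureEvent are hypothetical stand-ins so that the block compiles alone; the field handling and the hash formula are assumptions, not the author's code.

```java
abstract class SketchMonitoringEvent {
    private final int rackID;

    SketchMonitoringEvent(int rackID) {
        this.rackID = rackID;
    }

    int getRackID() {
        return rackID;
    }

    // Subclasses narrow this so that events of different types never compare equal.
    boolean canEquals(Object obj) {
        return obj instanceof SketchMonitoringEvent;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SketchMonitoringEvent) {
            SketchMonitoringEvent other = (SketchMonitoringEvent) obj;
            return other.canEquals(this) && rackID == other.rackID;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return rackID;
    }
}

final class SketchTemperatureEvent extends SketchMonitoringEvent {
    private final double temperature;

    SketchTemperatureEvent(int rackID, double temperature) {
        super(rackID);
        this.temperature = temperature;
    }

    double getTemperature() {
        return temperature;
    }

    @Override
    boolean canEquals(Object obj) {
        return obj instanceof SketchTemperatureEvent;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SketchTemperatureEvent) {
            SketchTemperatureEvent other = (SketchTemperatureEvent) obj;
            // Equal only if the rack ID (checked by super) and the temperature both match.
            return other.canEquals(this) && super.equals(other)
                    && Double.compare(temperature, other.temperature) == 0;
        }
        return false;
    }

    @Override
    public int hashCode() {
        // Combine the parent's hash with the temperature so equal events hash alike.
        return 41 * super.hashCode() + Double.hashCode(temperature);
    }

    public static void main(String[] args) {
        SketchTemperatureEvent a = new SketchTemperatureEvent(1, 99.5);
        SketchTemperatureEvent b = new SketchTemperatureEvent(1, 99.5);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints true
    }
}
```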

We simulate the MonitoringEvent data with a custom source.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Input stream of monitoring events
        DataStream<MonitoringEvent> inputEventStream = env
                .addSource(new MonitoringEventSource(
                        MAX_RACK_ID,
                        PAUSE,
                        TEMPERATURE_RATIO,
                        POWER_STD,
                        POWER_MEAN,
                        TEMP_STD,
                        TEMP_MEAN))
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
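
The article does not show MonitoringEventSource itself. As a rough, Flink-free sketch of what such a simulator might do, the block below draws a random rack ID and then, with probability TEMPERATURE_RATIO, a Gaussian temperature reading, otherwise a Gaussian voltage reading. The meaning of each parameter is inferred from the constant names only, and every numeric value as well as the EventGeneratorSketch class is a hypothetical assumption.

```java
import java.util.Locale;
import java.util.Random;

final class EventGeneratorSketch {
    // Hypothetical parameter values; the article does not state the real ones.
    static final int MAX_RACK_ID = 10;
    static final double TEMPERATURE_RATIO = 0.5;
    static final double TEMP_MEAN = 80.0;
    static final double TEMP_STD = 20.0;
    static final double POWER_MEAN = 100.0;
    static final double POWER_STD = 10.0;

    /** Produces a textual description of one randomly generated event. */
    static String nextEvent(Random random) {
        int rackId = random.nextInt(MAX_RACK_ID);
        if (random.nextDouble() < TEMPERATURE_RATIO) {
            // Scale a standard normal sample to the assumed temperature distribution.
            double temperature = random.nextGaussian() * TEMP_STD + TEMP_MEAN;
            return String.format(Locale.ROOT,
                    "TemperatureEvent(rack=%d, temperature=%.1f)", rackId, temperature);
        }
        double voltage = random.nextGaussian() * POWER_STD + POWER_MEAN;
        return String.format(Locale.ROOT,
                "PowerEvent(rack=%d, voltage=%.1f)", rackId, voltage);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(nextEvent(random));
        }
    }
}
```

A real source would emit event objects into the stream and pause between emissions (presumably what PAUSE controls) rather than return strings.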

Next we define the pattern: two consecutive events whose temperature exceeds the threshold within 10 seconds.

        // Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
        // appearing within a time interval of 10 seconds
        Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
                .subtype(TemperatureEvent.class)
                .where(new IterativeCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = -6301755149429716724L;

                    @Override
                    public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                    }
                })
                .next("second")  // must directly follow the previous event
                .subtype(TemperatureEvent.class)
                .where(new IterativeCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                    }
                })
                .within(Time.seconds(10));
                

We then create a pattern stream from the warning pattern and the input stream.

        // Create a pattern stream from our warning pattern
        PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
                inputEventStream.keyBy("rackID"),
                warningPattern);

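The select method generates a warning for each match of the warning pattern. Its argument is a map whose keys are the pattern names we defined ("first" and "second") and whose values are the lists of matched events.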

        // Generate temperature warnings for each matched warning pattern
        DataStream<TemperatureWarning> warnings = tempPatternStream.select(
                (Map<String, List<MonitoringEvent>> pattern) -> {
                    TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
                    TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);

                    return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
                }
        );

We now have warnings, a DataStream of temperature warnings. Next we use this warning stream to produce the alert stream: two consecutive warnings within 20 seconds.

        // Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
        Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
                .next("second")
                .within(Time.seconds(20));

Then we combine the alert pattern alertPattern with the warning stream warnings to create the pattern stream alertPatternStream.

        // Create a pattern stream from our alert pattern
        PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
                warnings.keyBy("rackID"),
                alertPattern);

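Finally, for each pair of collected warnings, an alert is generated only when the average temperature of the first warning is lower than that of the second. The result is wrapped in a TemperatureAlert.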

        // Generate a temperature alert only if the second temperature warning's average temperature is higher than
        // first warning's temperature
        DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
                (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
                    TemperatureWarning first = pattern.get("first").get(0);
                    TemperatureWarning second = pattern.get("second").get(0);

                    if (first.getAverageTemperature() < second.getAverageTemperature()) {
                        out.collect(new TemperatureAlert(first.getRackID()));
                    }
                },
                TypeInformation.of(TemperatureAlert.class));
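
The alert rule can also be illustrated without Flink. The following plain-Java sketch walks the warnings of a single rack in order and emits the rack ID for every adjacent pair that lies at most 20 seconds apart and whose average temperature is rising. The AlertRuleSketch and Warning names, the sample values, and the inclusive treatment of the 20-second boundary are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class AlertRuleSketch {
    static final long WINDOW_MILLIS = 20_000L;

    /** One temperature warning (hypothetical helper type). */
    static final class Warning {
        final long timestampMillis;
        final int rackId;
        final double averageTemperature;

        Warning(long timestampMillis, int rackId, double averageTemperature) {
            this.timestampMillis = timestampMillis;
            this.rackId = rackId;
            this.averageTemperature = averageTemperature;
        }
    }

    /** Expects the warnings of a single rack in time order; returns one rack ID per alert. */
    static List<Integer> findAlerts(List<Warning> warnings) {
        List<Integer> alerts = new ArrayList<>();
        for (int i = 1; i < warnings.size(); i++) {
            Warning first = warnings.get(i - 1);
            Warning second = warnings.get(i);
            boolean closeInTime = second.timestampMillis - first.timestampMillis <= WINDOW_MILLIS;
            boolean rising = first.averageTemperature < second.averageTemperature;
            if (closeInTime && rising) {
                alerts.add(first.rackId);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Warning> warnings = List.of(
                new Warning(0L, 3, 102.0),
                new Warning(5_000L, 3, 104.0),  // rising within 20 s: alert for rack 3
                new Warning(9_000L, 3, 103.0)); // falling: no alert
        System.out.println(findAlerts(warnings)); // prints [3]
    }
}
```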

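Finally we print the warning and alert streams. Of course, these two streams could also be processed further, for example by forwarding them to an alerting system.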

        // Print the warning and alert events to stdout
        warnings.print();
        alerts.print();

References:
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html
[2] https://flink.apache.org/news/2016/04/06/cep-monitoring.html
