Flink 实践教程:进阶6-CEP 复杂事件处理

2022-03-07 11:22:24 浏览数 (2)

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

Flink CEP[1] 是在 Flink 上层实现的复杂事件处理库。本文将为您详细介绍如何使用 Flink CEP 实现对复杂事件的处理。 示例程序使用 DataStream API 读取 Kafka 中股票的数据,找到股价的低点,完成了复杂事件的处理,最后将结果输出到 Kafka 的另一个 Topic 中去。

股票价格走势图股票价格走势图
视频内容

前置准备

创建流计算 Oceanus 集群

在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群。进入 Oceanus 控制台 [2],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [3]。

创建 Kafka Topic

进入 CKafka 控制台 [4],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,demo6-cep-sourcedemo6-cep-dest

开发 DataStream 作业

1. 新建 Maven 工程。

在本地 IDEA 中新建 Maven 工程,并配置 pom.xml 文件。pom.xml 文件内容如下:

代码语言:html复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demos</groupId>
    <artifactId>DemoCEP</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- test -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.demos.CEPTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. 代码编写

在 Flink DataStream 作业中,Stock POJO 类用于从 Kafka 中接受 JSON 格式数据,StockSerializerDeserializer 类用于序列化和反序列化,CEPTest 为主程序类。 Stock

代码语言:java复制
// POJO类用于模式匹配时,需实现 equals 和 hashCode 方法。
public class Stock {
    private String symbol;
    private String rowtime;
    private int price;
    private int tax;

    public Stock() {
    }

    public Stock(String symbol, String rowtime, int price, int tax) {
        this.symbol = symbol;
        this.rowtime = rowtime;
        this.price = price;
        this.tax = tax;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getRowtime() {
        return rowtime;
    }

    public void setRowtime(String rowtime) {
        this.rowtime = rowtime;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public int getTax() {
        return tax;
    }

    public void setTax(int tax) {
        this.tax = tax;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Stock stock = (Stock) o;
        return Objects.equals(symbol, stock.symbol) && Objects.equals(rowtime, stock.rowtime) && Objects.equals(price, stock.price) && Objects.equals(tax, stock.tax);
    }

    @Override
    public int hashCode() {
        return Objects.hash(symbol, rowtime, price, tax);
    }

    @Override
    public String toString() {
        return "Stock{"  
                "symbol='"   symbol   '''  
                ", rowtime='"   rowtime   '''  
                ", price="   price  
                ", tax="   tax  
                '}';
    }
}

StockSerializerDeserializer

代码语言:java复制
// 序列化和反序列类
public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(Stock stock) {
        try {
            return mapper.writeValueAsBytes(stock);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Stock deserialize(byte[] bytes) throws IOException {
        return mapper.readValue(bytes, Stock.class);
    }

    @Override
    public boolean isEndOfStream(Stock secEvent) {
        return false;
    }

    @Override
    public TypeInformation<Stock> getProducedType() {
        return TypeExtractor.getForClass(Stock.class);
    }
}

CEPTest 主程序类

代码语言:java复制
public class CEPTest {
    public static void main(String[] args) {
        // 设置环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "test");
        String intTopic = "demo6-cep-source";
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Stock>(intTopic, new StockSerializerDeserializer(), properties);
        consumer.setStartFromLatest();
        
        // 添加数据源
        DataStream<Stock> input = streamEnv.addSource(consumer);

        // 定义要匹配的模式。即股票的低点
        Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() > 10;
                    }
                }
        ).next("bottom").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() < 10;
                    }
                }
        ).next("up").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock Stock) {
                        return Stock.getPrice() > 10;
                    }
                }
        );

        DataStream<String> result = CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            StringBuilder builder = new StringBuilder();
                            builder.append("n");
                            builder.append(p.get("start").get(0))
                                    .append(",n")
                                    .append(p.get("bottom").get(0))
                                    .append(",n")
                                    .append(p.get("up").get(0));

                            o.collect(builder.toString());
                        },
                        Types.STRING);

        String topicOut = "demo6-cep-dest";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties);
      
        // 输出到 Kafka Topic
        result.addSink(producer);
        try {
            streamEnv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3. 项目打包

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:

代码语言:shell复制
mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。

流计算 Oceanus 作业

1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。 【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.CEPTest。

3. 运行作业

点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

4. 模拟数据

通过 Kafka Client 发送数据到 Topic demo6-cep-source。 发送命令:

代码语言:shell复制
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties

模拟数据示例:

代码语言:json复制
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}

更多接入方式请参考 CKafka 收发消息 [5]

5. 查看运行结果

在 Topic demo6-cep-dest中查看收到的数据,得到期望的数据。

运行结果运行结果

总结

  1. 使用 DataStream 中的 CEP 时,必须实现 POJO 类的 equals()hashCode()方法。 因为 Flink CEP 会根据 POJO 类的 equals()hashCode()方法进行对象的比较和匹配事件。
  2. 使用 Table SQL 中的 CEP,请参考 模式检测[6]。
  3. 打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。

阅读参考

[1] Flink CEP(复杂事件处理): https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/libs/cep/

[2] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[3] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[4] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1

[5] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834

[6] 模式检测:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/

0 人点赞