Flink 实践教程-进阶(6):CEP 复杂事件处理

2022-01-14 10:10:06 浏览数 (1)

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介  

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。 

Flink CEP[1] 是在 Flink 上层实现的复杂事件处理库。本文将为您详细介绍如何使用 Flink CEP 实现对复杂事件的处理。示例程序使用 DataStream API 读取 Kafka 中股票的数据,找到股价的低点,完成了复杂事件的处理,最后将结果输出到 Kafka 的另一个 Topic 中去。

前置准备  

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [2],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [3]。 

创建 Kafka  Topic

进入 CKafka 控制台 [4],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,demo6-cep-sourcedemo6-cep-dest

开发 DataStream 作业

1. 新建 Maven 工程。 

在本地 IDEA 中新建 Maven 工程,并配置 pom.xml 文件。pom.xml 文件内容如下:  

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">   <modelVersion>4.0.0</modelVersion>
   <groupId>com.demos</groupId>   <artifactId>DemoCEP</artifactId>   <version>1.0-SNAPSHOT</version>
   <properties>       <maven.compiler.source>8</maven.compiler.source>       <maven.compiler.target>8</maven.compiler.target>   </properties>
   <dependencies>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-core</artifactId>           <version>1.13.2</version>           <scope>provided</scope>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-java</artifactId>           <version>1.13.2</version>           <scope>provided</scope>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-streaming-java_2.11</artifactId>           <version>1.13.2</version>           <scope>provided</scope>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-connector-kafka_2.11</artifactId>           <version>1.13.2</version>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-cep_2.11</artifactId>           <version>1.13.2</version>       </dependency>
       <!-- test -->       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-clients_2.11</artifactId>           <version>1.13.2</version>           <scope>test</scope>       </dependency>   </dependencies>   <build>       <plugins>           <plugin>               <groupId>org.apache.maven.plugins</groupId>               <artifactId>maven-assembly-plugin</artifactId>               <version>3.2.0</version>               <configuration>                   <descriptorRefs>                       <descriptorRef>jar-with-dependencies</descriptorRef>                   </descriptorRefs>                   <archive>                       <manifest>                           <mainClass>com.demos.CEPTest</mainClass>                       </manifest>                   </archive>               </configuration>               <executions>                   <execution>                       <id>make-assembly</id>                       <phase>package</phase>                       <goals>                           <goal>single</goal>                       </goals>                   </execution>               </executions>           </plugin>       </plugins>   </build></project>
2. 代码编写  

在 Flink DataStream 作业中,Stock POJO 类用于从 Kafka 中接受 JSON 格式数据,StockSerializerDeserializer 类用于序列化和反序列化,CEPTest 为主程序类。 Stock 类  

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">   <modelVersion>4.0.0</modelVersion>
   <groupId>com.demos</groupId>   <artifactId>DemoCEP</artifactId>   <version>1.0-SNAPSHOT</version>
   <properties>       <maven.compiler.source>8</maven.compiler.source>       <maven.compiler.target>8</maven.compiler.target>   </properties>
   <dependencies>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-core</artifactId>           <version>1.13.2</version>           <scope>provided</scope>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-java</artifactId>           <version>1.13.2</version>           <scope>provided</scope>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-streaming-java_2.11</artifactId>           <version>1.13.2</version>           <scope>provided</scope>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-connector-kafka_2.11</artifactId>           <version>1.13.2</version>       </dependency>       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-cep_2.11</artifactId>           <version>1.13.2</version>       </dependency>
       <!-- test -->       <dependency>           <groupId>org.apache.flink</groupId>           <artifactId>flink-clients_2.11</artifactId>           <version>1.13.2</version>           <scope>test</scope>       </dependency>   </dependencies>   <build>       <plugins>           <plugin>               <groupId>org.apache.maven.plugins</groupId>               <artifactId>maven-assembly-plugin</artifactId>               <version>3.2.0</version>               <configuration>                   <descriptorRefs>                       <descriptorRef>jar-with-dependencies</descriptorRef>                   </descriptorRefs>                   <archive>                       <manifest>                           <mainClass>com.demos.CEPTest</mainClass>                       </manifest>                   </archive>               </configuration>               <executions>                   <execution>                       <id>make-assembly</id>                       <phase>package</phase>                       <goals>                           <goal>single</goal>                       </goals>                   </execution>               </executions>           </plugin>       </plugins>   </build></project>

StockSerializerDeserializer 类  

代码语言:javascript复制
// 序列化和反序列类public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> {
   private final ObjectMapper mapper = new ObjectMapper();
   @Override   public byte[] serialize(Stock stock) {       try {           return mapper.writeValueAsBytes(stock);      } catch (JsonProcessingException e) {           throw new RuntimeException(e);      }  }
   @Override   public Stock deserialize(byte[] bytes) throws IOException {       return mapper.readValue(bytes, Stock.class);  }
   @Override   public boolean isEndOfStream(Stock secEvent) {       return false;  }
   @Override   public TypeInformation<Stock> getProducedType() {       return TypeExtractor.getForClass(Stock.class);  }}

CEPTest 主程序类

代码语言:javascript复制
public class CEPTest {   public static void main(String[] args) {       // 设置环境       StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
       Properties properties = new Properties();       properties.setProperty("bootstrap.servers", "127.0.0.1:9092");       properties.setProperty("group.id", "test");       String intTopic = "demo6-cep-source";       FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Stock>(intTopic, new StockSerializerDeserializer(), properties);       consumer.setStartFromLatest();
       // 添加数据源       DataStream<Stock> input = streamEnv.addSource(consumer);
       // 定义要匹配的模式。即股票的低点       Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where(               new SimpleCondition<Stock>() {                   @Override                   public boolean filter(Stock stock) {                       return stock.getPrice() > 10;                  }              }      ).next("bottom").where(               new SimpleCondition<Stock>() {                   @Override                   public boolean filter(Stock stock) {                       return stock.getPrice() < 10;                  }              }      ).next("up").where(               new SimpleCondition<Stock>() {                   @Override                   public boolean filter(Stock Stock) {                       return Stock.getPrice() > 10;                  }              }      );
       DataStream<String> result = CEP.pattern(input, pattern)              .inProcessingTime()              .flatSelect(                      (p, o) -> {                           StringBuilder builder = new StringBuilder();                           builder.append("n");                           builder.append(p.get("start").get(0))                                  .append(",n")                                  .append(p.get("bottom").get(0))                                  .append(",n")                                  .append(p.get("up").get(0));
                           o.collect(builder.toString());                      },                       Types.STRING);
       String topicOut = "demo6-cep-dest";       FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties);            // 输出到 Kafka Topic       result.addSink(producer);       try {           streamEnv.execute();      } catch (Exception e) {           e.printStackTrace();      }  }}

3. 项目打包  

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:

代码语言:javascript复制
mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。 

流计算 Oceanus 作业  

1. 上传依赖  

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

2. 创建作业  

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。 【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.CEPTest。 

3. 运行作业  

点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

4. 模拟数据  

通过 Kafka Client 发送数据到 Topic demo6-cep-source。 发送命令:

代码语言:javascript复制
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties

模拟数据示例:

代码语言:javascript复制
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1}{"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2}{"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}

更多接入方式请参考 CKafka 收发消息 [5]  

5. 查看运行结果  

在 Topic demo6-cep-dest中查看收到的数据,得到期望的数据。

总结

  1. 使用 DataStream 中的 CEP 时,必须实现 POJO 类的 equals()hashCode()方法。因为 Flink CEP 会根据 POJO 类的 equals()hashCode()方法进行对象的比较和匹配事件。 
  2. 使用 Table SQL 中的 CEP,请参考 模式检测[6]。 
  3. 打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。 

阅读参考  

[1] Flink CEP(复杂事件处理): https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/libs/cep/

[2] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[3] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[4] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1

[5] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834

[6] 模式检测:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

0 人点赞