作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
Flink CEP[1] 是在 Flink 上层实现的复杂事件处理库。本文将为您详细介绍如何使用 Flink CEP 实现对复杂事件的处理。示例程序使用 DataStream API 读取 Kafka 中股票的数据,找到股价的低点,完成了复杂事件的处理,最后将结果输出到 Kafka 的另一个 Topic 中去。
前置准备
创建流计算 Oceanus 集群
进入 Oceanus 控制台 [2],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [3]。
创建 Kafka Topic
进入 CKafka 控制台 [4],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,demo6-cep-source
和 demo6-cep-dest
。
开发 DataStream 作业
1. 新建 Maven 工程。
在本地 IDEA 中新建 Maven 工程,并配置 pom.xml 文件。pom.xml 文件内容如下:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.demos</groupId> <artifactId>DemoCEP</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.13.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>1.13.2</version> </dependency>
<!-- test --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.13.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.2.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.demos.CEPTest</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build></project>
2. 代码编写
在 Flink DataStream 作业中,Stock POJO 类用于从 Kafka 中接受 JSON 格式数据,StockSerializerDeserializer 类用于序列化和反序列化,CEPTest 为主程序类。 Stock 类
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.demos</groupId> <artifactId>DemoCEP</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.13.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>1.13.2</version> </dependency>
<!-- test --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.13.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.2.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.demos.CEPTest</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build></project>
StockSerializerDeserializer 类
// 序列化和反序列类public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> {
private final ObjectMapper mapper = new ObjectMapper();
@Override public byte[] serialize(Stock stock) { try { return mapper.writeValueAsBytes(stock); } catch (JsonProcessingException e) { throw new RuntimeException(e); } }
@Override public Stock deserialize(byte[] bytes) throws IOException { return mapper.readValue(bytes, Stock.class); }
@Override public boolean isEndOfStream(Stock secEvent) { return false; }
@Override public TypeInformation<Stock> getProducedType() { return TypeExtractor.getForClass(Stock.class); }}
CEPTest 主程序类
public class CEPTest { public static void main(String[] args) { // 设置环境 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); properties.setProperty("group.id", "test"); String intTopic = "demo6-cep-source"; FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Stock>(intTopic, new StockSerializerDeserializer(), properties); consumer.setStartFromLatest();
// 添加数据源 DataStream<Stock> input = streamEnv.addSource(consumer);
// 定义要匹配的模式。即股票的低点 Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where( new SimpleCondition<Stock>() { @Override public boolean filter(Stock stock) { return stock.getPrice() > 10; } } ).next("bottom").where( new SimpleCondition<Stock>() { @Override public boolean filter(Stock stock) { return stock.getPrice() < 10; } } ).next("up").where( new SimpleCondition<Stock>() { @Override public boolean filter(Stock Stock) { return Stock.getPrice() > 10; } } );
DataStream<String> result = CEP.pattern(input, pattern) .inProcessingTime() .flatSelect( (p, o) -> { StringBuilder builder = new StringBuilder(); builder.append("n"); builder.append(p.get("start").get(0)) .append(",n") .append(p.get("bottom").get(0)) .append(",n") .append(p.get("up").get(0));
o.collect(builder.toString()); }, Types.STRING);
String topicOut = "demo6-cep-dest"; FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties); // 输出到 Kafka Topic result.addSink(producer); try { streamEnv.execute(); } catch (Exception e) { e.printStackTrace(); } }}
3. 项目打包
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:
mvn clean package
命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。
流计算 Oceanus 作业
1. 上传依赖
在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。
2. 创建作业
在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。 【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.CEPTest。
3. 运行作业
点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。
4. 模拟数据
通过 Kafka Client 发送数据到 Topic demo6-cep-source
。 发送命令:
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties
模拟数据示例:
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1}{"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2}{"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}
更多接入方式请参考 CKafka 收发消息 [5]
5. 查看运行结果
在 Topic demo6-cep-dest
中查看收到的数据,得到期望的数据。
总结
- 使用
DataStream
中的 CEP 时,必须实现 POJO 类的equals()
和hashCode()
方法。因为 Flink CEP 会根据 POJO 类的equals()
和hashCode()
方法进行对象的比较和匹配事件。 - 使用
Table SQL
中的 CEP,请参考 模式检测[6]。 - 打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。
阅读参考
[1] Flink CEP(复杂事件处理): https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/libs/cep/
[2] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[3] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[4] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1
[5] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834
[6] 模式检测:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/
流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓
点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~
腾讯云大数据
长按二维码 关注我们