流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
Flink CEP[1] 是在 Flink 上层实现的复杂事件处理库。本文将为您详细介绍如何使用 Flink CEP 实现对复杂事件的处理。 示例程序使用 DataStream API 读取 Kafka 中股票的数据,找到股价的低点,完成了复杂事件的处理,最后将结果输出到 Kafka 的另一个 Topic 中去。
前置准备
创建流计算 Oceanus 集群
在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群。进入 Oceanus 控制台 [2],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [3]。
创建 Kafka Topic
进入 CKafka 控制台 [4],点击左上角【新建】,即可完成 CKafka 实例的创建,并创建 2 个 Topic,demo6-cep-source
和 demo6-cep-dest
。
开发 DataStream 作业
1. 新建 Maven 工程。
在本地 IDEA 中新建 Maven 工程,并配置 pom.xml 文件。pom.xml 文件内容如下:
代码语言:html复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.demos</groupId>
<artifactId>DemoCEP</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- test -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.demos.CEPTest</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2. 代码编写
在 Flink DataStream 作业中,Stock POJO 类用于从 Kafka 中接受 JSON 格式数据,StockSerializerDeserializer 类用于序列化和反序列化,CEPTest 为主程序类。 Stock 类
代码语言:java复制// POJO类用于模式匹配时,需实现 equals 和 hashCode 方法。
public class Stock {
private String symbol;
private String rowtime;
private int price;
private int tax;
public Stock() {
}
public Stock(String symbol, String rowtime, int price, int tax) {
this.symbol = symbol;
this.rowtime = rowtime;
this.price = price;
this.tax = tax;
}
public String getSymbol() {
return symbol;
}
public void setSymbol(String symbol) {
this.symbol = symbol;
}
public String getRowtime() {
return rowtime;
}
public void setRowtime(String rowtime) {
this.rowtime = rowtime;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public int getTax() {
return tax;
}
public void setTax(int tax) {
this.tax = tax;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Stock stock = (Stock) o;
return Objects.equals(symbol, stock.symbol) && Objects.equals(rowtime, stock.rowtime) && Objects.equals(price, stock.price) && Objects.equals(tax, stock.tax);
}
@Override
public int hashCode() {
return Objects.hash(symbol, rowtime, price, tax);
}
@Override
public String toString() {
return "Stock{"
"symbol='" symbol '''
", rowtime='" rowtime '''
", price=" price
", tax=" tax
'}';
}
}
StockSerializerDeserializer 类
代码语言:java复制// 序列化和反序列类
public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(Stock stock) {
try {
return mapper.writeValueAsBytes(stock);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public Stock deserialize(byte[] bytes) throws IOException {
return mapper.readValue(bytes, Stock.class);
}
@Override
public boolean isEndOfStream(Stock secEvent) {
return false;
}
@Override
public TypeInformation<Stock> getProducedType() {
return TypeExtractor.getForClass(Stock.class);
}
}
CEPTest 主程序类
代码语言:java复制public class CEPTest {
public static void main(String[] args) {
// 设置环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "test");
String intTopic = "demo6-cep-source";
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Stock>(intTopic, new StockSerializerDeserializer(), properties);
consumer.setStartFromLatest();
// 添加数据源
DataStream<Stock> input = streamEnv.addSource(consumer);
// 定义要匹配的模式。即股票的低点
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where(
new SimpleCondition<Stock>() {
@Override
public boolean filter(Stock stock) {
return stock.getPrice() > 10;
}
}
).next("bottom").where(
new SimpleCondition<Stock>() {
@Override
public boolean filter(Stock stock) {
return stock.getPrice() < 10;
}
}
).next("up").where(
new SimpleCondition<Stock>() {
@Override
public boolean filter(Stock Stock) {
return Stock.getPrice() > 10;
}
}
);
DataStream<String> result = CEP.pattern(input, pattern)
.inProcessingTime()
.flatSelect(
(p, o) -> {
StringBuilder builder = new StringBuilder();
builder.append("n");
builder.append(p.get("start").get(0))
.append(",n")
.append(p.get("bottom").get(0))
.append(",n")
.append(p.get("up").get(0));
o.collect(builder.toString());
},
Types.STRING);
String topicOut = "demo6-cep-dest";
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties);
// 输出到 Kafka Topic
result.addSink(producer);
try {
streamEnv.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 项目打包
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:
代码语言:shell复制mvn clean package
命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。
流计算 Oceanus 作业
1. 上传依赖
在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。
2. 创建作业
在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。 【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.CEPTest。
3. 运行作业
点击【发布草稿】后启动作业,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。
4. 模拟数据
通过 Kafka Client 发送数据到 Topic demo6-cep-source
。 发送命令:
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties
模拟数据示例:
代码语言:json复制{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}
更多接入方式请参考 CKafka 收发消息 [5]
5. 查看运行结果
在 Topic demo6-cep-dest
中查看收到的数据,得到期望的数据。
总结
- 使用
DataStream
中的 CEP 时,必须实现 POJO 类的equals()
和hashCode()
方法。 因为 Flink CEP 会根据 POJO 类的equals()
和hashCode()
方法进行对象的比较和匹配事件。 - 使用
Table SQL
中的 CEP,请参考 模式检测[6]。 - 打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。
阅读参考
[1] Flink CEP(复杂事件处理): https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/libs/cep/
[2] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[3] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[4] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1
[5] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834
[6] 模式检测:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/