导读:继Wormhole的设计思想介绍和功能介绍之后,相信大家对Wormhole已经有了初步的了解。2018年7月31日,我们发布了Wormhole_0.5新版本,与以往基于Spark计算引擎的版本相比,该版本新增了基于Flink计算引擎的流式处理功能,主要关注低延迟和CEP。基于Flink计算引擎版本具体内容是什么呢?还请各位看官移步正文~
一、Wormhole Flink版介绍
延迟时间是评判流式处理性能的关键指标之一。Spark 基于弹性分布式数据集(Resilient Distributed Dataset,RDD)进行微批处理,所以Spark在流式处理方面,不可避免会存在一些延时,只支持秒级延迟。Flink基于事件处理,实现了真正的流式计算。与基于Spark的流式处理相比,它的延迟更低。Wormhole通过对Flink计算引擎的支持,将延迟降低到毫秒级。
Wormhole Flink版除了支持Flink SQL,Lookup SQL,新增了对CEP的支持,并且支持三者的混合编排,即一个Flow中可以包含多个Flink SQL,多个Lookup SQL和多个CEP。Flink SQL与Spark SQL用法类似,Spark SQL和Lookup SQL在上一篇Wormhole系列文章中已经介绍过,这里将不再赘述,下面我们将重点讲解CEP。
二、CEP(复杂事件处理)简介
在传统DBMS中,所有的操作都只能在数据落库之后才能进行,这极大地降低了事件处理的实时性。与传统DBMS不同,CEP从流式事件中查找匹配指定模式的事件,对流式事件边获取边处理,整个处理过程都在数据流中进行,无需落地,因此它拥有更低的延迟,即所有输入都将被立刻处理,一旦在流式事件中发现了匹配指定模式的事件集,结果就会立即输出。
正因如此,CEP引起了广泛的关注,并得到了大量的应用推广,主要体现在运营和运维两方面。在运营方面,CEP经常被应用于金融产品中,例如,股票市场趋势预测、信用卡诈骗预防等。在运维方面,CEP被用在基于RFID的追踪和监控系统中,例如,检测库房中失窃的物品。当然,CEP也能通过指定嫌疑人的行为,来检测网络入侵。
Wormhole CEP是基于Flink CEP实现的,并且提供了可视化操作界面,无需编码即可快速实现业务需求。Wormhole CEP引入了窗口时间(Window Time),窗口策略(Strategy),分组策略(KeyBy),输出格式(Output),筛选规则(Pattern)等概念。下面,我们逐一介绍这些概念的含义及用途。
Window Time:指在触发了符合Begin Pattern的事件记录后的窗口时间,如果watermark的time超过了触发时间 窗口时间,本次pattern结束;
Strategy:包含NO_SKIP和SKIP_PAST_LAST_EVENT两种策略,前者对应事件滑动策略,后者对应事件滚动策略,具体区别可以借鉴下面的例子:
事件滑动:
a1 a2 a3 a4 .......
a2 a3 a4 a5 ........
a3 a4 a5 a6 ........
事件滚动:
a1 a2 a3 a4 ........
a5 a6 a7 a8 ........
a9 a10 a11 a12......
KeyBy:指依据事件中的哪个字段来做分区。例如,现在有一条数据,它的schema包括ums_id_, ums_op_, ums_ts_, value1, value2等几个字段,这里选定value1来做分区的依赖字段,那么,与value1字段相同的数据将被分配到同一个分组上。CEP操作将分别针对每一分组的数据进行处理,KeyBy可以作用在多个字段上。
Output:输出结果的形式,分为三类:Agg、Detail、FilteredRow
- Agg:将匹配的多条数据做聚合,生成一条数据输出
例:field1:avg,field2:max(目前支持max/min/avg/sum)
- Detail:将匹配的多条数据逐一输出
- FilteredRow:按条件选择指定的一条数据输出
例:head/last/field1:min/max
Pattern:筛选规则。每个CEP由若干个Pattern组成。
每个Pattern包括以下三个概念:
- Operator:操作算子。CEP中的第一个Pattern Operator只能为begin,其后的每个Pattern Operator只能为next、followedBy、notNext、notFollowedBy四种类型中的一种,其含义分别为:
✔ next:会追加一个新的Pattern对象到既有的Pattern之后,它表示当前模式运算符所匹配的事件必须是严格紧邻的,这意味着两个被匹配的事件必须是前后紧邻,中间没有其他元素;
✔ followedBy:会追加一个新的Pattern到既有的Pattern之后(其实返回的是一个FollowedByPattern对象,它是Pattern的派生类),它表示当前运算符所匹配的事件不必严格紧邻,这意味着匹配的两个事件之间允许穿插其他事件;
✔ notNext:增加一个新的否定模式。匹配(否定)事件必须直接输出先前的匹配事件(严格紧邻),以便将部分匹配丢弃;
✔ notFollowedBy:会丢弃或者跳过已匹配的事件(注:notFollowedBy不能为最后一个Pattern)。
- Quantifier:用来指定满足某一pattern的事件数量。目前配置包括:一条及以上,指定条数,指定条数及以上;这里需要特殊说明的是,notNext、notFollowedBy这两种Operator无法设置Quantifier;
- Conditions:判断条件。用户可以针对事件的某个或多个属性设置判断条件,例如,可以设置只有符合value1 like a and value2 >=10的事件才是符合条件的事件。
三、Wormhole CEP应用场景
场景一:网络DDOS攻击警告
Wormhole CEP在日常运维中被广泛应用。下面以运维中会遇到的一类情况为例,来介绍如何使用Wormhole CEP。
DDOS攻击是日常运维中经常遇到的一类问题,CEP正好可以用来对DDOS攻击进行预警。
DDOS攻击的判断规则如下:
- 正常:流量在预设的正常范围内;
- 警告:某数据中心在10秒内连续2次上报的流量超过认定的正常值;
- 报警:某数据中心在30秒内连续2次匹配警告;
- 通知:报警后需要短信/邮件通知相关人员。
通过上述规则,DDOS攻击的判断依据可以被量化为流量超出事件在一定时间内多次产生。只要符合条件,客户请求就可以被认定为DDOS攻击。针对符合条件的事件,Wormhole会向Kafka传入报警消息,并由业务系统去Kafka中消费报警消息,从而进行相应的后续处理。
图1 kafka业务系统消费示意图
下面,结合一个具体的操作例子来说明Wormhole CEP是如何检测DDOS攻击的。
首先,针对警告规则,设置一个窗口时间为10秒,次数为2次,判断条件为流量超过45(GB)的CEP,作为第一个CEP,并将事件发生时间,以及次数1作为中间结果进行输出;
图2 设置警告CEP
然后,针对报警规则,再设置一个窗口为30秒,判断条件为警告事件发生次数为2次作为第二个CEP。针对符合条件的事件,向Kafka中传入报警消息,否则,不做任何操作。
图3 设置报警CEP
最终,设置完两个CEP之后,它们将对所有流上事件进行叠加过滤,并针对符合条件的事件,向Kafka写入报警消息,从而,协助各个数据中心预防DDOS攻击。
图4 CEP列表
场景二:电商业务人工外呼通知
Wormhole CEP在运营中也起到了重要作用,比如在电商平台中,客户填写提交订单后,由于某些原因长时间未付款,这时需要人工介入处理,如给客户打电话进行回访,从而了解客户情况,提高业务成交量及服务质量。下面以此业务场景为例,介绍如何通过Wormhole CEP来实现此类业务需求。
这里将购物步骤简化为两步,第一步提交订单,第二步付款。若某一客户在提交订单后,5min内未付款,则平台通知工作人员联系客户。假设事件流不断流入Kafka中,事件中userid字段代表客户ID;state字段代表订单状态(s1是“已提交”,s2是“已付款”)。通过CEP很容易实现上述需求,首先设置第一个Pattern,用来匹配某客户提交订单事件,即state=s1;然后设置第二个Pattern,用来匹配该客户未付款事件,即相邻的事件中state=s2未发生。满足两个规则数据即满足需要人工外呼条件,这时系统发消息通知工作人员联系该客户。
图5 电商平台数据处理示意图
针对需求,需要设置一个300s(即5min)的窗口,然后按照userid分组,对每个客户分别进行匹配。
图6 CEP基本配置
第一个pattern为客户已提交订单。
图7 Pattern Begin
第二个Pattern为客户未付款。
图8 Pattern notNext
最终,该CEP将对所有流上事件进行过滤,并针对符合条件的事件,将数据发送到Kafka,人工外呼系统根据此数据触发相关业务流程。
总的来说,Wormhole_v0.5主要是针对Flink实现了流式处理,关注点是低延迟和CEP。目前版本处理支持Flink SQL,Lookup SQL,CEP,并且支持三者的混合编排。同时,新增Spark Stream支持配置用户自定义Topic,可直接对接DBus独立拉全量功能。后续我们会尽快支持ums_extension(目前支持ums)、异构sink(目前支持Kafka)、udf等功能。欢迎大家持续关注!
作者:赵平
来源:宜信技术学院