导读
在主流互联网产品中,比如搜索和推荐的系统,为了挖掘用户潜在购买需求,缩短用户到商品或信息的距离,提高用户的使用体验,都需要使用大量的特征来刻画用户的行为。在信息安全领域,建立在人工智能技术之上的策略引擎已经深入到了风控产品功能的方方面面,相应的,每一个策略系统都离不开大量的特征,来支撑模型算法或人工规则对请求的精准响应,因此特征系统成为了支持线上风控引擎的重要支柱。
本文以智能风控在线特征系统为原型,重点从线上数据从生产到特征物料提取、计算、存取角度介绍一些实践中的通用技术点,以解决在线特征系统在高并发情形下面临的问题和挑战。
特征系统的基本概念
1. 特征定义
什么是特征?特征是一个客体或一组客体特性的抽象结果。特征是用来描述概念的。任一客体或一组客体都具有众多特性,我们根据客体所共有的特性抽象出某一概念,该概念便成为了特征。因此我们可以理解特征是观察事物的一个角度,它可以是“横看成岭侧成峰”。特征它是一个抽象概念, 为了使抽象的概念可落地、可存储、可量化,结合了我们的业务特性对特征进行了又一次定义:特征 = 维度 时间窗口 计算函数。
举个例子 :“过去15分钟同用户多iP的数量”,那么最终的实际计算结果为特征值,过去15分钟为时间窗口,用户标识为维度,计算函数是针对iP进行去重计算的逻辑。
2. 时间窗口类型
在信息安全领域,黑产为了追求收益,一定会最大程度的将成本最小化。为了保证成本的可控,黑产在攻击时采取的策略是能简单决不复杂,能机器绝不人工,总之就一个目标,完成利益的收割,因此他们一定会利用仅有的资源做一些高频的动作。那么以什么样的周期或者时间窗口来统计这些高频率动作更能反应出实际问题呢?我们在长期的风控治理中结合业界的划分标准归纳了以下四种:
a) 自然窗口期:时间窗口的起点是固定的,但终止时间点一直在向前滚动,比如用户当天累计发帖数量或者消耗类特征的存储。
b) 固定窗口期:时间窗口的起止时间点是固定的,比如每天的某一时间段用户发送消息数量,主要针对特定时间用户的处罚、灌水的限制等。
c) 滑动窗口期:时间窗口的长度是固定的,但起止时间点一直在向前滚动,主要针对风控事中检测,常用来判读信息准入,例如风控发帖时间点前15分钟的计数。
d)Session窗口期:以第一个事件开始,依次向后滚动计算,直到超出一个session窗口期时间重新开始,主要针对控频,UV统计等。
图1
如图1所示,相同的维度,相同的计算函数,不同的时间窗口类型得到的特征值及其反应的业务含义都会有一定的差别。
3. 计算函数类型
特征的计算有繁有简,复杂多变。回到业务需求,我们的目的是通过特征生产系统来简化开发工作量,而非完全取代特征开发。因此我们选择一部分常见的函数计算类型,实现自动化生产。对于更复杂的特征计算,提供了特征更新接口支持第三方应用的对接。总结常见的计算类型主要有以下几种。
a) 求和(SUM),对窗口期内的数据进行求和;
b) 计数(COUNT),对窗口期内的数据进行计数统计;
c) 去重计数(COUNT_DISTINCT),对窗口期内的指定字段去除重复量后统计;
d) 明细(LIST),返回窗口期内最新的前5000条明细数据;
e) 最大值(MAX),计算出窗口期内的最大值;
f)最小值(MIN),计算出窗口期内的最小值;
g) 平均数(AVG),对窗口期内的数进行均值计算。
早期特征系统技术实现方案
早期特征系统主要以离线的方式为主,在数据仓库中特征表主要依靠数据分析师、算法工程师以及策略运营等同学建立特征需求由数据工程师排期开发,同时数据工程师还需要开发ETL调度任务,每天定时将数据同步到相应的Hbase表中,通过统一的服务接口为线上风控策略提供支持。
图2
早期技术架构如图2所示,但是随着业务量的不断扩张,现有的技术架构已不能满足日益增长的业务需求,主要体现在以下两点:
a) 无论是业务的创新速度还是对数据需求变化的速度都要远远超过数据工程师对特征开发的速度;
b) 因为风控存在对抗性,因此用户近几分钟、近几秒的行为信息往往比很多离线特征更具有价值,在线实时特征必然会在策略系统中发挥越来越重要的作用。
在线实时特征系统设计与实践,对从整体功能上来讲,在线实时特征系统的设计主要考虑以下几个方面:
a) 数据大,风控系统每天产生日志量3TB左右,同时特征系统还会接入发布、浏览、登录、注册、聊天等数据。很多情况下同一份数据需要提取不同维度、不同指标的特征,待处理的数量还会倍增。因此每天需要解析及计算的数量巨大。
b) 时效性高,面对庞大的数据量级,数据的处理实效性要求是秒级别,同时不能产生数据堆积的情况。
c) 并发大,风控策略系统面向用户端,服务端峰值QPS超过35万,每日调用量超过200亿次。
d) 延迟低,面对用户的请求,风控系统为了保持良好的用户体验,更快的完成对用户准入条件的判断,要求特征系统接口的延迟在50ms以内。
实现一个简化版本特征系统可能只需要几人日就可以完成,但是带着以上几个问题的同时还需要考虑在复杂的业务场景中应用,兼顾用户的灵活配置、稳定的提供服务等情况下,却需要一个团队长期的业务积累和技术沉淀。
图3
图3为在线实时特征系统的概貌,自底向上为数据流动的方向,各部分的功能如下:
a) 数据源:线上系统产生的数据,经过加工采集离线部分流入到离线数据仓库(Hive),实时数据源主要会推送到Kafka。
b) 物料提取:根据中控台配置对原始数据进行解析,相应的维度提取,数据流削峰后流入计算层。
c) 特征计算:该部分主要提供计算框架,生产特征。
d) 特征存储:该部分提供在线特征存、取能力,直接为上层应用提供统一的服务接口。
e) 特征应用:线上风控、预警等,线下模型训练样本向量化。
特征生产的生命周期可以抽象为提、算、存、用四个步骤,作为在线特征系统的一体化解决方案。下文主要围绕特征系统的核心功能在开发过程中遇到的问题及解决办法和一些通用的实践经验等展开介绍,如数据字典建设、分布式系统设计、在线特征计算框架、低延迟计算等主题会在下面文章中做详细介绍。
1. 可灵活配置的特征系统
构建在线的实时特征系统的主要目的之一就是“提效”,因此至少90%以上的特征计算由日常运营配置产出。那么让运营人员在日常工作中产生的特征可配置的难点在于处理消息队列中的实时数据无法获取元数据及字段说明,在运营人员对日志又不是十分了解的情况下手动录入字段出错率很高。
为了解决消息队列数据无法获取元数据问题,我们基于离线数据仓库构建了“数据字典”,主要方案是定义了日志打印标准,统一使用Json记录日志。日志采集统一到Kafka中,其中Kafka有一个数据仓库的消费者,将数据写入数据仓库中。当数据导入数据仓库时,我们记录了下字段名称、字段更新时间,是否在扩展字段,通过Hive还可以获取到字段的备注内容等。
另外还有一些字段需要二次解析、变形、转置之后才能使用,但是又不能每次需要解析时而进行重新发版上线,因此这里使用Groovy通过闭包的方式,把一些需要变换的逻辑抽象成一个一个的解析函数。
图4
如图4所示,在线上的应用场景中,同一个数据源一定也会生产出多个特征,那么这些特征也会使用各种Groovy解析函数。在使用这些解析函数时,可以把这些待处理的特征按照Groovy解析函数来排序,相同的解析函数直接使用上次解析的结果,从而避免重复加载而降低Cpu的资源开销。
2. 大规模数据特征提取
大规模数据直接会导致系统的并发量上升,同时也会对系统的吞吐量有较高的要求。当我们在解决高并发、高吞吐量时最直接有效的办法就是增加机器资源,没有之一。
图5
关于特征提取,正如图5所示,针对同一个Topic的每个分区,我们都会有一个对应的节点来消费,这样可以达到最大的并行处理速度。但是面对业务的增长,一个重度使用的数据源可能会慢慢的积累几百个特征配置,那么这个数据源的每条数据也需要重复处理几百次,因此这个数据源的Topic分区对应消费者节点的Cpu使用率也跟着直线上升,当Cpu使用率达到100%时就会消费延迟,分区数据积压现象。
在排查分析原因是,根据一个节点会同时消费多个topic的其中一个分区,找了一个满载节点粗略算了一下,数量大约在2W/s,当前这个数据源配置了600个特征,那么当前节点每秒需要处理1200W个特征物料,因此结论就是数据太大机器负载过高,在单位时间内处理不完了。
我们都知道,Kafka Topic的分区数量决定了消费者并行度,因此最容易想到的解决方法就是扩分区,要不就是增大单节点内核。但是这里会出现一个问题,业务会增长导致特征数量也一定会再增长,而分区和内核数量却都有上限,因此这种方案只是换汤不换药。
针对以上问题解决办法主要引进了分布式设计的思路,将节点划分为数据拉取节点(Spout)和数据处理节点(Worker),Spout会消费Kafka中的数据然后将数据序列化后发送到Worker。这么做的目的是可以让同一个分区的数据分散到不同的Worker节点处理,通过支持横向扩展的方式使服务的整体可靠性和扩展性的到了提升。
图6
使用了分布式系统设计就需要考虑它的容错机制,Kafka和使用的SCF框架本身具备容错机制,但是以下两点需要格外注意:
a) 在网络繁忙或Worker节点负载过高时可能会导致Spout发送数据失败,这时需要Spout具备故障自动转移和负载轮询功能。
b) 当数据到达Worker节点,Worker节点处理数据可能会失败,也可能宕机。这时Spout会封装Offset、iP、md5check为一个Tuple,Spout首先会将Tuple推送到延迟队列,延迟时间为特征配置的Timeout,然后向Worker节点发送序列化的Tuple。数据在Worker节点处理完成后会通过RPC调用Spout的ack方法,Spout会将当前消息从延迟队列移除,否则延迟队列会将消息发送回Spout让其重新向Worker发送数据。
3. 在线特征计算框架
我们前面提到过特征的定义,那么计算特征值其实就是计算当前维度下单位时间内按照指定计算函数计算出来的值,因此相同维度的指标计算只需要考虑时间窗口和计算函数。我们在框架的设计上也考虑到了不同时间窗口的实现方式应该尽量跟计算函数解耦,可以抽象出各自的处理方式。根据现有的窗口类型和计算函数的组合,一共可以支持以下28种常见的特征计算。
自然窗口期 | 固定窗口期 | 滑动窗口期 | Session窗口期 | |
---|---|---|---|---|
SUM | 累加器 | 累加器 | 延迟队列 | 累加器 |
COUNT | 累加器 | 累加器 | 延迟队列 | 累加器 |
AVG | 累加器 | 累加器 | 延迟队列 | 累加器 |
MAX | 对比器 | 对比器 | 顺序队列 | 对比器 |
MIN | 对比器 | 对比器 | 顺序队列 | 对比器 |
COUNT_DISTINCT | 集合 | 集合 | 顺序队列 | 集合 |
LIST | 列表 | 列表 | 顺序队列 | 列表 |
对于在线特征计算框架核心计算逻辑主要由以下几种算子实现:
a) 累加器:在Redis中维护最新的计算值,当产生新数据时进行累加操作,同时重置过期时间。过期时间可以根据窗口类型与当前时间准运算出Redis Key的到期时间。
b) 对比器:和累加器类似,区别在新产生的值和最大小值对比,在Redis中始终维护最大值和最小值。
c) 延迟队列:迟队列的作用是可以将数据延迟指定时间后重新发送回计算框架,当产生新数据时,会使用累加器加和到特征值,同时将明细数据发送到延迟队列。当计算框架收到延迟队列返回的数据时,会使用累加器加和对应的负值。
d) 顺序队列:在队列中维护一份明细数据, 队列的原则是先进者先出,不允许插队。
当产生新数据需要入队时会有三个步骤:1)将当前数据放到队列尾部,同时用时间戳作为当前数据的下标;2)检查队列头部过期数据让其出队;3)计算队列中的数据。
e) 集合:顾名思义,就是在Redis中维护一个集合,当有新数据产生时存入集合中后计算特征值。
f) 列表:实现了一个缓存功能,将产生的数据原封不动的存储在一个列表中,返回的值类型是一个List,其他算子返回的是一个dobule类型值。
累加器 | 对比器 | 延迟队列 | 顺序队列 | 集合 | 列表 | |
---|---|---|---|---|---|---|
毫秒 | TC | TC | TC | TC | TC | TC |
秒 | Spark Streaming | Spark Streaming | TC | TC | Spark Streaming | Spark Streaming |
分钟 | Spark Streaming | Spark Streaming | TC Spark Streaming | TC Spark Streaming | TC Spark Streaming | TC Spark Streaming |
小时 | Spark | Spark | TC | TC | TC | TC |
天 | MapReduce | MapReduce | MapReduce | MapReduce | MapReduce | MapReduce |
在线特征计算框架如果采用统一的工具暴力计算会耗费大量的存储计算等资源,因此在计算框架的算子开发过程中,我们也按照不同的逻辑选择了不同的开发工具,比如使用MapReduce解决天级别以上的高吞吐量计算,使用Spark Streaming做实时计算。想必我们的开发者对Spark Streaming的计算窗口、滑动步长等概念和它的一些其他特性都非常了解,开发起来也比较顺手。但是针对在线的实时计算框架除了使用Spark Streaming之外还自己开发了一个计算模块(TitanCounter简称TC),TC主要实现了文中提到的累加器、延迟队列、顺序队列等计算功能。
图7
为什么还要自己开发一个计算模块呢?如图7所示,这里有个时间轴,我的计算窗口是1小时,滑动步长是15分钟,那么使用SaprkStreaming将会每隔15分钟计算1次最近1小时的值。如果有一个特征查询时间点是10:10,那么我们当前系统只存储了10:00的特征值,10:15特征值还没有计算出来。因此对时间特别敏感的特征应该采用TC的方式计算,图8为TC设计的核心流程。
图8
4. 低延时存储设计
a) 资源隔离
考虑到特征的存取延时要求极低,因此底层使用Redis分片集群。同时业务上有大有小、有核心业务也有一般业务,所以在分片集群上构建了一个资源隔离层,目的就是让不同的场景特征可以互不影响,同时还可以解决当Redis分片达到上限时仍然可以通过场景的方式扩容。
b) 镜像快照
在资源隔离层下还构建了一个快照场景,快照场景主要是将Redis中的特征值镜像到快照场景中,快照场景底层使用Hbase存储。当有离线模型需要训练时,快照场景可以为历史样本提供秒级特征补全,这样可以对已完成人工审核的样本数据重复利用而避免浪费人力重复审核样本。
c) 极限存储
海量数据不断的加载到线上系统并在系统间流转,对内存、网络带宽等资源都是不小的开销。比如一个特征是“最近12小时同用户不同帖子内容数”,帖子内容本身可以很大,恰巧如果有用户在疯狂灌水,会导致队列浪费大量资源。因此字符长度在超过固定长度后将会使用字符的md5值参与存储和运算。还有一点就是针对队列设定上限,如果当前风控策略设置不同帖子数量大于10将会对其做出处罚,那么当前特征计算的值达到11时就已经完成了它的使命。因此为了节省线上宝贵的存储资源,队列的裁剪不能完全依靠过期时间,还需要设定上限。
总结和规划
本文主要以智能风控在线特征系统为原型,提出了在线特征系统的一些设计思路。其中特征工程系统的边界并不限于特征的解析、计算、存取等。我们也经常会遇到像计算“截止到当前时刻最近n天用户累计发送消息数量”等类似的特征,显着这个特征最佳办法是使用两个特征组合(离线计算n天、实时的自然窗口期特征)更能够有效的利用资源、还有诸如跟据特征值的结果做一个(类似A、B、C、D)等级划分等,像特征的组合、变形、调度等都可看作为特征系统的一部分延伸和扩展。同时我们的特征系统也在需求与挑战中不断演进,也在试图去构建特征工程与知识谱的融合。因为在信息安全领域,挖掘用户关系的关联性是未来的趋势,只有构建多元话信息集成才能将潜在风险识别出去。
作者简介:
李文学:2017年3月加入58, 资深数据开发工程师, 目前担任信息安全部数据方向负责人,专注于大数据应用架构。
代码语言:javascript复制热门推荐?1. 300页!阿里《大数据工程师 必读手册》限时免费下载2. 一个CEO的忠告:你那么牛逼,怎么还是打工人3. 蚂蚁集团暂缓上市!人民财评:没有所谓的马云时代,只有时代中的马云4. 架构之道:大道至简5.程序员,为什么不建议你写框架