作者:霖雾,携程数据开发工程师,关注图数据库等领域。
0. 背景
2017 年 9 月携程金融成立,在金融和风控业务中,有多种场景需要对图关系网络进行分析和实时查询,传统关系型数据库难以保证此类场景下的关联性能,且实现复杂性高,离线关联耗时过长,因此对图数据库的需求日益增加。携程金融从 2020 年开始引入大规模图存储和图计算技术,基于 NebulaGraph 构建了千亿级节点的图存储和分析平台,并取得了一些实际应用成果。
本文主要分享 NebulaGraph 在携程金融的实践,希望能带给大家一些实践启发。
本文主要从以下几个部分进行分析:
- 图基础介绍
- 图平台建设
- 内部应用案例分析
- 痛点与优化
- 总结规划
1. 图基础
首先我们来简单介绍下图相关的概念:
1.1 什么是图
在计算机科学中,图就是一些顶点的集合,这些顶点通过一系列边结对(连接)。比如我们用一个图表示社交网络,每一个人就是一个顶点,互相认识的人之间通过边联系。
在图数据库中,我们使用(起点,边类型,rank,终点)表示一条边。起点和终点比较好理解,表示一条边两个顶点的出入方向。边类型则是用于区分异构图的不同边,如我关注了你,我向你转账,关注和转账就是两种不同种类的边。而 rank 是用来区分同起始点同终点的不同边,如 A 对 B 的多次转账记录,起点、终点、边类型是完全相同的,因此就需要如时间戳作为 rank 来区分不同的边。
同时,点边均可具有属性,如:A 的手机号、银行卡、身份证号、籍贯等信息均可作为 A 的点属性存在,A 对 B 转账这条边,也可以具有属性,如转账金额,转账地点等边属性。
1.2 什么时候用图
(信息收集于开源社区、公开技术博客、文章、视频)
1.2.1 金融风控
- 诈骗电话的特征提取,如不在三步社交邻居圈内,被大量拒接等特征。实时识别拦截。(银行 / 网警等)
- 转账实时拦截(银行 / 支付宝等)
- 实时欺诈检测,羊毛党的识别(电商)
- 黑产群体识别,借贷记录良好用户关联,为用户提供更高额贷款、增加营收
1.2.2 股权穿透
影子集团、集团客户多层交叉持股、股权层层嵌套复杂关系的识别(天眼查 / 企查查)
1.2.3 数据血缘
在数据仓库开发过程中, 会因为数据跨表关联产生大量的中间表,使用图可直接根据关系模型表示出数据加工过程和数据流向,以及在依赖任务问题时快速定位上下游。
1.2.4 知识图谱
构建行业知识图谱。
1.2.5 泛安全
IP 关系等黑客攻击场景,计算机进程与线程等安全管理。
1.2.6 社交推荐
- 好友推荐,行为相似性,咨询传播路径,可能认识的人,大V 粉丝共同关注,共同阅读文章等,商品相似性,实现好友商品或者咨询的精准推荐;
- 通过对用户画像、好友关系等,进行用户分群、实现用户群体精准管理;
1.2.7 代码依赖分析
分析代码依赖关系。
1.2.8 供应链上下游分析
如:汽车供应链上下游可涉及上万零件及供应商,分析某些零件成本上涨 / 供应商单一 / 库存少等多维度的影响。(捷豹)
1.3 谁在研发图,谁在使用图
(信息收集于开源社区、公开技术博客、文章、视频)
目前国内几家大公司都有各自研发的图数据库,主要满足内部应用的需求,大多数都是闭源的,开源的仅有百度的 HugeGraph。其他比较优秀的开源产品有 Google Dgraph,vesoft 的 NebulaGraph 等,其中 NebulaGraph 在国内互联网公司应用非常广泛。结合我们的应用场景,以及外部公开的测试和内部压测,我们最终选择 NebulaGraph 构建金融图平台。
2. 图平台建设
2.1 图平台建设
我们的图平台早期只有 1 个 3 节点的 Nebula 集群。随着图应用场景的不断扩充,需要满足实时检索、离线分析、数据同步与校验等功能,最终演化成上述架构图。
2.1.1 离线图
主要用于图构建阶段(建模、图算法分析),通过 spark-connector 同集团的大数据平台打通,此外我们还将 NebulaGraph 提供的数 10 种常用图算法进行工具化包装,方便图分析人员在 Spark 集群提交图算法作业。
2.1.2 线上图
经过离线图分析确定最终建模后,会通过 spark-connector 将数据导入线上图。通过对接 qmq 消息(集团内部的消息框架)实时更新,对外提供实时检索服务。同时也会有 T 1 的 HIVE 增量数据通过 spark-connector 按天写入。
2.1.3 全量校验
虽然 NebulaGraph 通过 TOSS 保证了正反边的插入一致性,但仍不支持事务,随着数据持续更新,实时图和离线(HIVE 数据)可能会存在不一致的情况,因此我们需要定期进行全量数据的校验(把图读取到 Hive,和 Hive 表存储的图数据进行比对,找出差异、修复),保证数据的最终一致性。
2.1.4 集群规模
为了满足千亿节点的图业务需求,实时集群采用三台独立部署的高性能机器,每台机器 64 core / 320 GB / 12 TB SSD ,版本为 Nebula v2.5,跨机房部署。离线集群 64 core / 320 GB / 3.6 TB SSD 12 ,测试集群 48 core / 188 GB / 5T HDD 4.
2.2 遇到的问题
在 NebulaGraph 应用过程中,也发现一些问题,期待逐步完善:
- 资源隔离问题,目前 Nebula 没有资源分组隔离功能,不同业务会相互影响;如业务图 A 在导数据,业务图 B 线上延迟就非常高。
- 版本升级问题:
- NebulaGraph 在版本升级过程中需要停止服务,无法实现热更新;对于类似实时风控等对可靠性要求非常高的场景非常不友好。此种情况下如需保证在线升级,就需要配备主备集群,每个集群切量后挨个升级,增加服务复杂性和运维成本。
- 客户端不兼容,客户端需要跟着服务端一起升级版本。对于已有多个应用使用的 Nebula 集群,想要协调各应用方同时升级客户端是比较困难的。
3. 内部应用案例分析
3.1 数据血缘图
数据治理是近年来比较热的一个话题,他是解决数仓无序膨胀的有效手段,其中数据血缘是数据有效治理的重要依据,携程金融借助 NebulaGraph 构建了数据血缘图,以支撑数据治理的系统建设。
数据血缘就是数据产生的链路,记录数据加工的流向,经过了哪些过程和阶段;主要解决 ETL 过程中可能产出几十甚至几百个中间表导致的复杂表关系,借用数据血缘可以清晰地记录数据源头到最终数据的生成过程。
图 a 是数据血缘的关系图,采用库名 表名作为图的顶点来保证点的唯一性,点属性则是分开的库名和表名,以便通过库名或者表名进行属性查询。在两张表之间会建立一条边,边的属性主要存放任务的产生运行情况,比如说:任务开始时间,结束时间、用户 ID 等等同任务相关的信息。
图 b 是实际查询中的一张关系图,箭头的方向表示了表的加工方向,通过上游或者下游表我们可以快速地找到它的依赖,清晰明了地显示从上游到下游的每一个链路。
如果要表达复杂的血缘依赖关系图,通过传统的关系型数据库需要复杂的 SQL 实现(循环嵌套),性能也比较差,而通过图数据库实现,则可直接按数据依赖关系存储,读取也快于传统 DB,非常简洁。目前,数据血缘也是携程金融在图数据库上的一个经典应用。
3.2 风控关系人图
关系人图常用于欺诈识别等场景,它是通过 ID、设备、手机标识以及其他介质信息关联不同用户的关系网络。比如说,用户 A 和用户 B 共享一个 Wi-Fi,他们便是局域网下的关系人;用户 C 和用户 D 相互下过单,他们便是下单关系人。简言之,系统通过多种维度的数据关联不同的用户,这便是关系人图。
构建模型时,通常要查询某个时点(比如欺诈事件发生前)的关系图,对当时的图进行模型抽取和特征构建,我们称这个过程为图回溯。随着回溯时间点的不同,返回的图数据也是动态变化的;比如某人上午,下午各自打了一通电话,需要回溯此人中午时间点时的图关系,只会出现上午的电话记录,具体到图,则每类边都具有此类时间特性,每一次查询都需要对时间进行限制。
对于图回溯场景,最初我们尝试通过 HIVE SQL 实现,发现对于二阶及以上的图回溯,SQL 表达会非常复杂,而且性能不可接受(比如二阶回溯 Hive 需要跑数小时,三阶回溯 Hive 几乎不能实现);因此尝试借助图数据库来实现,把时间作为边 rank 进行建模,再根据边关系进行筛选来实现回溯。这种回溯方式更直观、简洁,使用简单的 API 即可完成,在性能上相比 Hive 也有 1 个数量级以上的提升(二阶回溯,图节点:百亿级,待回溯节点:10 万级)。
下面用一个例子说明:如图(a),点 A 分别在 t0、t1、t2 时刻建立了一条边,t0、t1、t2为边 rank 值,需要返回 tx 时的的图关系数据,只能返回 t0、t1 对应的点 B、C,因为当回溯到 tx 时间点时候,t2 还没有发生;最终返回的图关系为 t0 和 t1 时候,VertexA ->VertexB
、VertexA -> VertexC
(见图(c))。这个例子是用一种边进行回溯,实际查询中可能会涉及到 2~3 跳,且存在异构边(打电话是一种边,点外卖又是一种边,下单酒店机票是一种边,都是不同类型的边),而这种异构图的数据都具有回溯特征,因此实际的关系人图回溯查询也会变得复杂。
3.3 实时反欺诈图
用户下单时,会进入一个快速风控的阶段:通过基于关系型数据库和图数据库的规则进行模型特征计算,来判断这个用户是不是风险用户,要不要对该用户进行下单拦截(实时反欺诈)。
我们可以根据图关系配合模型规则,用来挖掘欺诈团伙。比如说,已知某个 uid 是犯欺团伙的一员,根据图关联来判断跟他关系紧密的用户是不是存在欺诈行为。为了避免影响正常用户的下单流程,风控阶段需要快速响应,因此对图查询的性能要求非常高(P95 < 15 ms)。我们基于 NebulaGraph 构建了百亿级的反欺诈图,在查询性能的优化方面进行了较多思考。
此图 Schema 为脱敏过后的部分图模型,当中隐藏很多建模信息。这里简单讲解下部分的查询流程和关联信息。
如上图为一次图查询流程,每一次图查询由多个起始点如用户 uid、用户 mobile 等用户信息同时开始,每条线为一次关联查询,因此一次图查询由几十次点边查询组成,由起始点经过一跳查询和 2 跳查询,最终将结果集返回给风控引擎。
系统会将用户的信息,转化为该用户的标签。在图查询的时候,根据这些标签,如 uid、mobile 进行独立查询。举个例子,根据某个 uid 进行一跳查询,查询出它关联的 5 个手机号。再根据这 5 个手机号进行独立的 2 跳查询,可能会出来 25 个 uid,查询会存在数据膨胀的情况。因此,系统会做一个查询限制。去查看这 5 个手机号关联的 uid 是不是超过了系统设定的热点值。如果说通过 mobile 查询出来关联的手机号、uid 过多的话,系统就会判断其为热点数据,不进行边结果返回。(二阶/三阶回溯,图点边:百亿级)。
4.1 痛点及优化
在上述应用场景中,对于风控关系人图和反欺诈图,由于图规模比较大(百亿点边),查询较多,且对时延要求较高,遇到了一些典型问题,接下来简单介绍一下。
4.1.1 查询性能问题
为了满足实时场景 2 跳查询 P95 15 ms 需求,我们针对图 Schema 和连接池以及查询端做了一些优化:
4.1.2 牺牲写性能换取读性能
首先,我们来看看这样的一个需求:查询 ID 关联的手机号,需要满足对于这个手机号关联边不超过 3 个。这里解释下为什么要限制关联边数量,因为我们正常个体关联边数量是有限的,会有一个对于大多数人的 P95 这样的阈值边数量,超过这个阈值就是脏数据。为了这个阈值校验, 就需要对每次查询的结果再多查询一跳。
如图(a)所示,我们需要进行 2 次查询,第一跳查询是为了查询用户 ID 关联的手机号,第二跳查询是为了保证我们的结果值是合法的(阈值内),这样每跳查询最终需要进行 2 跳查询来满足。如图给出了图查询的 nGQL 2 步伪码,这种情况下无法满足我们的高时效性。如何优化呢?看下图(b) :
我们可以将热点查询固定在点属性上,这样一跳查询时就可以知道该点有多少关联边,避免进行图 a 中(2)语句验证。还是以图 (a)为例,从一个用户 ID 开始查询,查询他的手机号关联,此时因为手机号关联的边已经变成了点属性(修改了 schema),图(a) 2 条查询语句实现的功能就可以变成一条查询 go from $id over $edgeName where $手机号.用户id边数据 <5 | limit 5
。
这种设计的好处就是,在读的时候可以加速验证过程,节约了一跳查询。带来的成本是:每写一条边,同时需要更新 2 个点属性来记录点的关联边情况,而且需要保证幂等(保证重复提交不会叠加属性 1)。当插入一条边的时,先去图里面查询边是否存在,不存在才会进行写边以及点属性 1 的操作。也就是我们牺牲了写性能,来换取读性能,并通过定期 check 保证数据一致。
4.1.3 池化连接降低时延
第二个优化手段是通过池化连接降低时延。Nebula 官方连接池每次进行查询均需要进行建立初始化连接-执行查询任务-关闭连接。而在高频(QPS 会达到几千)的查询场景中,频繁的创建、关闭连接非常影响系统的性能和稳定性。且建立连接过程耗时平均需要 6 ms, 比实际查询时长 1.5 ms 左右高出几倍,这是不可接受的。因此我们对官方客户端进行了二次封装,实现连接的复用和共享。最后,将查询 P95 从 20 ms 降低到了 4 ms。通过合理控制并发,我们最终将 2 跳查询性能控制在 P95 15 ms 。
这里贴下代码供参考:
代码语言:java复制public class SessionPool {
/**
* 创建连接池
*
* @param maxCountSession 默认创建连接数
* @param minCountSession 最大创建连接数
* @param hostAndPort 机器端口列表
* @param userName 用户名
* @param passWord 密码
* @throws UnknownHostException
* @throws NotValidConnectionException
* @throws IOErrorException
* @throws AuthFailedException
*/
public SessionPool(int maxCountSession, int minCountSession, String hostAndPort, String userName, String passWord) throws UnknownHostException, NotValidConnectionException, IOErrorException, AuthFailedException {
this.minCountSession = minCountSession;
this.maxCountSession = maxCountSession;
this.userName = userName;
this.passWord = passWord;
this.queue = new LinkedBlockingQueue<>(minCountSession);
this.pool = this.initGraphClient(hostAndPort, maxCountSession, minCountSession);
initSession();
}
public Session borrow() {
Session se = queue.poll();
if (se != null) {
return se;
}
try {
return this.pool.getSession(userName, passWord, true);
} catch (Exception e) {
log.error("execute borrow session fail, detail: ", e);
throw new RuntimeException(e);
}
}
public void release(Session se) {
if (se != null) {
boolean success = queue.offer(se);
if (!success) {
se.release();
}
}
}
public void close() {
this.pool.close();
}
private void initSession() throws NotValidConnectionException, IOErrorException, AuthFailedException {
for (int i = 0; i < minCountSession; i ) {
queue.offer(this.pool.getSession(userName, passWord, true));
}
}
private NebulaPool initGraphClient(String hostAndPort, int maxConnSize, int minCount) throws UnknownHostException {
List<HostAddress> hostAndPorts = getGraphHostPort(hostAndPort);
NebulaPool pool = new NebulaPool();
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
nebulaPoolConfig = nebulaPoolConfig.setMaxConnSize(maxConnSize);
nebulaPoolConfig = nebulaPoolConfig.setMinConnSize(minCount);
nebulaPoolConfig = nebulaPoolConfig.setIdleTime(1000 * 600);
pool.init(hostAndPorts, nebulaPoolConfig);
return pool;
}
private List<HostAddress> getGraphHostPort(String hostAndPort) {
String[] split = hostAndPort.split(",");
return Arrays.stream(split).map(item -> {
String[] splitList = item.split(":");
return new HostAddress(splitList[0], Integer.parseInt(splitList[1]));
}).collect(Collectors.toList());
}
private Queue<Session> queue;
private String userName;
private String passWord;
private int minCountSession;
private int maxCountSession;
private NebulaPool pool;
}
4.1.4 查询端优化
对于查询端,像 3.3 中的例图,每一次图查询由多个起始点开始,可拆解为几十次点边查询,需要让每一层的查询尽可能地并发进行,降低最终时延。我们可以先对 1 跳查询并发(约十几次查询),再对结果进行分类合并,进行第二轮的迭代并发查询(十几到几十次查询),通过合理地控制并发,可将一次组合图查询的 P95 控制在 15 ms 以内。
4.2 边热点问题
在图查询过程中,存在部分用户 ID 关联过多信息,如黄牛用户关联过多信息,这部分异常用户会在每一次查询时被过滤掉,不会继续参与下一次查询,避免结果膨胀。而判断是否为异常用户,则依赖于数据本身设定的阈值,异常数据不会流入下一阶段对模型计算造成干扰。
4.3 一致性问题
NebulaGraph 本身是没有事务的,对于上文写边以及点属性 1 的操作,如何保证这些操作的一致性,上文提到过,我们会定期对全量 HIVE 表数据和图数据库进行 check,以 HIVE 数据为准对线上图进行修正,来实现最终一致性。目前来说,图数据库和 HIVE 表不一致的情况还是比较少的。
5. 总结与展望
基于 NebulaGraph 的图业务应用,完成了对数据血缘、对关系人网络、反欺诈等场景的支持,并将持续应用在金融更多场景下,助力金融业务。我们将持续跟进社区,结合自身应用场景推进图平台建设;同时也期待社区版能提供热升级、资源隔离、更丰富易用的算法包、更强大的 Studio 等功能。
谢谢你读完本文 (///▽///)
如果你想尝鲜图数据库 NebulaGraph,记得去 GitHub 下载、使用、(^з^)-☆ star 它 -> GitHub;和其他的 NebulaGraph 用户一起交流图数据库技术和应用技能,留下「你的名片」一起玩耍呀~