使用图计算系统实现研报关键词权重分数计算性能提升百倍以上

2022-09-02 09:57:42 浏览数 (1)

@TOC[1] Here's the table of contents:

•一、数据模型•二、TF-IDF算法•三、计算过程 •3.1 指定研报和关键词计算TF-IDF分数 •3.2 优化3.1中的查询 •3.3 将3.2中查询封装为过程 •3.4 在集群中节点执行查询 •3.5 将计算结果写入MySQL •3.6 将计算结果写入图数据库 •3.7 分布式计算系统•四、总结

使用图计算系统实现研报关键词权重分数计算性能提升百倍以上

在这次的工程化操作中通过读写分离查询优化分布式图计算系统实现研报关键词TF-IDF分数计算性能提升百倍以上。 基于图数据库开发的图计算系统,其实是从分布式数据工程中演变过来的系统。该系统采用分布式架构;服务端和客户端使用RMI通信协议。基本运行逻辑是SERVER负责将数据分块协调客户端请求,CLIENT获取到数据之后在不同的服务器上做计算,借助存储系统实现中间结果依赖和其它数据依赖,将最终的结果集做合并。该系统架构非常简洁适合在大批量数据处理ETL、大数据分析场景下的迁移。【大道至简就是这个道理,其实就是一个Java程序好用就行嘛:)】

一、数据模型

在之前的文章中提到过关键词数据模型[2],感兴趣可以回看。和之前不一样的是,在关键词基础上还构建了研报和关键词之间的关系,并且统计了文章中关键词出现的次数。

如上图所示,绿色的大点表示研报节点,蓝色的小点表示关键词节点,节点之间使用包含关系连接。

二、TF-IDF算法

TF-IDF[3](term frequency–inverse document frequency)是一种用于信息检索数据挖掘的常用加权技术。TF是词频(Term Frequency),IDF是逆文本频率指数(Inverse Document Frequency)。

有很多不同的数学公式可以用来计算TF-IDF。这边的例子以上述的数学公式来计算。词频 (TF) 是一词语出现的次数除以该文件的总词语数。假如一篇文件的总词语数是100个,而词语“母牛”出现了3次,那么“母牛”一词在该文件中的词频就是3/100=0.03。一个计算文件频率 (IDF) 的方法是文件集里包含的文件总数除以测定有多少份文件出现过“母牛”一词。所以,如果“母牛”一词在1,000份文件出现过,而文件总数是10,000,000份的话,其逆向文件频率就是 lg(10,000,000 / 1,000)=4。最后的TF-IDF的分数为0.03 * 4=0.12。

三、计算过程

通过的介绍,已经了解了核心算法和数据模型。By the way,该数据结果主要是服务与搜索和推荐系统。下面基于该数据和算法计算权重分数。 查询解构:在图数据库中,运行CYPHER查询之后计算下推执行都是在当前服务器上执行,在集群模式下会导致多节点并行计算的资源浪费,消耗更多时间。因此为了充分发挥集群节点并行的效果,我将查询解构为只读和只写查询。 查询封装:复杂的只读查询封装为存储过程会方便其它程序的依赖,因此这次工程实践中将查询封装为图数据库的过程,便于后续调用。 分布式计算:为了使计算过程在图数据库集群中并发执行,使用分布式程序将数据分块进行计算;并将最终结果保存在MySQL中,用配置图数据库任务的方式将计算结果更新写入到图数据库中。【此处也可以使用Kafka等MQ系统进行数据的更新,如果系统不要求较高的实时性,为了避免抢占资源在业务系统不繁忙时批量写入会更好】

3.1 指定研报和关键词计算TF-IDF分数

在这个查询中,指定了一篇研报和一个研报中的关键词,并计算了他们的TF-IDF分数。该查询耗时在500毫秒左右。其中关于对数运算的函数[4]可以查看这里。

代码语言:javascript复制
// 获取研报以及关键词,计算该词在这篇研报的TF-IDF分数
// 获取研报`HDOC6a0250e61f91a856cd5dd6c39327fa57`和关键词`HCEPT69305efd9563834a4e7b67d6d5ac4674`,以及该关键词在研报中出现的次数count
MATCH (yb:研报)-[r:包含]->(kw:关键词) WHERE yb.hcode='HDOC6a0250e61f91a856cd5dd6c39327fa57' AND kw.hcode='HCEPT69305efd9563834a4e7b67d6d5ac4674'
WITH ID(yb) AS ybId,r.count AS count,ID(kw) AS kwId
// 获取该研报中关键词总数
MATCH (yb)-[r:包含]->(kw:关键词) WHERE ID(yb)=ybId
WITH ybId,SUM(r.count) AS kwCount,count,kwId
// 计算TF-词频(Term Frequency)
WITH ybId,1.0*count/kwCount AS tf,kwId
// 获取研报总数
MATCH (yb:研报) WITH COUNT(*) AS ybCount,ybId,tf,kwId
// 该关键词出现在多少篇研报中
MATCH (yb:研报)-[:包含]->(kw:关键词) WHERE ID(kw)=kwId WITH COUNT(yb) AS ybKwCount,ybCount,ybId,tf,kwId
// 计算IDF-逆文本频率指数(Inverse Document Frequency)
WITH tf,log10(ybCount/ybKwCount) AS idf,ybId,kwId
RETURN ybId,kwId,tf*idf AS `TF-IDF`
// Started streaming 1 records after 456 ms and completed after 456 ms.

3.2 优化3.1中的查询

3.1中查询耗时在500毫秒左右,对于这个任务来说这个耗时已经很可怕了。因为需要计算的数据集在一亿条左右,因此该查询必须优化。

•查看一下上一次查询的计划

可以看到查询中出现了三次EagerAggregation算子,这是需要重点优化的优化的地方。在计算一批研报关键词数据时,可以先将研报统计数算出来,以参数的方式下推给其它查询。这样的话,每次计算研报和关键词TF-IDF分数时就不用了频繁的执行统计count(*)操作,可以节省一些性能消耗。下面看一下优化后的查询:

代码语言:javascript复制
// 获取研报总数
MATCH (yb:研报) RETURN COUNT(*) AS ybCount 1081720
// Started streaming 1 records after 1 ms and completed after 1 ms.
代码语言:javascript复制
// 获取研报以及关键词,计算该词在这篇研报的TF-IDF分数
// 获取研报`HDOC6a0250e61f91a856cd5dd6c39327fa57`和关键词`HCEPT69305efd9563834a4e7b67d6d5ac4674`,以及该关键词在研报中出现的次数count
MATCH (yb:研报)-[r:包含]->(kw:关键词) WHERE yb.hcode='HDOC6a0250e61f91a856cd5dd6c39327fa57' AND kw.hcode='HCEPT69305efd9563834a4e7b67d6d5ac4674'
WITH ID(yb) AS ybId,r.count AS count,ID(kw) AS kwId
// 获取该研报中关键词总数
MATCH (yb)-[r:包含]->(kw:关键词) WHERE ID(yb)=ybId
WITH ybId,SUM(r.count) AS kwCount,count,kwId
// 计算TF-词频(Term Frequency)
WITH ybId,1.0*count/kwCount AS tf,kwId
// 该关键词出现在多少篇研报中
MATCH (yb:研报)-[:包含]->(kw:关键词) WHERE ID(kw)=kwId WITH COUNT(yb) AS ybKwCount,ybId,tf,kwId
// 计算IDF-逆文本频率指数(Inverse Document Frequency)
WITH tf,log10(1081720/ybKwCount) AS idf,ybId,kwId
RETURN ybId,kwId,tf*idf AS `TF-IDF`
// Started streaming 1 records after 1 ms and completed after 1 ms.

可以看到,优化之后查询只需要一毫秒,这个速度还是可以接受的。再看一下执行计划:

优化之后查询的执行计划中EagerAggregation算子只有两个了,性能已经在可接受范围内,暂不继续优化查询。

3.3 将3.2中查询封装为过程

将查询封装为过程之后,可以方便分布式计算系统调用。封装方式如下:

代码语言:javascript复制
CALL apoc.custom.asProcedure(
'yanbao.kw.tfidf.withYbCount',
'// 获取研报以及关键词,计算该词在这篇研报的TF-IDF分数
// 获取研报`HDOC6a0250e61f91a856cd5dd6c39327fa57`和关键词`HCEPT69305efd9563834a4e7b67d6d5ac4674`,以及该关键词在研报中出现的次数count
MATCH (yb:研报)-[r:包含]->(kw:关键词) WHERE yb.hcode=$yanbaoHcode AND kw.hcode=$kwHcode
WITH ID(yb) AS ybId,r.count AS count,ID(kw) AS kwId,$ybCount as ybCount
// 获取该研报中关键词总数
MATCH (yb)-[r:包含]->(kw:关键词) WHERE ID(yb)=ybId
WITH ybId,SUM(r.count) AS kwCount,count,kwId,ybCount
// 计算TF-词频(Term Frequency)
WITH ybId,1.0*count/kwCount AS tf,kwId,ybCount
// 该关键词出现在多少篇研报中
MATCH (yb:研报)-[:包含]->(kw:关键词) WHERE ID(kw)=kwId WITH COUNT(yb) AS ybKwCount,ybCount,ybId,tf,kwId
// 计算IDF-逆文本频率指数(Inverse Document Frequency)
WITH tf,log10(ybCount/ybKwCount) AS idf,ybId,kwId
RETURN ybId,kwId,tf*idf AS tfidf',
'READ',
[['ybId','LONG'],['kwId','LONG'],['tfidf','DOUBLE']],
[['yanbaoHcode','STRING'],['kwHcode','STRING'],['ybCount','LONG']],
'计算研报中某关键词TF-IDF分数,增加存量数据时传入研报数量参数'
);

在调用过程时,只需要这样写即可:

代码语言:javascript复制
CALL custom.yanbao.kw.tfidf.withYbCount('HDOC6a0250e61f91a856cd5dd6c39327fa57','HCEPT69305efd9563834a4e7b67d6d5ac4674',1081720) 
YIELD ybId,kwId,tfidf 
// 输出研报节点ID、关键词节点ID、TD-IDF分数
RETURN ybId,kwId,tfidf

3.4 在集群中节点执行查询

在集群中某个节点执行时,该查询所有的计算都是在一台节点执行,如下是执行方式:

代码语言:javascript复制
MATCH (n:研报)-[:包含]->(m:关键词)
CALL custom.yanbao.kw.tfidf(n.hcode,m.hcode) YIELD ybId,kwId,tfidf
RETURN ybId,kwId,tfidf

3.5 将计算结果写入MySQL

该查询是将计算结果写入MySQL中,因为采用了读写分离的架构因此在系统中不支持一边查一边写入的操作。计算结果保存到MySQL之后,使用数据更新任务将数据写入到图数据库。

代码语言:javascript复制
MATCH (n:研报)-[:包含]->(m:关键词)
CALL custom.yanbao.kw.tfidf(n.hcode,m.hcode) YIELD ybId,kwId,tfidf
WITH ybId,kwId,tfidf 
// 写入MySQL【读取和写入查询分离有利于提升性能】
WITH algo.getNodeById(ybId).name AS ybIdName,algo.getNodeById(kwId).name AS kwIdName,tfidf
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/master_dev?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC',
'INSERT IGNORE YB_KW_WEIGHT (ybId,kwId,tfidf) VALUES (?,?,?)',
[ybIdName,kwIdName,tfidf]) 
YIELD row 
RETURN row;

3.6 将计算结果写入图数据库

在后台程序还未上线的时候,采用这种计算并写入的方式需要在业务低峰期间进行,避免过度消耗资源发生生产事故

代码语言:javascript复制
CALL apoc.periodic.commit(
    'MATCH (n:研报)-[r:包含]->(m:关键词) 
    WITH n.hcode AS ybHcode,r,m.hcode AS kwHcode
     LIMIT $limit 
     CALL custom.yanbao.kw.tfidf.withYbCount(ybHcode,kwHcode,1081720) 
     YIELD ybId,kwId,tfidf 
     SET r.weight=tfidf 
     RETURN count(*)',
     {limit:10000}
)

3.7 分布式计算系统

该系统在SERVER端监控研报关键词数据的更新,并负责将数据分发给CLIENT端。客户端在拿到数据之后先获取研报统计数,然后计算TF-IDF分数,并将计算合并写入到存储系统MySQL。同时在数据调度系统中,配置一个对应的图数据构建任务,将计算好的weight更新到图数据库中。 执行过程可以部署到多台机器分布式执行,图数据库集群系统使用Nginx[5]接收请求分发到图数据库集群,实现多节点多请求并发执行加倍提高计算性能。

四、总结

本次工程化实战,涉及数据建模、算法设计、查询优化、分布式系统等,借助数据工程的程序架构经验建立起了一个基于图数据库的分布式计算系统,极大的优化了在图数据库上的计算效率。工程化系统化的设计思路在其中是很重要的一个部分,为完成该任务打下了基础;任何架构都是为业务为效率服务,没有最好只有更好;系统框架背后都是精心思考设计的结果。

References

[1] TOC: 使用图计算系统实现研报关键词权重分数计算性能提升百倍以上 [2] 关键词数据模型: https://blog.csdn.net/superman_xxx/article/details/117827458 [3] TF-IDF: https://baike.baidu.com/item/tf-idf/8816134 [4] 对数运算的函数: https://github.com/ongdb-contrib/ongdb-lab-apoc/wiki [5] Nginx: https://baike.baidu.com/item/nginx

0 人点赞