Federated Queries Across a Graph Database and a Traditional Data Warehouse

2022-09-02 10:07:03

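Contents:

• 1. Get the report entity's unique Oracle ID from MySQL
• 2. Filter time-series data in Oracle
• 3. Chain the MySQL and Oracle queries with Cypher
• 4. Derive a boolean with apoc.case
• 5. Wrap the query as a function
• 6. Use the function to filter data
• 7. Summary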

Filtering time-series indicators stored in relational databases with Cypher

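The graph data model in this article is built from research reports (研报): each report is tokenized into keywords, giving the path (股票)<-[涉及]-(研报)-[包含]->(关键词), that is, (Stock)<-[COVERS]-(Report)-[CONTAINS]->(Keyword). Once this stock-report-keyword network is in place, the related stocks and keywords need to be filtered by the date each report was written. The time-series data describing the report entities lives in MySQL and Oracle, and records in the different stores are linked through each report's unique ID.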
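The split can be pictured with a small in-memory model: the graph keeps only the path structure, the writing dates sit in a separate store keyed by the report ID, and the filter joins the two. This is a minimal Python sketch under assumptions: the stock, report and keyword values are made up, and the dictionary merely stands in for the MySQL/Oracle tables.

```python
from datetime import datetime

# Hypothetical sample data. Each tuple is one (股票)<-[涉及]-(研报)-[包含]->(关键词)
# path, reduced to (stock, report_id, keyword).
GRAPH_PATHS = [
    ("STOCK_A", "R1", "keyword_x"),
    ("STOCK_A", "R2", "keyword_y"),
    ("STOCK_B", "R3", "keyword_z"),
]

# Stand-in for the relational store: report ID -> writing date.
# R3 is deliberately missing to model a report with no time-series record.
REPORT_DATES = {
    "R1": datetime(2018, 5, 1),
    "R2": datetime(2005, 1, 1),
}


def filter_paths(paths, dates, start, stop):
    """Keep paths whose report was written within [start, stop] (inclusive)."""
    kept = []
    for stock, report_id, keyword in paths:
        written = dates.get(report_id)  # cross-store join on the report ID
        if written is not None and start <= written <= stop:
            kept.append((stock, report_id, keyword))
    return kept


print(filter_paths(GRAPH_PATHS, REPORT_DATES,
                   datetime(2017, 9, 2), datetime(2021, 9, 2)))
# [('STOCK_A', 'R1', 'keyword_x')]
```

A report with no record in the external store simply fails the filter, which is the behavior the default-value rows in the following sections are designed to preserve inside Cypher.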

1. Get the report entity's unique Oracle ID from MySQL

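The report entities stored in the graph have only two properties, code and name. To link a report to Oracle, its code is used to look up the matching Oracle ID in MySQL, which is what the SQL below does. So that the Cypher pipeline can keep passing data along even when MySQL has no matching row, the SQL also outputs a row with a fixed default value (-1).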

SELECT zyyx_yanbao_code 
    FROM 
        (SELECT zyyx_yanbao_code 
                FROM ZYYX_YANBAO 
                WHERE yanbao_hcode='HDOCec613f2d8b707b66a8edc8c1eaeb29f0' 
                UNION 
        SELECT zyyx_yanbao_code 
                FROM ZYYX_YANBAO_old 
                WHERE yanbao_hcode='HDOCec613f2d8b707b66a8edc8c1eaeb29f0' 
                UNION 
        SELECT -1 AS ZYYX_YANBAO)
    AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2
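The effect of the -1 default row can be checked locally. The sketch below runs the same UNION query against an in-memory SQLite database standing in for MySQL; the table and column names come from the SQL above, but the rows and the `HDOC_hit`/`HDOC_miss` hcodes are made up. A hit returns the default row plus the real code, and a miss still returns the default row, so the next step always receives at least one row.

```python
import sqlite3

# In-memory stand-in for the MySQL tables used above (rows are made up).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ZYYX_YANBAO     (yanbao_hcode TEXT, zyyx_yanbao_code INTEGER);
CREATE TABLE ZYYX_YANBAO_old (yanbao_hcode TEXT, zyyx_yanbao_code INTEGER);
INSERT INTO ZYYX_YANBAO VALUES ('HDOC_hit', 1359506);
""")

# Same shape as the MySQL query: real matches from both tables plus a -1 row.
LOOKUP_SQL = """
SELECT zyyx_yanbao_code FROM (
    SELECT zyyx_yanbao_code FROM ZYYX_YANBAO     WHERE yanbao_hcode = ?
    UNION
    SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode = ?
    UNION
    SELECT -1 AS zyyx_yanbao_code
) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2
"""


def lookup_codes(hcode):
    """Return the Oracle-side report IDs for an hcode; -1 is always present."""
    return [row[0] for row in conn.execute(LOOKUP_SQL, (hcode, hcode))]


print(lookup_codes("HDOC_hit"))   # [-1, 1359506]
print(lookup_codes("HDOC_miss"))  # [-1]
```

Assuming real codes are positive, `ORDER BY ... ASC` puts -1 first and `LIMIT 2` keeps the default row plus one real code.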

2. Filter time-series data in Oracle

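After the ID has been obtained from MySQL, the time-series indicators are filtered in Oracle. As before, so that data keeps flowing through the Cypher pipeline when Oracle has no matching row, the SQL adds an output row with a fixed default value (-1). The fallback branch reads one row of RPT_FORECAST_STK (rownum <= 1) only so that it emits exactly one -1, which assumes the table is not empty.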

SELECT rownum rm, a.* 
    FROM 
        (SELECT REPORT_ID 
            FROM 
                (SELECT REPORT_ID 
                    FROM ODSZYYX.RPT_FORECAST_STK WHERE REPORT_ID='1359506' AND CREATE_DATE BETWEEN TO_DATE(20170902000000,'YYYY-MM-DD HH24:MI:SS') AND TO_DATE(20210902000000,'YYYY-MM-DD HH24:MI:SS') 
                    UNION 
                SELECT -1 AS REPORT_ID 
                    FROM 
                        (SELECT rownum rm, a.* 
                            FROM 
                                ( SELECT REPORT_ID 
                                    FROM ODSZYYX.RPT_FORECAST_STK) 
                            a WHERE 
                    rownum <= 1 ) b 
                WHERE b.rm > 0) a) a         
    WHERE rownum <= 2 
    ORDER BY REPORT_ID ASC
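Two details of this query are easy to misread: the bounds are 14-digit `YYYYMMDDHHMMSS` numbers, and `BETWEEN` is closed at both ends. The sketch below illustrates both with SQLite standing in for Oracle; since SQLite has no `TO_DATE`, the numbers are converted in Python and `CREATE_DATE` is stored as ISO text. The table contents are made up.

```python
import sqlite3
from datetime import datetime


def to_datetime(ts14):
    """Parse a 14-digit YYYYMMDDHHMMSS number such as 20170902000000."""
    return datetime.strptime(str(ts14), "%Y%m%d%H%M%S")


# In-memory stand-in for ODSZYYX.RPT_FORECAST_STK (row is made up).
# CREATE_DATE is ISO text, so string comparison is chronological.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE RPT_FORECAST_STK (REPORT_ID INTEGER, CREATE_DATE TEXT)")
conn.execute("INSERT INTO RPT_FORECAST_STK VALUES (1359506, '2021-09-02 00:00:00')")

# Real match within the closed range, plus the -1 fallback row.
FILTER_SQL = """
SELECT REPORT_ID FROM RPT_FORECAST_STK
    WHERE REPORT_ID = ? AND CREATE_DATE BETWEEN ? AND ?
UNION
SELECT -1
ORDER BY REPORT_ID ASC
"""


def filter_report(report_id, start14, stop14):
    """Return [-1] or [-1, report_id] depending on the report's CREATE_DATE."""
    start = to_datetime(start14).isoformat(sep=" ")
    stop = to_datetime(stop14).isoformat(sep=" ")
    return [row[0] for row in conn.execute(FILTER_SQL, (report_id, start, stop))]


# The upper bound equals CREATE_DATE, so the inclusive BETWEEN keeps the row.
print(filter_report(1359506, 20170902000000, 20210902000000))  # [-1, 1359506]
print(filter_report(1359506, 20170902000000, 20210901235959))  # [-1]
```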

3. Chain the MySQL and Oracle queries with Cypher

Cypher chains the MySQL and Oracle queries together, and the default rows keep the data flowing from one step to the next.

// Query MySQL
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/test?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC', 
'SELECT zyyx_yanbao_code FROM (SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode=? UNION SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode=? UNION SELECT -1 AS ZYYX_YANBAO) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2',
['HDOCec613f2d8b707b66a8edc8c1eaeb29f0','HDOCec613f2d8b707b66a8edc8c1eaeb29f0']) 
YIELD row WITH row.zyyx_yanbao_code AS zyyx_yanbao_code
// Query Oracle (the quotes around the date format are escaped as \' because the SQL is itself a single-quoted Cypher string)
CALL apoc.load.jdbc('jdbc:oracle:thin:ngdp/test@ngdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:1521/ORCL', 
'SELECT rownum rm, a.* FROM (SELECT REPORT_ID FROM (SELECT REPORT_ID FROM TEST.RPT_FORECAST_STK WHERE REPORT_ID=? AND CREATE_DATE BETWEEN TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') AND TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') UNION SELECT -1 AS REPORT_ID FROM (SELECT rownum rm, a.* FROM ( SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK) a WHERE rownum <= 1 ) b WHERE b.rm > 0) a) a WHERE rownum <= 2 ORDER BY REPORT_ID ASC',
[zyyx_yanbao_code,20170902000000,20210902000000])
YIELD row RETURN row
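The default rows matter because of how `CALL ... YIELD` behaves: each incoming row is expanded into whatever rows the procedure returns, and a procedure that returns nothing removes the row, so everything downstream sees nothing. The Python sketch below mimics that with generators; `mysql_lookup` and `oracle_filter` are hypothetical stand-ins for the two `apoc.load.jdbc` calls, not APOC code, and their data is made up.

```python
def call_yield(rows, procedure):
    """Mimic CALL ... YIELD: every input row fans out into the procedure's
    output rows; an empty result drops the input row entirely."""
    for row in rows:
        yield from procedure(row)


# Hypothetical stand-ins for the two JDBC lookups (data is made up).
MYSQL_CODES = {"HDOC_hit": [1359506]}
ORACLE_IN_RANGE = {1359506}


def mysql_lookup(hcode, with_default):
    return ([-1] if with_default else []) + MYSQL_CODES.get(hcode, [])


def oracle_filter(report_id, with_default):
    hits = [report_id] if report_id in ORACLE_IN_RANGE else []
    return sorted(set(([-1] if with_default else []) + hits))


def pipeline(hcode, with_default=True):
    codes = call_yield([hcode], lambda h: mysql_lookup(h, with_default))
    ids = call_yield(codes, lambda c: oracle_filter(c, with_default))
    return list(ids)


print(pipeline("HDOC_miss"))                      # [-1]
print(pipeline("HDOC_miss", with_default=False))  # []
```

With the default rows, a miss still produces a row that a later step can turn into FALSE; without them, the whole query returns no rows at all.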

4. Derive a boolean with apoc.case

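The apoc.case procedure applies a logical test to the SQL results and maps the outcome to a boolean. A boolean is returned because the subsequent graph path filtering uses this query as a true/false condition. Before the test, the -1 placeholder rows are filtered out of the collected list, so the list is empty exactly when neither database returned a real match.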

// Query MySQL
CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/test?user=dev&password=test&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC', 
'SELECT zyyx_yanbao_code FROM (SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode=? UNION SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode=? UNION SELECT -1 AS ZYYX_YANBAO) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2',
['HDOCec613f2d8b707b66a8edc8c1eaeb29f0','HDOCec613f2d8b707b66a8edc8c1eaeb29f0']) 
YIELD row WITH row.zyyx_yanbao_code AS zyyx_yanbao_code
// Query Oracle (the quotes around the date format are escaped as \' because the SQL is itself a single-quoted Cypher string)
CALL apoc.load.jdbc('jdbc:oracle:thin:ngdp/test@ngdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:1521/ORCL', 
'SELECT rownum rm, a.* FROM (SELECT REPORT_ID FROM (SELECT REPORT_ID FROM TEST.RPT_FORECAST_STK WHERE REPORT_ID=? AND CREATE_DATE BETWEEN TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') AND TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') UNION SELECT -1 AS REPORT_ID FROM (SELECT rownum rm, a.* FROM ( SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK) a WHERE rownum <= 1 ) b WHERE b.rm > 0) a) a WHERE rownum <= 2 ORDER BY REPORT_ID ASC',
[zyyx_yanbao_code,20170902000000,20210902000000])
YIELD row WITH row
WITH COLLECT(row.REPORT_ID) AS REPORT_ID_LIST
WITH [REPORT_ID IN REPORT_ID_LIST WHERE REPORT_ID<>'-1'] AS RE_REPORT_ID_LIST
CALL apoc.case(
    [RE_REPORT_ID_LIST<>[],
    'RETURN TRUE AS bool'],
    'RETURN FALSE AS bool'
) 
YIELD value 
RETURN value.bool AS bool
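The COLLECT, list-filter and apoc.case steps reduce to one rule: the result is TRUE exactly when some ID other than the -1 placeholder survived both lookups. A Python equivalent follows as a sketch. It compares numerically, assuming IDs are numeric, so that -1, -1.0 and '-1' all count as the placeholder. The Cypher version compares against the string '-1', so whether it drops the placeholder depends on the type in which the JDBC driver returns REPORT_ID, which is worth checking.

```python
SENTINEL = -1


def has_real_hit(report_ids):
    """Return True if any ID other than the -1 placeholder is present.

    Mirrors COLLECT(row.REPORT_ID) -> [id IN list WHERE id <> -1] ->
    apoc.case(list <> [], TRUE, FALSE). IDs are assumed to be numeric.
    """
    real_ids = [rid for rid in report_ids if float(rid) != SENTINEL]
    return real_ids != []


print(has_real_hit([-1, -1, 1359506]))  # True
print(has_real_hit([-1]))               # False
```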

5. Wrap the query as a function

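To make the complex query from Section 4 easy to call from later Cypher, it is wrapped as a custom Cypher function with apoc.custom.asFunction.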

CALL apoc.custom.asFunction(
    'yanbaoHcode.createDate.range.bool',
    // Inner string literals use double quotes so they do not end this single-quoted statement string
    'CALL apoc.load.jdbc("jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC", "SELECT zyyx_yanbao_code FROM (SELECT zyyx_yanbao_code FROM ZYYX_YANBAO WHERE yanbao_hcode=? UNION SELECT zyyx_yanbao_code FROM ZYYX_YANBAO_old WHERE yanbao_hcode=? UNION SELECT -1 AS ZYYX_YANBAO) AS a ORDER BY zyyx_yanbao_code ASC LIMIT 2",[$yanban_hcode,$yanban_hcode]) YIELD row WITH row.zyyx_yanbao_code AS zyyx_yanbao_code CALL apoc.load.jdbc("jdbc:oracle:thin:ngdp/datalabgogo@ngdpdb-sync-prod.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:1521/ORCL", "SELECT rownum rm, a.* FROM (SELECT REPORT_ID FROM (SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK WHERE REPORT_ID=? AND CREATE_DATE BETWEEN TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') AND TO_DATE(?,\'YYYY-MM-DD HH24:MI:SS\') UNION SELECT -1 AS REPORT_ID FROM (SELECT rownum rm, a.* FROM ( SELECT REPORT_ID FROM ODSZYYX.RPT_FORECAST_STK) a WHERE rownum <= 1 ) b WHERE b.rm > 0) a) a WHERE rownum <= 2 ORDER BY REPORT_ID ASC",[zyyx_yanbao_code,$create_date_start,$create_date_stop]) YIELD row WITH row WITH COLLECT(row.REPORT_ID) AS REPORT_ID_LIST WITH [REPORT_ID IN REPORT_ID_LIST WHERE REPORT_ID<>"-1"] AS RE_REPORT_ID_LIST CALL apoc.case([RE_REPORT_ID_LIST<>[],"RETURN TRUE AS bool"],"RETURN FALSE AS bool") YIELD value RETURN value.bool AS bool',
    'BOOLEAN',
    [['yanban_hcode','STRING'],['create_date_start','LONG'],['create_date_stop','LONG']],
    false,
    'Returns TRUE or FALSE based on the report writing date (TRUE when the result set is non-empty; the date range is inclusive at both ends)'
);
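Because apoc.custom.asFunction receives the whole query as one Cypher string literal, every quote inside it must either use the other quote style or be escaped, and an escape that must survive into an inner literal needs its own backslash escaped as well. When such statements are generated rather than hand-written, a small helper can do the escaping. The Python sketch below is hypothetical (`cypher_literal` and `parse_literal` are not part of APOC); it handles only backslashes and single quotes, the two characters that are special inside a single-quoted Cypher literal.

```python
def cypher_literal(text):
    """Quote text as a single-quoted Cypher string literal.

    Backslashes are doubled first, so escapes already inside text survive
    one round of parsing; then single quotes are escaped.
    """
    return "'" + text.replace("\\", "\\\\").replace("'", "\\'") + "'"


def parse_literal(literal):
    """Undo cypher_literal (handles only the two escapes it emits)."""
    body, out, i = literal[1:-1], [], 0
    while i < len(body):
        if body[i] == "\\":
            out.append(body[i + 1])  # escaped character taken verbatim
            i += 2
        else:
            out.append(body[i])
            i += 1
    return "".join(out)


# Hypothetical, shortened inner SQL containing its own quoted format model.
sql = "SELECT TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') FROM DUAL"
inner = "CALL apoc.load.jdbc($url, " + cypher_literal(sql) + ", [$d]) YIELD row RETURN row"
outer = cypher_literal(inner)  # what would be passed as the asFunction statement

print(cypher_literal(sql))
# 'SELECT TO_DATE(?, \'YYYY-MM-DD HH24:MI:SS\') FROM DUAL'
```

Each nesting level adds one round of escaping, which is why hand-written nested statements are easier to keep correct by alternating single and double quotes instead.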

6. Use the function to filter data

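With the query pushed down and split across Sections 1 to 5, the complex logic is now encapsulated, and the time-series filter function can be called conveniently. The query below filters the report network to reports written between 20060902000000 and 20210902000000.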

MATCH p=(n:股票)<-[:涉及]-(c:研报)-[r:包含]->(k:关键词)
    WHERE 
        custom.yanbaoHcode.createDate.range.bool(
            c.hcode,
            20060902000000,
            20210902000000)
    RETURN k.name AS keyword, r.weight AS weight LIMIT 10
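Here the function sits in the WHERE clause, so it is evaluated for each candidate path, and each evaluation issues one MySQL query plus one Oracle query per code MySQL returns. The Python sketch below models that with a lazily filtered stream cut off at LIMIT 10 and counts predicate calls; the path data and `fake_in_range` are made up. How many paths Neo4j actually examines before LIMIT stops it depends on the execution plan, so the count illustrates the idea rather than predicting it.

```python
from itertools import islice


def match_keywords(paths, in_range, limit=10):
    """Mimic MATCH ... WHERE fn(c.hcode, ...) RETURN k.name, r.weight LIMIT n.

    Paths are filtered lazily, so the predicate runs only until `limit`
    rows have been produced.
    """
    rows = (
        (keyword, weight)
        for _stock, hcode, keyword, weight in paths
        if in_range(hcode)
    )
    return list(islice(rows, limit))


# Hypothetical data: every other report falls inside the date range.
paths = [("S", f"R{i}", f"kw{i}", i / 10) for i in range(30)]
calls = []


def fake_in_range(hcode):
    calls.append(hcode)  # in the real setup, each call means remote JDBC queries
    return int(hcode[1:]) % 2 == 0


result = match_keywords(paths, fake_in_range)
print(len(result), len(calls))  # 10 19
```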

• Execution time: querying 100 paths took 100 ms.

7. Summary

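As this article shows, the architecture separates the time-series indicator data from the relationship network. This saves as much disk space as possible on the single-node graph database server, so one server can hold a larger network, and it makes full use of the data warehouse's storage and compute resources. The trade-off is extra network queries, which cost more time than reading from local storage. When running models in practice, queries can be optimized for the specific scenario: frequently queried properties can be kept in the graph to reduce network overhead, and time-series data can be stored as a JSON string in a property and filtered with a stored procedure.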
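The last suggestion, keeping a report's time series as a JSON string property and filtering it in a procedure, might look like the sketch below. It is written in Python for illustration only; the JSON layout (a list of objects with a 14-digit timestamp `t` and a value `v`) is a hypothetical choice, and inside Neo4j the parsing step could presumably use APOC's JSON conversion functions such as `apoc.convert.fromJsonList`.

```python
import json


def filter_series(json_text, start, stop):
    """Return the points whose timestamp lies in [start, stop], both ends inclusive.

    Timestamps are 14-digit YYYYMMDDHHMMSS integers, so numeric order is
    chronological order.
    """
    points = json.loads(json_text)
    return [p for p in points if start <= p["t"] <= stop]


series = json.dumps([
    {"t": 20170902000000, "v": 1.5},
    {"t": 20210902000000, "v": 2.0},
    {"t": 20210902000001, "v": 3.0},
])
print(filter_series(series, 20170902000000, 20210902000000))
# [{'t': 20170902000000, 'v': 1.5}, {'t': 20210902000000, 'v': 2.0}]
```

Keeping both bounds inclusive matches the closed date range used by the custom function in Section 5.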

