基于check-point实现图数据构建任务
问题背景
图数据schema
检查点记录表结构设计
TASK设计
节点TASK
关系TASK
备注
问题背景
从关系数据库抽取图数据,需要考虑的一个场景是新增数据的处理【其中任务状态的依赖与数据依赖关系非常重要】。从一个自动化抽取图数据的工具角度来说,自动化生成脚本可以与如下实现完成对接【即设计好schema之后自动生成如下脚本】。该设计方案可以与自动化抽取图数据的工具无缝集成。 在现有的Airflow调度系统中【可以自行实现调度逻辑或者可以是其它的调度系统,本文的设计思路可以借鉴】,可以设计Task和DAG来完整增量数据的处理,完成线上数据的持续更新需求。在构建TASK时,按照图数据的特点设计了节点TASK和关系TASK,并在同一个DAG中执行调度。【DAG的设计可以是某一类业务数据的处理流程】在下面的案例中主要展示了担保关系图数据的构建设计。
图数据schema
担保关系schema展示~
检查点记录表结构设计
检查点记录表主要用来记录任务的处理状态,实现节点TASK和关系TASK的任务状态对接。 调度系统负责执行逻辑和周期性调度,TASK之间状态的依赖无法直接实现,需要借助额外实现;数据依赖关系也需要额外实现。 TASK之间数据的依赖在这个案例中其实是借助ONgDB实现,TASK之间状态的依赖借助了MySQL来实现。
代码语言:javascript复制CREATE TABLE `ONGDB_TASK_CHECK_POINT` (
`huid` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`hcode` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '代码:HGRAPHTASK(FromLabel)-[RelType]->(ToLabel)',
`from` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '名称',
`relationship` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '关联类型',
`to` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'MSTR_ORG的hcode',
`node_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT '节点可以获取检查点时间可更改,关系TASK可以获取检查点时间【一个完整的图数据DAG-TASK必须包含节点和关系构建TASK】',
`rel_check_point` datetime(0) NULL DEFAULT '1900-01-01 00:00:00' COMMENT '保存更新前node_check_point的值',
`from_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'from是否更新检查点:0-否,1-是【from和to是一样的标签则不需要使用此判断】',
`to_update_check` int(2) NOT NULL DEFAULT 0 COMMENT 'to是否更新了检查点:0-否,1-是【from和to是一样的标签则不需要使用此判断】',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '对该检查点任务的具体描述',
`overall_data_split_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '同步全量数据的CYPHER:数据分块方案脚本',
`overall_data_timezone_cypher` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '同步全量数据的CYPHER:不设置时间范围的同步脚本',
`hcreatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`hupdatetime` datetime(0) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
`create_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '创建人',
`update_by` varchar(24) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '更新人',
`hisvalid` int(11) NOT NULL DEFAULT 1 COMMENT '逻辑删除标记:0-无效;1-有效',
`src` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'ART' COMMENT '数据来源标记',
PRIMARY KEY (`huid`) USING BTREE,
UNIQUE INDEX `unique_key_02`(`hcode`) USING BTREE COMMENT '唯一索引',
UNIQUE INDEX `unique_key_01`(`from`, `to`, `relationship`) USING BTREE COMMENT '唯一索引',
INDEX `updateTime`(`hupdatetime`) USING BTREE,
INDEX `name`(`from`) USING BTREE,
INDEX `hisvalid`(`hisvalid`) USING BTREE,
INDEX `type`(`relationship`) USING BTREE,
INDEX `check_point`(`node_check_point`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 742715628 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'ONgDB DAG TASK检查点记录表' ROW_FORMAT = Dynamic;
TASK设计
TASK在设计时以一个CYPHER语句为单位设计,每个CYPHER都是一个完整的TASK。每次执行一个TASK时都获取上一次记录的检查点时间。在运行构建关系的TASK时检查点也必须与节点的检查点时间一致【防止时间差导致的数据遗漏】。
节点TASK
大致为四步
- 获取检查点时间
- 定义SQL获取数据方式
- 批量迭代执行构建任务
- 更新任务状态
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(node_check_point,'%Y-%m-%d %H:%i:%s') AS check_point,DATE_FORMAT(NOW(),'%Y-%m-%d %H:%i:%s') AS currentTime FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join([''',row.check_point,'''], '') AS check_point,row.currentTime AS currentTime,row.check_point AS rawCheckPoint
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC', 'SELECT
hcode,name,credit_code,label,CONVERT(DATE_FORMAT(hcreatetime,\'%Y%m%d%H%i%S\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\'%Y%m%d%H%i%S\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuaranteeV003 WHERE hupdatetime>=?',[check_point])','check_point',check_point) AS sqlData,currentTime,rawCheckPoint
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MERGE (n:HORGGuaranteeV003 {hcode:row.hcode}) SET n =row WITH n,row CALL apoc.create.addLabels(n,apoc.convert.fromJsonList(row.label)) YIELD node RETURN node', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations
WITH batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime,rawCheckPoint
// 当操作失败的数据包数量小于1时【即操作全部执行成功】则更新检查点【更新node_check_point为系统时间】【rel_check_point设置为更新前node_check_point的值】
WHERE batch.failed<1
CALL apoc.load.jdbcUpdate('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','UPDATE ONGDB_TASK_CHECK_POINT SET node_check_point=?,rel_check_point=? WHERE hcode=?',[currentTime,rawCheckPoint,'HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row RETURN row,batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,currentTime;
关系TASK
大致为三步
- 获取检查点时间【关系TASK不负责任务状态的更新依赖节点TASK的任务状态】
- 定义SQL获取数据方式
- 批量迭代执行构建任务
// 获取检查点时间【跑全量数据时修改CHECK_POINT的时间点为最早的一个时间即可】【数据量高于堆内存限制则必须使用数据分块方案】
CALL apoc.load.jdbcParams('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC','SELECT DATE_FORMAT(rel_check_point,'%Y-%m-%d %H:%i:%s') AS check_point FROM ONGDB_TASK_CHECK_POINT WHERE hcode=?',['HGRAPHTASK(HORGGuaranteeV003)-[担保]->(HORGGuaranteeV003)']) YIELD row WITH apoc.text.join([''',row.check_point,'''], '') AS check_point
// 定义SQL获取数据方式
WITH REPLACE('CALL apoc.load.jdbc('jdbc:mysql://datalab-contentdb-dev.crkldnwly6ki.rds.cn-north-1.amazonaws.com.cn:3306/analytics_graph_data?user=dev&password=datalabgogo&useUnicode=true&characterEncoding=utf8&serverTimezone=UTC', 'SELECT `from`,`to`,guarantee_detail,guarantee_detail_size,CONVERT(DATE_FORMAT(hcreatetime,\'%Y%m%d%H%i%S\'),UNSIGNED INTEGER) AS hcreatetime,CONVERT(DATE_FORMAT(hupdatetime,\'%Y%m%d%H%i%S\'),UNSIGNED INTEGER) AS hupdatetime,hisvalid,create_by,update_by FROM HORGGuarantee_GuarV003 WHERE hupdatetime>=?',[check_point])','check_point',check_point) AS sqlData
// 批量迭代执行节点构建
CALL apoc.periodic.iterate(sqlData,'MATCH (from:HORGGuaranteeV003 {hcode:row.from}),(to:HORGGuaranteeV003 {hcode:row.to}) MERGE (from)-[r:担保]->(to) SET r ={guarantee_detail_size:row.guarantee_detail_size,guarantee_detail:row.guarantee_detail,hupdatetime:row.hupdatetime,hcreatetime:row.hcreatetime,hisvalid:row.hisvalid,create_by:row.create_by,update_by:row.update_by}', {parallel:false,batchSize:10000,iterateList:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations RETURN batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations;
备注
在上文中主要展示了构建具有相同标签的节点之间的数据,可理解为同构图的构建。异构图的构建方式在任务状态的依赖上有一些区别,即任务状态不可被二次修改,请看下回分解。