一个良好的数据质量规则引擎必然是可配置的,可开发的,可定时执行的,前一个版本是写死在代码中的,友好性比较差,再则缺乏判断会导致重复执行问题,2.0在前面基础上增加了容错机制,增加了可配置性。再有甚者,还可以写个前端页面做质量规则配置,写个定时任务配置等等。
构造两个源表和两个目标表,后面用来测试
代码语言:javascript复制--创建测试源表
CREATE TABLE aaa(a VARCHAR2(10),B VARCHAR2(10));
CREATE TABLE bbb(a VARCHAR2(20));
INSERT INTO aaa VALUES('A1','B'1);
INSERT INTO aaa VALUES('A2','B2');
INSERT INTO aaa VALUES('A3','B3');
INSERT INTO bbb VALUES('1');
COMMIT;
--创建测试目标表
create table aaadest
(
statdate varchar2(10),
a VARCHAR2(10),
b VARCHAR2(10)
);
create table bbbdest
(
statdate varchar2(10),
a VARCHAR2(20)
);
构造一个数据质量规则配置表
代码语言:javascript复制--创建数据质量规则表
CREATE TABLE data_quality_rule_config
(
Step INTEGER, --记录当前执行步骤
RuleType VARCHAR2(100), --场景类型
RuleName VARCHAR2(100), --场景名称
TableName VARCHAR2(50), --记录当前执行的表名
ProblemSQL VARCHAR2(2000), --问题清单SQL
RecordnumSQL VARCHAR2(2000), --记录数SQL
PrimaryField VARCHAR2(30), --记录主键字段,做版本比较用
CityCodeField VARCHAR2(30),--记录局编码字段,用来切分供电局
SuoCodefiled VARCHAR2(30),--记录所编码字段,用来切分供电所
);
create primary key tt on data_quality_rule_config(step);
--插入协同场景协同规则
INSERT INTO data_quality_rule_config(Step,RuleType,RuleName,TableName,problemSQL,recordnumSQL)
VALUES (1,'测试','场景a','AAAdest',
'insert into aaadest select to_char(sysdate,''yyyy-mm-dd''),a,b from aaa t',
'select count(*) from aaa t');
INSERT INTO data_quality_rule_config (Step,RuleType,RuleName,TableName,problemSQL,recordnumSQL)
VALUES (2,'测试','场景b','BBBdest',
'insert into bbbdest select to_char(sysdate,''yyyy-mm-dd''),a from bbb t ',
'select count(*) from aaa t');
INSERT INTO data_quality_rule_config (Step,RuleType,RuleName,TableName,problemSQL,recordnumSQL)
VALUES (3,'测试','场景c','CCCdest',
'insert into cccdest select to_char(sysdate,''yyyy-mm-dd''),a from bbb t ',
'select count(*) from aaa t;');
COMMIT;
构造一个记录数总表
代码语言:javascript复制--建个表,记录总数-分母
CREATE TABLE recordnumbyorg
(
step INTEGER, --记录当前执行步骤
RuleType VARCHAR2(100), --场景类型
RuleName VARCHAR2(100), --场景名称
TableName VARCHAR2(50), --记录当前执行的表名
orgid VARCHAR2(50),
suoid VARCHAR2(50),
recordnum NUMBER(10)
);
创建一个问题清单处理日志表和记录总数处理日志表,将错误和成功的合二为一。
代码语言:javascript复制--创建问题清单处理日志表
CREATE TABLE etl_problemdetail_log(
statdate VARCHAR2(10),
step INTEGER,
ruletype VARCHAR2(100),
rulename VARCHAR2(100),
tablename VARCHAR2(50),
flag VARCHAR(50),
insertnum INTEGER,
sqlcode VARCHAR(1000),
sqlerr VARCHAR(1000),
execdatetime TIMESTAMP
);
--创建记录总数处理日志表
CREATE TABLE etl_recordnum_log(
statdate VARCHAR2(10),
step INTEGER,
ruletype VARCHAR2(100),
rulename VARCHAR2(100),
tablename VARCHAR2(50),
flag VARCHAR(50),
insertnum INTEGER,
sqlcode VARCHAR(1000),
sqlerr VARCHAR(1000),
execdatetime TIMESTAMP
);
数据质量规则引擎,主要采用了游标和动态SQL语句方式
代码语言:javascript复制CREATE OR REPLACE PROCEDURE p_data_quality_problem_detail
IS
vDate VARCHAR2(10):=TO_CHAR(SYSDATE,'YYYY-MM-DD'); --获取当天日期
vExecDatetime TIMESTAMP; --获取当前脚本执行的时间
vStep INTEGER:=0; --记录当前执行步骤
vRuleType VARCHAR2(100); --协同场景类型
vRuleName VARCHAR2(100); --协同场景名称
vTableName VARCHAR2(50); --记录当前执行的表名
vInsertNum NUMBER(10); --记录当前执行插入的行数
vSQLCode VARCHAR2(1000); --记录错误编码
vSQLErr VARCHAR2(1000); --记录错误信息
vProblemSQL VARCHAR2(1000); --记录数据质量问题清单SQL
--比较已经执行的规则,如果已经成功,则不执行,如果未成功,再次执行
CURSOR cur IS --定义游标
SELECT a.Step,a.RuleType,a.RuleName,a.TableName,a.ProblemSQL,a.RecordNumSQL
FROM data_quality_rule_config a
LEFT OUTER JOIN etl_problemdetail_log b
ON a.step=b.step AND b.statdate=vDate AND b.flag='Success'
WHERE b.step IS NULL;
BEGIN
FOR CurrentCursor IN cur LOOP
vStep:=CurrentCursor.Step;
vRuleType:=CurrentCursor.RuleType;
vRuleName:=CurrentCursor.RuleName;
vTableName:=CurrentCursor.TableName;
vExecDatetime:=SYSTIMESTAMP;
vProblemSQL:=CurrentCursor.ProblemSQL;
--问题清单数据处理
BEGIN
EXECUTE IMMEDIATE vProblemSQL;
vInsertNum:=SQL%ROWCOUNT;
INSERT INTO etl_problemdetail_log(statdate,step,ruletype,rulename,tablename,flag,insertnum,execdatetime)
VALUES(vDate,vStep,vRuleType,vRuleName,vTableName,'Success',vInsertNum,vExecDatetime);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
vSQLCode :=SQLCODE;
vSQLErr :=SQLERRM;
INSERT INTO etl_problemdetail_log(statdate,step,ruletype,rulename,tablename,flag,sqlcode,sqlerr,execdatetime)
VALUES(vDate,vStep,vRuleType,vRuleName,vTableName,'Fail',vSQLCode,vSQLErr,vExecDatetime);
COMMIT;
END;
END LOOP;
END;