关于数据质量脚本自动化处理V2.0

2022-03-11 17:05:29 浏览数 (1)

一个良好的数据质量规则引擎必然是可配置的,可开发的,可定时执行的,前一个版本是写死在代码中的,友好性比较差,再则缺乏判断会导致重复执行问题,2.0在前面基础上增加了容错机制,增加了可配置性。再有甚者,还可以写个前端页面做质量规则配置,写个定时任务配置等等。

构造两个源表和两个目标表,后面用来测试

代码语言:javascript复制
--创建测试源表
CREATE TABLE aaa(a VARCHAR2(10),B VARCHAR2(10));
CREATE TABLE bbb(a  VARCHAR2(20));
INSERT INTO aaa VALUES('A1','B'1);
INSERT INTO aaa VALUES('A2','B2');
INSERT INTO aaa VALUES('A3','B3');
INSERT INTO bbb VALUES('1');
COMMIT;
--创建测试目标表
create table aaadest
(
 statdate varchar2(10),
  a VARCHAR2(10),
  b VARCHAR2(10)
);
create table bbbdest
(
 statdate varchar2(10),
  a VARCHAR2(20)
);

构造一个数据质量规则配置表

代码语言:javascript复制
--创建数据质量规则表
CREATE TABLE data_quality_rule_config
(
        Step INTEGER,       --记录当前执行步骤
        RuleType VARCHAR2(100),        --场景类型
        RuleName VARCHAR2(100),        --场景名称
        TableName VARCHAR2(50), --记录当前执行的表名
        ProblemSQL VARCHAR2(2000), --问题清单SQL
        RecordnumSQL VARCHAR2(2000),  --记录数SQL
        PrimaryField VARCHAR2(30),  --记录主键字段,做版本比较用
        CityCodeField VARCHAR2(30),--记录局编码字段,用来切分供电局
        SuoCodefiled VARCHAR2(30),--记录所编码字段,用来切分供电所       
);
create primary key tt on data_quality_rule_config(step);

--插入协同场景协同规则
INSERT INTO data_quality_rule_config(Step,RuleType,RuleName,TableName,problemSQL,recordnumSQL) 
VALUES (1,'测试','场景a','AAAdest',
'insert into aaadest select to_char(sysdate,''yyyy-mm-dd''),a,b from aaa t',
'select count(*) from aaa t');
INSERT INTO data_quality_rule_config (Step,RuleType,RuleName,TableName,problemSQL,recordnumSQL) 
VALUES (2,'测试','场景b','BBBdest',
'insert into bbbdest select to_char(sysdate,''yyyy-mm-dd''),a from bbb t ',
'select count(*) from aaa t');
INSERT INTO data_quality_rule_config (Step,RuleType,RuleName,TableName,problemSQL,recordnumSQL) 
VALUES (3,'测试','场景c','CCCdest',
'insert into cccdest select to_char(sysdate,''yyyy-mm-dd''),a from bbb t ',
'select count(*) from aaa t;');
COMMIT;

构造一个记录数总表

代码语言:javascript复制
--建个表,记录总数-分母
CREATE TABLE recordnumbyorg
(
        step  INTEGER,       --记录当前执行步骤
        RuleType VARCHAR2(100),        --场景类型
        RuleName VARCHAR2(100),        --场景名称
        TableName VARCHAR2(50), --记录当前执行的表名
        orgid   VARCHAR2(50),
        suoid   VARCHAR2(50),
        recordnum NUMBER(10)
);

创建一个问题清单处理日志表和记录总数处理日志表,将错误和成功的合二为一。

代码语言:javascript复制
--创建问题清单处理日志表
CREATE TABLE etl_problemdetail_log(
        statdate     VARCHAR2(10),
        step    INTEGER,
        ruletype       VARCHAR2(100),
        rulename      VARCHAR2(100),
        tablename       VARCHAR2(50),
        flag    VARCHAR(50),
        insertnum INTEGER,
        sqlcode VARCHAR(1000), 
        sqlerr VARCHAR(1000),
        execdatetime    TIMESTAMP
);
--创建记录总数处理日志表
CREATE TABLE etl_recordnum_log(
        statdate     VARCHAR2(10),
        step    INTEGER,
        ruletype       VARCHAR2(100),
        rulename      VARCHAR2(100),
        tablename       VARCHAR2(50),
        flag    VARCHAR(50),
        insertnum INTEGER,
        sqlcode VARCHAR(1000), 
        sqlerr VARCHAR(1000),
        execdatetime    TIMESTAMP
);

数据质量规则引擎,主要采用了游标和动态SQL语句方式

代码语言:javascript复制
CREATE OR REPLACE PROCEDURE p_data_quality_problem_detail
IS
vDate VARCHAR2(10):=TO_CHAR(SYSDATE,'YYYY-MM-DD');     --获取当天日期
vExecDatetime TIMESTAMP;        --获取当前脚本执行的时间
vStep INTEGER:=0;       --记录当前执行步骤
vRuleType VARCHAR2(100);        --协同场景类型
vRuleName VARCHAR2(100);        --协同场景名称
vTableName VARCHAR2(50); --记录当前执行的表名
vInsertNum NUMBER(10);  --记录当前执行插入的行数
vSQLCode VARCHAR2(1000); --记录错误编码
vSQLErr VARCHAR2(1000);  --记录错误信息
vProblemSQL VARCHAR2(1000);  --记录数据质量问题清单SQL

--比较已经执行的规则,如果已经成功,则不执行,如果未成功,再次执行
CURSOR cur IS  --定义游标
        SELECT a.Step,a.RuleType,a.RuleName,a.TableName,a.ProblemSQL,a.RecordNumSQL
        FROM data_quality_rule_config a
        LEFT OUTER JOIN etl_problemdetail_log b
        ON a.step=b.step AND b.statdate=vDate AND b.flag='Success'
        WHERE b.step IS NULL;
BEGIN
        FOR CurrentCursor IN cur LOOP 
                vStep:=CurrentCursor.Step;
                vRuleType:=CurrentCursor.RuleType;
                vRuleName:=CurrentCursor.RuleName;
                vTableName:=CurrentCursor.TableName;
                vExecDatetime:=SYSTIMESTAMP;
                vProblemSQL:=CurrentCursor.ProblemSQL;
                --问题清单数据处理
                BEGIN                        
                        EXECUTE IMMEDIATE vProblemSQL;
                        vInsertNum:=SQL%ROWCOUNT;
                        INSERT INTO etl_problemdetail_log(statdate,step,ruletype,rulename,tablename,flag,insertnum,execdatetime)
                        VALUES(vDate,vStep,vRuleType,vRuleName,vTableName,'Success',vInsertNum,vExecDatetime);
                        COMMIT;
                EXCEPTION
                        WHEN OTHERS THEN
                                ROLLBACK;
                                vSQLCode :=SQLCODE;
                                vSQLErr  :=SQLERRM;
                                INSERT INTO etl_problemdetail_log(statdate,step,ruletype,rulename,tablename,flag,sqlcode,sqlerr,execdatetime)
                                VALUES(vDate,vStep,vRuleType,vRuleName,vTableName,'Fail',vSQLCode,vSQLErr,vExecDatetime);
                                COMMIT;
                END;
        END LOOP;
END;

0 人点赞