OushuDB入门(五)——ETL篇

2019-05-25 19:37:06 浏览数 (1)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzy0623/article/details/80281643

一、初始装载

1. 用sqoop用户建立初始抽取脚本

本示例要用Sqoop将MySQL的数据抽取到Hive ORC外部表,然后利用OushuDB ORC外部表功能将数据装载到内部表中。表1汇总了示例中维度表和事实表用到的源数据表及其抽取模式。

源数据表

HDFS目录

对应RDS模式中的表

抽取模式

customer

/data/rds/customer

customer

整体、拉取

product

/data/rds/product

product

整体、拉取

sales_order

/data/rds/sales_order

sales_order

基于时间戳的CDC、拉取

表1

(1)覆盖导入 对于customer、product这两个表采用整体拉取的方式抽数据。ETL通常是按一个固定的时间间隔,周期性定时执行的,因此对于整体拉取的方式而言,每次导入的数据需要覆盖上次导入的数据。 (2)增量导入 Sqoop提供增量导入模式,用于只导入比已经导入行新的数据行。表2所示参数用来控制增量导入。

参数

描述

--check-column

在确定应该导入哪些行时,指定被检查的列。列不能是CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR数据类型。

--incremental

指定Sqoop怎样确定哪些行是新行。有效值是append和lastmodified。

--last-value

指定已经导入数据的被检查列的最大值。

表2

Sqoop支持两种类型的增量导入:append和lastmodified。可以使用--incremental参数指定增量导入的类型。 当被导入表的新行具有持续递增的行id值时,应该使用append模式。指定行id为--check-column的列。Sqoop导入那些被检查列的值比--last-value给出的值大的数据行。 Sqoop支持的另一个表修改策略叫做lastmodified模式。当源表的数据行可能被修改,并且每次修改都会更新一个last-modified列为当前时间戳时,应该使用lastmodified模式。那些被检查列的时间戳比last-value给出的时间戳新的数据行被导入。 增量导入命令执行后,在控制台输出的最后部分,会打印出后续导入需要使用的last-value。当周期性执行导入时,应该用这种方式指定--last-value参数的值,以确保只导入新的或修改过的数据。可以通过一个增量导入的保存作业自动执行这个过程,这是适合重复执行增量导入的方式。 有了对Sqoop增量导入的基本了解,下面看一下如何在本示例中使用它抽取数据。对于sales_order这个表采用基于时间戳的CDC拉取方式抽数据。这里假设源系统中销售订单记录一旦入库就不再改变,或者可以忽略改变。也就是说销售订单是一个随时间变化单向追加数据的表。sales_order表中有两个关于时间的字段,order_date表示订单时间,entry_date表示订单数据实际插入表里的时间,两个时间可能不同。那么用哪个字段作为CDC的时间戳呢?设想这样的情况,一个销售订单的订单时间是2018年1月1日,实际插入表里的时间是2018年1月2日,ETL每天0点执行,抽取前一天的数据。如果按order_date抽取数据,条件为where order_date >= '2018-01-02' AND order_date < '2018-01-03',则2018年1月3日0点执行的ETL不会捕获到这个新增的订单数据。所以应该以entry_date作为CDC的时间戳。 (3)编写初始数据抽取脚本 用sqoop操作系统用户建立初始数据抽取脚本文件~/init_extract.sh,内容如下:

代码语言:javascript复制
#!/bin/bash    
  
# 建立Sqoop增量导入作业,以entry_date作为检查列,初始的last-value是0  
sqoop job --delete myjob_incremental_import  
sqoop job --create myjob_incremental_import 
-- import 
--connect "jdbc:mysql://172.16.1.127:3306/source?usessl=false&user=dwtest&password=123456" 
--table sales_order 
--hcatalog-database test 
--hcatalog-table sales_order 
--compress 
--incremental lastmodified 
--check-column entry_date 
--last-value 0
  
# 全量抽取客户表  
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table customer --hcatalog-database test --hcatalog-table customer --null-string '\N' --null-non-string '\N' --compress
  
# 全量抽取产品表  
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table product --hcatalog-database test --hcatalog-table product --null-string '\N' --null-non-string '\N' --compress

# 首次全量抽取销售订单表  
sqoop job --exec myjob_incremental_import  

说明:

  • 为了保证外部表数据量尽可能小,使用compress选项进行压缩,Sqoop缺省的压缩算法是gzip,OushuDB外部表能自动正确读取这种格式的压缩文件。
  • 执行时先重建Sqoop增量抽取作业,指定last-value为0。由于entry_date都是大于0的,因此初始时会装载所有订单数据。
  • hcatalog模式不支持incremental append,错误信息为“Append mode for imports is not compatible with HCatalog. Please remove the parameter--append-mode”。因此这里选择lastmodified增量检查模式,要求被检查列为日类型,如date、datetime或timestamp等。这里使用entry_date作为检查列。lastmodified模式不但可以捕获新增数据,而且只要源端适当维护时间戳字段,ETL系统还能捕获更新的数据。缺点是要求源系统表中具有记录更新的时间戳字段,否则无法使用该模式捕获变化的数据。

将文件修改为可执行模式:

代码语言:javascript复制
chmod 755 ~/init_extract.sh

2. 用gpadmin用户建立初始装载脚本

在数据仓库可以使用前,需要装载历史数据。这些历史数据是导入进数据仓库的第一个数据集合。首次装载被称为初始装载,一般是一次性工作。由最终用户来决定有多少历史数据进入数据仓库。例如,数据仓库使用的开始时间是2018年3月1日,而用户希望装载两年的历史数据,那么应该初始装载2016年3月1日到2018年2月28日之间的源数据。在2018年3月2日装载2018年3月1日的数据(假设执行频率是每天一次),之后周期性地每天装载前一天的数据。在装载事实表前,必须先装载所有的维度表。因为事实表需要引用维度的代理键。这不仅针对初始装载,也针对定期装载。 (1)数据源映射 表3显示了本示例需要的源数据的关键信息,包括源数据表、对应的数据仓库目标表等属性。客户和产品的源数据直接与其数据仓库里的目标表,customer_dim和product_dim表相对应,而销售订单事务表是多个数据仓库表的数据源。

源数据

源数据类型

文件名/表名

数据仓库中的目标表

客户

MySQL表

customer

customer_dim

产品

MySQL表

product

product_dim

销售订单

MySQL表

sales_order

order_dim、sales_order_fact

表3

(2)确定SCD处理方法 标识出了数据源,现在要考虑维度历史的处理。渐变维(SCD)即是一种在多维数据仓库中实现维度历史的技术。有三种不同的SCD技术:SCD 类型1(SCD1),SCD类型2(SCD2),SCD类型3(SCD3):

  • SCD1 - 通过更新维度记录直接覆盖已存在的值,它不维护记录的历史。SCD1一般用于修改错误的数据。
  • SCD2 - 在源数据发生变化时,给维度记录建立一个新的“版本”记录,从而维护维度历史。SCD2不删除、修改已存在的数据。
  • SCD3 – 通常用作保持维度记录的几个版本。它通过给某个数据单元增加多个列来维护历史。例如,为了记录客户地址的变化,customer_dim维度表有一个customer_address列和一个previous_customer_address列,分别记录当前和上一个版本的地址。SCD3可以有效维护有限的历史,而不像SCD2那样保存全部历史。SCD3很少使用。它只适用于数据的存储空间不足并且用户接受有限维度历史的情况。

同一个维度表中的不同字段可以有不同的变化处理方式。在传统数据仓库中,对于SCD1一般就直接UPDATE更新属性,而SCD2则要新增记录。但OushuDB没有提供UPDATE、DELETE等DML操作,因此对于所有属性的变化均增加一条记录,即所有维度属性都按SCD2方式处理。 (3)实现代理键 多维数据仓库中的维度表和事实表一般都需要有一个代理键,作为这些表的主键,代理键一般由单列的自增数字序列构成。OushuDB中的bigserial数据类型与MySQL的auto_increment类似,长用于定义自增列。但它的实现方法却与Oracle的sequence类似,当创建bigserial字段的表时,OushuDB会自动创建一个自增的sequence对象,bigserial字段自动引用sequence实现自增。 (4)编写初始数据装载脚本 所有技术实现的细节都清楚后,现在编写初始数据装载脚本。需要执行一步操作:向TDS模式中的表装载数据。用gpadmin操作系统用户建立初始数据装载脚本文件~/init_load.sql,内容如下:

代码语言:javascript复制
-- 分析外部表  
analyze rds.customer;  
analyze rds.product;  
analyze rds.sales_order;  
  
-- 装载数据仓库数据  
set search_path to tds;  
  
truncate table customer_dim;    
truncate table product_dim;    
truncate table order_dim;    
truncate table sales_order_fact;   
  
-- 序列初始化  
alter sequence customer_dim_customer_sk_seq restart with 1;  
alter sequence product_dim_product_sk_seq restart with 1;  
alter sequence order_dim_order_sk_seq restart with 1;  
  
-- 装载客户维度表    
insert into customer_dim   
(customer_number,  
 customer_name,  
 customer_street_address,  
 customer_zip_code,  
 customer_city,  
 customer_state, 
 isdelete, 
 version,  
 effective_date)   
select t1.customer_number,   
       t1.customer_name,   
       t1.customer_street_address,  
       t1.customer_zip_code,   
       t1.customer_city,   
       t1.customer_state,
       false,	   
       1,  
       '2017-03-01'     
  from rds.customer t1   
 order by t1.customer_number;  
     
-- 装载产品维度表    
insert into product_dim   
(product_code,  
 product_name,  
 product_category,
 isdelete, 
 version,  
 effective_date)  
select product_code,   
       product_name,  
       product_category, 
       false,	   
       1,   
       '2016-03-01'    
  from rds.product t1   
 order by t1.product_code;    
  
-- 装载订单维度表    
insert into order_dim (order_number,isdelete,version,effective_date)    
select order_number, false, 1, order_date         
  from rds.sales_order t1   
 order by t1.order_number;   
     
-- 装载销售订单事实表    
insert into sales_order_fact    
select order_sk,   
       customer_sk,   
       product_sk,   
       date_sk,   
       e.year*100   e.month,   
       order_amount    
  from rds.sales_order a,   
       order_dim b,   
       customer_dim c,   
       product_dim d,   
       date_dim e    
 where a.order_number = b.order_number    
   and a.customer_number = c.customer_number    
   and a.product_code = d.product_code    
   and date(a.order_date) = e.date;   
  
-- 分析tds模式的表  
analyze customer_dim;  
analyze product_dim;  
analyze order_dim;  
analyze sales_order_fact;  

说明:

  • 装载前清空表、以及重新初始化序列的目的是为了可重复执行初始装载脚本。
  • 依据OushuDB的建议,装载数据后,执行查询前,先分析表以提高查询性能。

3. 用root用户建立初始ETL脚本

前面的数据抽取脚本文件的属主是sqoop用户,而数据装载脚本文件的属主是gpadmin用户。除了这两个用户以外,还需要使用hdfs用户执行文件操作。为了简化多用户调用执行,用root用户将所有需要的操作封装到一个文件中,提供统一的初始数据装载执行入口。 用root操作系统用户建立初始ETL脚本文件~/init_etl.sh,内容如下:

代码语言:javascript复制
#!/bin/bash  
  
# 为了可以重复执行初始装载过程,先使用hdfs用户删除销售订单外部表目录  
su - hdfs -c 'hdfs dfs -rm -r -skipTrash /data/rds/customer/*'
su - hdfs -c 'hdfs dfs -rm -r -skipTrash /data/rds/product/*'
su - hdfs -c 'hdfs dfs -rm -r -skipTrash /data/rds/sales_order/*' 
  
# 使用sqoop用户执行初始抽取脚本  
su - sqoop -c '~/init_extract.sh'  
  
# 使用gpadmin用户执行初始装载脚本  
su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp2 -f ~/init_load.sql'  

说明:

  • Sqoop的hcatalog模式下hive-overwrite不起作用,因此为了提供一个幂等操作,先要用hdfs用户删除相应目录下的所有文件。所谓幂等操作指的是其执行任意多次所产生的影响均与一次执行的影响相同。这样就能在导入失败或修复bug后可以再次执行该操作,而不用担心重复执行会对系统造成数据混乱。
  • 使用su命令,以不同用户执行相应的脚本文件。

将文件修改为可执行模式:

代码语言:javascript复制
chmod 755 ~/init_etl.sh 

4. 用root用户执行初始ETL脚本

代码语言:javascript复制
~/init_etl.sh  

执行以下查询验证初始ETL结果:

代码语言:javascript复制
select order_number,   
       customer_name,   
       product_name,   
       date,   
       order_amount amount    
  from sales_order_fact a,   
       customer_dim b,   
       product_dim c,   
       order_dim d,   
       date_dim e    
 where a.customer_sk = b.customer_sk    
   and a.product_sk = c.product_sk    
   and a.order_sk = d.order_sk    
   and a.order_date_sk = e.date_sk    
 order by order_number;

共装载100条销售订单数据,最后20条如图1所示。

图1

二、定期装载

1. 变化数据捕获(Changed Data Capture,CDC)

初始装载只在数据仓库开始使用前执行一次,然而,必须要周期性地执行装载源数据过程。与初始装载不同,定期装载一般都是增量的,并且需要捕获并且记录数据的变化历史。 (1)识别数据源与装载类型 定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。表4总了本示例的这些信息。

数据源

RDS模式

TDS模式

抽取模式

维度历史装载类型

customer

customer

customer_dim

整体、拉取

所有属性均为SCD2

product

product

product_dim

整体、拉取

所有属性均为SCD2

sales_order

sales_order

order_dim

CDC(每天)、拉取

唯一订单号

sales_order_fact

CDC(每天)、拉取

N/A

N/A

N/A

date_dim

N/A

预装载

表4

(2)处理渐变维(Slowly Changing Dimension,SCD) 目前OushuDB只有INSERT,没有UPDATE、DELETE操作,因此所有维度属性都使用SDC2记录全部历史变化。在捕获数据变化时,需要使用维度表的当前版本数据与从业务数据库最新抽取来的数据做比较。实现方式是在维度表上建立一个当前维度版本的视图,用于比较数据变化。这种设计既可以保留所有数据变化的历史,又屏蔽了查询当前版本的复杂性。 事实表需要引用维度表的代理键,而且不一定是引用当前版本的代理键。比如有些迟到的事实,就必须找到事实发生时的维度版本。因此一个维度的所有版本区间应该构成一个连续且互斥时间范围,每个事实数据都能对应维度的唯一版本。实现方式是在维度表上建立一个维度历史版本的视图,在这个视图中增加版本过期日期导出列。任何一个版本的有效期是一个“左闭右开”的区间,也就是说该版本包含生效日期,但不包含过期日期,而是到过期日期的前一天为止。 (3)设置数据处理时间窗口 对于事实表,我们采用基于时间戳的CDC增量装载模式,时间粒度为天。因此需要两个时间点,分别是本次装载的起始时间点和终止时间点,这两个时间点定义了本次处理的时间窗口,即装载这个时间区间内的数据。还要说明一点,这个区间是左包含的,就是处理的数据包括起始时间点的,但不包括终止时间点的。这样设计的原因是,我们既要处理完整的数据,不能有遗漏,又不能重复装载数据,这就要求时间处理窗口既要连续,又不能存在重叠的部分。

2. 创建维度表当前版本视图

代码语言:javascript复制
-- 切换到tds模式  
set search_path=tds;  
  
-- 建立客户维度当前视图  
create or replace view v_customer_dim_latest as   
select customer_sk,  
       customer_number,   
       customer_name,  
       customer_street_address,  
       customer_zip_code,  
       customer_city,  
       customer_state,  
       version,  
       effective_date  
  from (select distinct on (customer_number) customer_number,   
               customer_sk,    
               customer_name,  
               customer_street_address,  
               customer_zip_code,  
               customer_city,  
               customer_state,  
               isdelete,   
               version,  
               effective_date  
          from customer_dim  
         order by customer_number, customer_sk desc) as latest   
  where isdelete is false;  
  
-- 建立产品维度当前视图    
create or replace view v_product_dim_latest as   
select product_sk,  
       product_code,   
       product_name,  
       product_category,  
       version,  
       effective_date  
  from (select distinct on (product_code) product_code,   
               product_sk,  
               product_name,  
               product_category,  
               isdelete,                 
               version,  
               effective_date  
          from product_dim  
         order by product_code, product_sk desc) as latest   
  where isdelete is false;  

说明:

  • 如前所述,创建维度表的当前视图。这里只为客户和产品维度创建视图,而订单维度不需要当前版本视图,因为假设业务上订单数据只能增加,不能修改,所以没有版本变化。
  • 使用OushuDB的DISTINCT ON语法去重。DISTINCT ON ( expression [, …] )把记录根据[, …]的值进行分组,分组之后仅返回每一组的第一行。需要注意的是,如果不指定ORDER BY子句,返回的第一条的不确定的。如果使用了ORDER BY 子句,那么[, …]里面的值必须靠近ORDER BY子句的最左边。本例中我们按业务主键(customer_number、product_code)分组,每组按代理键(customer_sk、product_sk)倒排序,每组第一行即为维度的当前版本。

3. 创建维度表历史视图

代码语言:javascript复制
-- 切换到tds模式  
set search_path=tds;  
  
-- 建立客户维度历史视图,增加版本过期日期导出列   
create or replace view v_customer_dim_his as   
select *, date(lead(effective_date,1,date '2200-01-01') over (partition by customer_number order by effective_date)) expiry_date   
  from customer_dim;  
  
-- 建立产品维度历史视图,增加版本过期日期导出列   
create or replace view v_product_dim_his as   
select *, date(lead(effective_date,1,date '2200-01-01') over (partition by product_code order by effective_date)) expiry_date   
  from product_dim;  

说明:

  • 维度历史视图增加了版本的过期日期列。
  • 使用LEAD窗口函数实现。以业务主键(customer_number、product_code)分区,每个分区内按生效日期排序。LEAD函数在一个分区内取到当前生效日期的下一个日期,该日期即为对应版本的过期日期。如果是当前版本,下一日期为空,则返回一个很大的时间值,大到足以满足数据仓库整个生命周期的需要,本示例设置的是2200年1月1日。

4. 建立时间戳表

代码语言:javascript复制
create table rds.cdc_time    
(last_load date,    
 current_load date);    
  
insert into rds.cdc_time select current_date - 1, current_date - 1;  

说明:

  • 本示例中order_dim维度表和sales_order_fact事实表使用基于时间戳的CDC装载模式。为此在rds模式中建立一个名为cdc_time的时间戳表,这个表里有last_load和current_load两个字段。之所以需要两个字段,是因为抽取到的数据可能会多于本次需要处理的数据。比如,两点执行ETL过程,则零点到两点这两个小时的数据不会在本次处理。为了确定这个截至时间点,需要给时间戳设定一个上限条件,即这里的current_load字段值。
  • 本示例的时间粒度为每天,所以时间戳只要保留日期部分即可,因此数据类型选为date。这两个字段的初始值是“初始加载”执行日期的前一天。当开始装载时,current_load设置为当前日期。
  • 由于rds.cdc_time表中始终只有一条记录,表格式无关紧要,使用缺省格式即可。

5. 用Sqoop用户建立定期抽取脚本

用sqoop操作系统用户建立初始数据抽取脚本文件~/regular_extract.sh,内容如下:

代码语言:javascript复制
#!/bin/bash    
  
# 全量抽取客户表  
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table customer --hcatalog-database test --hcatalog-table customer --null-string '\N' --null-non-string '\N' --compress
  
# 全量抽取产品表  
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table product --hcatalog-database test --hcatalog-table product --null-string '\N' --null-non-string '\N' --compress

# 增量抽取销售订单表  
sqoop job --exec myjob_incremental_import   

这个文件与初始抽取的shell脚本基本相同,只是去掉了创建Sqoop作业的命令。每次装载后,都会将已经导入的最大执行时的时间戳赋予增量抽取作业的last-value。可以使用以下命令查看当前的last-value:

代码语言:javascript复制
sqoop job --show myjob_incremental_import | grep incremental.last.value | awk '{print $3 " " $4}'

将文件修改为可执行模式:

代码语言:javascript复制
chmod 755 ~/regular_extract.sh 

6. 建立定期装载OushuDB函数

代码语言:javascript复制
create or replace function fn_regular_load ()    
returns void as    
$$    
declare    
    -- 设置scd的生效时间  
    v_cur_date date := current_date;      
    v_pre_date date := current_date - 1;  
    v_last_load date;  
begin  
    -- 分析rds模式的表  
    analyze rds.customer;  
    analyze rds.product;  
    analyze rds.sales_order;  
  
    -- 设置cdc的上限时间  
    select last_load into v_last_load from rds.cdc_time;  
    truncate table rds.cdc_time;  
    insert into rds.cdc_time select v_last_load, v_cur_date;  
  
    -- 装载客户维度  
    insert into tds.customer_dim  
    (customer_number,  
     customer_name,  
     customer_street_address,  
     customer_zip_code,  
     customer_city,  
     customer_state,  
     isdelete,  
     version,  
     effective_date)  
    select case flag   
                when 'D' then a_customer_number  
                else b_customer_number  
            end customer_number,  
           case flag   
                when 'D' then a_customer_name  
                else b_customer_name  
            end customer_name,  
           case flag   
                when 'D' then a_customer_street_address  
                else b_customer_street_address  
            end customer_street_address,  
           case flag   
                when 'D' then a_customer_zip_code  
                else b_customer_zip_code  
            end customer_zip_code,  
           case flag   
                when 'D' then a_customer_city  
                else b_customer_city  
            end customer_city,  
           case flag   
                when 'D' then a_customer_state  
                else b_customer_state  
            end customer_state,  
           case flag   
                when 'D' then true  
                else false  
            end isdelete,  
           case flag   
                when 'D' then a_version  
                when 'I' then 1  
                else a_version   1  
            end v,  
           v_pre_date  
      from (select a.customer_number a_customer_number,  
                   a.customer_name a_customer_name,  
                   a.customer_street_address a_customer_street_address,  
                   a.customer_zip_code a_customer_zip_code,  
                   a.customer_city a_customer_city,  
                   a.customer_state a_customer_state,  
                   a.version a_version,  
                   b.customer_number b_customer_number,  
                   b.customer_name b_customer_name,  
                   b.customer_street_address b_customer_street_address,  
                   b.customer_zip_code b_customer_zip_code,  
                   b.customer_city b_customer_city,  
                   b.customer_state b_customer_state,  
                   case when a.customer_number is null then 'I'  
                        when b.customer_number is null then 'D'  
                        else 'U'   
                    end flag  
              from v_customer_dim_latest a   
              full join rds.customer b on a.customer_number = b.customer_number   
             where a.customer_number is null -- 新增  
                or b.customer_number is null -- 删除  
                or (a.customer_number = b.customer_number   
                    and not   
                           (a.customer_name = b.customer_name   
                        and a.customer_street_address = b.customer_street_address   
                        and a.customer_zip_code = b.customer_zip_code  
                        and a.customer_city = b.customer_city   
                        and a.customer_state = b.customer_state))) t  
             order by coalesce(a_customer_number, 999999999999), b_customer_number limit 999999999999;  
  
    -- 装载产品维度  
    insert into tds.product_dim  
    (product_code,  
     product_name,  
     product_category,       
     isdelete,  
     version,  
     effective_date)  
    select case flag   
                when 'D' then a_product_code  
                else b_product_code  
            end product_code,  
           case flag   
                when 'D' then a_product_name  
                else b_product_name  
            end product_name,  
           case flag   
                when 'D' then a_product_category  
                else b_product_category  
            end product_category,  
           case flag   
                when 'D' then true  
                else false  
            end isdelete,  
           case flag   
                when 'D' then a_version  
                when 'I' then 1  
                else a_version   1  
            end v,  
           v_pre_date  
      from (select a.product_code a_product_code,  
                   a.product_name a_product_name,  
                   a.product_category a_product_category,  
                   a.version a_version,  
                   b.product_code b_product_code,  
                   b.product_name b_product_name,  
                   b.product_category b_product_category,                 
                   case when a.product_code is null then 'I'  
                        when b.product_code is null then 'D'  
                        else 'U'   
                    end flag  
              from v_product_dim_latest a   
              full join rds.product b on a.product_code = b.product_code   
             where a.product_code is null -- 新增  
                or b.product_code is null -- 删除  
                or (a.product_code = b.product_code   
                    and not   
                           (a.product_name = b.product_name   
                        and a.product_category = b.product_category))) t  
             order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;  
  
    -- 装载order维度    
    insert into order_dim (order_number, isdelete, version, effective_date)   
    select t.order_number, t.isdelete, t.v, t.effective_date    
      from (select order_number, false isdelete, 1 v, order_date effective_date     
              from rds.sales_order, rds.cdc_time     
             where entry_date >= last_load and entry_date < current_load) t;  
  
    -- 装载销售订单事实表    
    insert into sales_order_fact    
    select order_sk,    
           customer_sk,    
           product_sk,    
           date_sk,  
           year * 100   month,       
           order_amount    
      from rds.sales_order a,    
           order_dim b,    
           v_customer_dim_his c,    
           v_product_dim_his d,    
           date_dim e,    
           rds.cdc_time f    
     where a.order_number = b.order_number    
       and a.customer_number = c.customer_number    
       and a.order_date >= c.effective_date  
       and a.order_date < c.expiry_date     
       and a.product_code = d.product_code    
       and a.order_date >= d.effective_date  
       and a.order_date < d.expiry_date     
       and date(a.order_date) = e.date    
       and a.entry_date >= f.last_load and a.entry_date < f.current_load;                
  
    -- 分析tds模式的表  
    analyze customer_dim;  
    analyze product_dim;  
    analyze order_dim;  
    analyze sales_order_fact;  
  
    -- 更新时间戳表的last_load字段    
    truncate table rds.cdc_time;  
    insert into rds.cdc_time select v_cur_date, v_cur_date;  
  
end;    
$$    
language plpgsql;  

说明:

  • 同初始装载一样,RDS模式表的数据来自从Hive ORC表,而Hive ORC表数据是通过执行regular_extract.sh脚本用Sqoop抽取而来。rds.customer和rds.product全量装载,rds.sales_order增量装载。
  • 脚本中设置三个变量,v_last_load和v_cur_date分别赋予起始日期、终止日期,并且将时间戳表rds.cdc_time的last_load和current_load字段分别设置为起始日期和终止日期。v_pre_date表示版本过期日期。
  • 维度表数据可能是新增、修改或删除。这里用FULL JOIN连接原始数据表与维度当前版本视图,统一处理这三种情况。外查询中使用CASE语句判断属于哪种情况,分别取得不同的字段值。
  • 为了保证数据插入维度表时,代理键与业务主键保持相同的顺序,必须使用“order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;”类似的语句。
  • 订单维度增量装载,没有历史版本问题。
  • 装载事实表时连接维度历史视图,引用事实数据所对应的维度代理键。该代理键可以通过维度版本的生效日期、过期日期区间唯一确定。
  • 装载数据后,执行查询前,分析表以提高查询性能。
  • 数据装载完成后,更新数据处理时间窗口。

7. 用root用户建立定期ETL脚本

用root操作系统用户建立初始ETL脚本文件~/regular_etl.sh,内容如下:

代码语言:javascript复制
#!/bin/bash  
  
# 先使用hdfs用户删除全量装载的外部表目录  
su - hdfs -c 'hdfs dfs -rm -r -skipTrash /data/rds/customer/*'
su - hdfs -c 'hdfs dfs -rm -r -skipTrash /data/rds/product/*' 
  
# 使用sqoop用户执行定期抽取脚本  
su - sqoop -c '~/regular_extract.sh'  
  
# 使用gpadmin用户执行定期装载函数  
su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp2 -c "set search_path=tds;select fn_regular_load ();"'  

该文件的作用与初始ETL的shell脚本基本相同,为定期ETL提供统一的执行入口。 将文件修改为可执行模式:

代码语言:javascript复制
chmod 755 ~/regular_etl.sh  

8. 测试定期ETL过程

(1)准备测试数据 在MySQL数据库中执行下面的SQL脚本准备源数据库中的客户、产品和销售订单测试数据。

代码语言:javascript复制
use source;    
    
/***       
客户数据的改变如下:    
客户6的街道号改为7777 ritter rd。(原来是7070 ritter rd)    
客户7的姓名改为distinguished agencies。(原来是distinguished partners)    
新增第八个客户。    
***/    
update customer set customer_street_address = '7777 ritter rd.' where customer_number = 6 ;    
update customer set customer_name = 'distinguished agencies' where customer_number = 7 ;    
insert into customer    
(customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)    
values    
('subsidiaries', '10000 wetline blvd.', 17055, 'pittsburgh', 'pa') ;    
    
/***    
产品数据的改变如下:    
产品3的名称改为flat panel。(原来是lcd panel)    
新增第四个产品。    
***/    
update product set product_name = 'flat panel' where product_code = 3 ;    
insert into product    
(product_name, product_category)    
values    
('keyboard', 'peripheral') ;     
      
/***      
新增订单日期为昨天的16条订单。      
***/    
set @start_date := unix_timestamp(date_sub(current_date, interval 1 day));    
set @end_date := unix_timestamp(current_date);   
drop table if exists temp_sales_order_data;    
create table temp_sales_order_data as select * from sales_order where 1=0;     
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (101, 1, 1, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (102, 2, 2, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (103, 3, 3, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (104, 4, 4, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (105, 5, 2, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (106, 6, 2, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (107, 7, 3, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (108, 8, 4, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (109, 1, 1, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (110, 2, 2, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (111, 3, 3, @order_date, @order_date, @amount);    
      
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (112, 4, 4, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (113, 5, 1, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (114, 6, 2, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (115, 7, 3, @order_date, @order_date, @amount);    
    
set @order_date := from_unixtime(@start_date   rand() * (@end_date - @start_date));    
set @amount := floor(1000   rand() * 9000);    
insert into temp_sales_order_data values (116, 8, 4, @order_date, @order_date, @amount);    
    
insert into sales_order    
select null,customer_number,product_code,order_date,entry_date,order_amount from temp_sales_order_data order by order_date;      
    
commit ;  

(2)重建Sqoop作业

代码语言:javascript复制
sqoop job --delete myjob_incremental_import  
sqoop job --create myjob_incremental_import 
-- import 
--connect "jdbc:mysql://172.16.1.127:3306/source?usessl=false&user=dwtest&password=123456" 
--table sales_order 
--hcatalog-database test 
--hcatalog-table sales_order 
--compress 
--incremental lastmodified 
--check-column entry_date 
--last-value "2018-05-10 00:00:00"

这步操作修改sqoop job的last-value值,只是为了测试,实际运行时不需要重建Sqoop作业。 (3)执行定期ETL脚本 用root用户执行定期ETL脚本。

代码语言:javascript复制
~/regular_etl.sh  

(4)查询数据,确认ETL过程正确执行 查询客户维度当前视图,结果如图2所示。

代码语言:javascript复制
select customer_sk,  
       customer_number,  
       customer_name,  
       customer_street_address,  
       version,  
       effective_date   
  from v_customer_dim_latest  
 order by customer_number; 

图2

查询客户维度历史视图,结果如图3所示。

代码语言:javascript复制
select customer_sk c_sk,  
       customer_number c_num,  
       customer_name c_name,  
       customer_street_address c_address,  
       version,  
       effective_date,  
       expiry_date,  
       isdelete      
  from v_customer_dim_his  
 order by customer_number, version; 

图3

查询产品维度当前视图,结果如图4所示。

代码语言:javascript复制
select product_sk,  
       product_code,  
       product_name,  
       version,  
       effective_date  
  from v_product_dim_latest  
 order by product_code; 

图4

查询客户维度历史视图,结果如图5所示。

代码语言:javascript复制
select product_sk,  
       product_code,  
       product_name,  
       version,  
       effective_date,  
       expiry_date  
  from v_product_dim_his  
 order by product_code, version; 

图5

查询订单维度表和事实表,结果如图6所示,新装载了16条订单记录。

代码语言:javascript复制
select count(*) from order_dim;   
select count(*) from sales_order_fact; 

图6

查询事实表数据,结果如图7所示。

代码语言:javascript复制
select * from sales_order_fact   
 where order_sk > 100  
 order by order_sk; 

图7

可以看到,customer_sk没有6,7,而是8、9,10为新增;product_sk用4代替3,5为新增。 查询时间窗口表,结果如图8所示。

代码语言:javascript复制
select * from rds.cdc_time; 

图8

可以看到时间窗口已经更新。

9. 动态分区滚动

tds.sales_order_fact是按月做的范围分区,需要进一步设计滚动分区维护策略。通过维护一个数据滚动窗口,删除老分区,添加新分区,将老分区的数据迁移到数据仓库以外的次级存储,以节省系统开销。下面的OushuDB函数按照转储最老分区数据、删除最老分区数据、建立新分区的步骤动态滚动分区。

代码语言:javascript复制
-- 创建动态滚动分区的函数  
create or replace function tds.fn_rolling_partition(p_year_month_start date) returns int    
as $body$    
declare    
    v_min_partitiontablename name;  
    v_year_month_end date := p_year_month_start   interval '1 month';  
    v_year_month_start_int int := extract(year from p_year_month_start) * 100   extract(month from p_year_month_start);  
    v_year_month_end_int int := extract(year from v_year_month_end) * 100   extract(month from v_year_month_end);  
    sqlstring varchar(1000);      
begin   
     
    -- 处理tds.sales_order_fact       
        
    -- 转储最早一个月的数据,    
    select partitiontablename  
      into v_min_partitiontablename  
      from pg_partitions    
     where tablename='sales_order_fact' and partitionrank = 1;  
   
    sqlstring = 'copy (select * from ' || v_min_partitiontablename || ') to ''/home/gpadmin/sales_order_fact_' || cast(v_year_month_start_int as varchar) || '.txt'' with delimiter ''|'';';    
    execute sqlstring;    
    -- raise notice '%', sqlstring;    
    
    -- 删除最早月份对应的分区    
    sqlstring := 'alter table sales_order_fact drop partition for (rank(1));';    
    execute sqlstring;    
        
    -- 增加下一个月份的新分区    
    sqlstring := 'alter table sales_order_fact add partition start ('||cast(v_year_month_start_int as varchar)||') inclusive end ('||cast(v_year_month_end_int as varchar)||') exclusive;';    
    execute sqlstring;    
    -- raise notice '%', sqlstring;   
      
    -- 正常返回1    
    return 1;    
        
-- 异常返回0    
exception when others then     
    raise exception '%: %', sqlstate, sqlerrm;      
    return 0;    
end    
$body$ language plpgsql;  

将执行该函数的psql命令行放到cron中自动执行。下面的例子表示每月1号2点执行分区滚动操作。假设数据仓库中只保留最近一年的销售数据。

代码语言:javascript复制
0 2 1 * * psql -d dw -c "set search_path=rds,tds; select fn_rolling_partition(date(date_trunc('month',current_date)   interval '1 month'));" > rolling_partition.log 2>&1  

参考:

  • HAWQ取代传统数仓实践(三)——初始ETL(Sqoop、HAWQ)
  • HAWQ取代传统数仓实践(四)——定期ETL(Sqoop、HAWQ)

0 人点赞