Data Warehouse Practice on the Hadoop Ecosystem: Advanced Techniques (13)

2019-05-25 19:40:46

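13. Factless Fact Tables

This section discusses a technique for handling requirements where the source data carries no measures. For example, the product source data contains no product count, so if the system needs the number of products, it clearly cannot be read directly from the data warehouse. This is where the factless fact table comes in. With this technique, the number of products can be calculated by continuously tracking product launches. You create a fact table that holds only the surrogate keys of the product dimension (what to count) and the date dimension (when to count). It is called a factless fact table because the table itself has no measures.

1. The product launch factless fact table

This subsection shows how to implement a factless fact table for product launches, including adding and initially loading the product_count_fact table. The figure below shows the data warehouse schema that tracks the number of product launches (only the tables related to product_count_fact are shown).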

Run the following script to create the product launch date view and the factless fact table.

USE dw;  
  
CREATE VIEW product_launch_date_dim 
(product_launch_date_sk, 
 product_launch_date, 
 month_name, 
 month, 
 quarter, 
 year, 
 promo_ind) 
AS  
SELECT DISTINCT  
       date_sk,  
       date,  
       month_name,  
       month,  
       quarter,  
       year,  
       promo_ind
  FROM product_dim a, date_dim b  
 WHERE a.effective_date = b.date;  
  
CREATE TABLE product_count_fact (  
    product_sk INT,  
    product_launch_date_sk INT);

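Note: the product launch date view contains only product effective dates, not every date in the date dimension. The product_launch_date_dim dimension is a subset of the date dimension table.

2. Initial load of the product_count_fact table

The script below loads the existing product launch information from the product_dim table into the product_count_fact table. The insert in the script adds the first version of every product (that is, the product's first launch date). The Hive window function row_number is used to pick the effective date on which the product was launched, rather than the effective date of a later SCD2 row.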

use dw;
insert overwrite table product_count_fact 
select product_sk,date_sk
  from (select a.product_sk product_sk,
               a.product_code product_code,
               b.date_sk date_sk,
               row_number() over (partition by a.product_code order by b.date_sk) rn
          from product_dim a,date_dim b
         where a.effective_date = b.date) t
 where rn = 1;
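
To see which row the rn = 1 filter keeps, it can help to list every version of each product together with its rank. The query below is only an inspection sketch and not part of the original scripts; it assumes that product_dim has the version column used by the regular load script and that date_dim holds one row per date.

```sql
use dw;
-- list every product_dim version with its rank inside the product;
-- the row with rn = 1 is the first launch and is the one loaded into the fact table
select product_code, product_sk, version, effective_date, rn
  from (select a.product_code product_code,
               a.product_sk product_sk,
               a.version version,
               a.effective_date effective_date,
               row_number() over (partition by a.product_code order by b.date_sk) rn
          from product_dim a, date_dim b
         where a.effective_date = b.date) t
 order by product_code, rn;
```

Later SCD2 versions of a product get rn = 2, 3 and so on, which is why they are left out of the load.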

Use the following statement to query the product_count_fact table and confirm that the initial load ran correctly.

select product_sk,product_launch_date_sk from dw.product_count_fact;

The query result is shown in the figure below.
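
The point of the factless fact table is that the count itself comes from counting fact rows. As a minimal sketch (this query does not appear in the original scripts, and the column aliases are illustrative), the number of products launched per year and month could be obtained like this, using only the table and view created above:

```sql
use dw;
-- each fact row stands for one product launch, so count(*) plays the role of the measure
select b.year launch_year,
       b.month launch_month,
       count(*) product_count
  from product_count_fact a
  join product_launch_date_dim b
    on a.product_launch_date_sk = b.product_launch_date_sk
 group by b.year, b.month
 order by launch_year, launch_month;
```

Grouping by other attributes of the view, such as quarter, gives the count at a different grain without any change to the fact table.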

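3. Modify the regular load script

Once the data warehouse schema has changed, the regular load script must change as well. After loading the product_dim table, the script must also load the product_count_fact table. The modified regular load script is shown below. In practice, the initial load script above is simply placed before the load of the sales order fact table.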

-- Set up the environment and the time window
!run /root/set_time.sql   
    
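-- Load the customer dimension
-- Expire the SCD2 rows for deleted records and for changes in the address-related columns; the <=> operator handles NULL values.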
UPDATE customer_dim     
   SET expiry_date = ${hivevar:pre_date}      
 WHERE customer_dim.customer_sk IN      
(SELECT a.customer_sk     
   FROM (SELECT customer_sk,    
                customer_number,    
                customer_street_address,    
                customer_zip_code,    
                customer_city,    
                customer_state,    
                shipping_address,    
                shipping_zip_code,    
                shipping_city,    
                shipping_state    
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.customer b ON a.customer_number = b.customer_number     
          WHERE b.customer_number IS NULL OR     
          (  !(a.customer_street_address <=> b.customer_street_address)    
          OR !(a.customer_zip_code <=> b.customer_zip_code)    
          OR !(a.customer_city <=> b.customer_city)    
          OR !(a.customer_state <=> b.customer_state)    
          OR !(a.shipping_address <=> b.shipping_address)    
          OR !(a.shipping_zip_code <=> b.shipping_zip_code)    
          OR !(a.shipping_city <=> b.shipping_city)    
          OR !(a.shipping_state <=> b.shipping_state)    
          ));     
    
-- Handle the new SCD2 rows for the customer_street_address columns
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,    
    t1.customer_name,    
    t1.customer_street_address,    
    t1.customer_zip_code,    
    t1.customer_city,    
    t1.customer_state,    
    t1.shipping_address,    
    t1.shipping_zip_code,    
    t1.shipping_city,    
    t1.shipping_state,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.customer_number customer_number,    
    t2.customer_name customer_name,    
    t2.customer_street_address customer_street_address,    
    t2.customer_zip_code customer_zip_code,    
    t2.customer_city customer_city,    
    t2.customer_state customer_state,    
    t2.shipping_address shipping_address,    
    t2.shipping_zip_code shipping_zip_code,    
    t2.shipping_city shipping_city,    
    t2.shipping_state shipping_state,    
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM customer_dim t1     
INNER JOIN rds.customer t2      
   ON t1.customer_number = t2.customer_number       
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN customer_dim t3     
   ON t1.customer_number = t3.customer_number     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (!(t1.customer_street_address <=> t2.customer_street_address)    
   OR  !(t1.customer_zip_code <=> t2.customer_zip_code)    
   OR  !(t1.customer_city <=> t2.customer_city)    
   OR  !(t1.customer_state <=> t2.customer_state)    
   OR  !(t1.shipping_address <=> t2.shipping_address)    
   OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)    
   OR  !(t1.shipping_city <=> t2.shipping_city)    
   OR  !(t1.shipping_state <=> t2.shipping_state)    
   )    
  AND t3.customer_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
    
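-- Handle SCD1 on the customer_name column
-- Hive's UPDATE does not yet support subqueries in the SET clause, so a temporary table holds the rows to update, and DELETE followed by INSERT replaces the UPDATE
-- SCD1 keeps no history, so every dimension row whose customer_name changed is updated, not just the current version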
DROP TABLE IF EXISTS tmp;    
CREATE TABLE tmp AS    
SELECT    
    a.customer_sk,    
    a.customer_number,    
    b.customer_name,    
    a.customer_street_address,    
    a.customer_zip_code,    
    a.customer_city,    
    a.customer_state,    
    a.shipping_address,    
    a.shipping_zip_code,    
    a.shipping_city,    
    a.shipping_state,    
    a.version,    
    a.effective_date,    
    a.expiry_date    
  FROM customer_dim a, rds.customer b      
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);      
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);      
INSERT INTO customer_dim SELECT * FROM tmp;    
    
-- Handle newly added customer records
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,    
    t1.customer_name,    
    t1.customer_street_address,    
    t1.customer_zip_code,    
    t1.customer_city,    
    t1.customer_state,    
    t1.shipping_address,    
    t1.shipping_zip_code,    
    t1.shipping_city,    
    t1.shipping_state,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number      
 WHERE t2.customer_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
    
-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;      
INSERT INTO pa_customer_dim      
SELECT      
  customer_sk      
, customer_number      
, customer_name      
, customer_street_address      
, customer_zip_code      
, customer_city      
, customer_state      
, shipping_address      
, shipping_zip_code      
, shipping_city      
, shipping_state      
, version      
, effective_date      
, expiry_date      
FROM customer_dim      
WHERE customer_state = 'PA' ;     
    
-- Load the product dimension
-- Expire the SCD2 rows for deleted records and for changes in the product_name and product_category columns
UPDATE product_dim    
   SET expiry_date = ${hivevar:pre_date}      
 WHERE product_dim.product_sk IN      
(SELECT a.product_sk     
   FROM (SELECT product_sk,product_code,product_name,product_category     
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.product b ON a.product_code = b.product_code     
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));    
    
-- Handle the new SCD2 rows for the product_name and product_category columns
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,    
    t1.product_name,    
    t1.product_category,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.product_code product_code,    
    t2.product_name product_name,    
    t2.product_category product_category,        
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM product_dim t1     
INNER JOIN rds.product t2      
   ON t1.product_code = t2.product_code      
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN product_dim t3     
   ON t1.product_code = t3.product_code     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    
    
-- Handle newly added product records
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,    
    t1.product_name,    
    t1.product_category,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code      
 WHERE t2.product_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    

-- Load the product_count_fact table
insert overwrite table product_count_fact 
select product_sk,date_sk
  from (select a.product_sk product_sk,
               a.product_code product_code,
               b.date_sk date_sk,
               row_number() over (partition by a.product_code order by b.date_sk) rn
          from product_dim a,date_dim b
         where a.effective_date = b.date) t
 where rn = 1;
 
-- Load the sales order fact table
-- Sales orders added on the previous day
INSERT INTO sales_order_fact    
SELECT    
    a.order_number,    
    customer_sk,    
    product_sk, 
    g.sales_order_attribute_sk,
    e.order_date_sk,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    f.request_delivery_date_sk,
    order_amount,    
    quantity    
  FROM    
    rds.sales_order a,     
    customer_dim c,    
    product_dim d,    
    order_date_dim e,  
    request_delivery_date_dim f, 
    sales_order_attribute_dim g,
    rds.cdc_time h
 WHERE 
    a.order_status = 'N'
AND a.customer_number = c.customer_number    
AND a.status_date >= c.effective_date    
AND a.status_date < c.expiry_date    
AND a.product_code = d.product_code    
AND a.status_date >= d.effective_date    
AND a.status_date < d.expiry_date    
AND to_date(a.status_date) = e.order_date   
AND to_date(a.request_delivery_date) = f.request_delivery_date
AND a.verification_ind = g.verification_ind  
AND a.credit_check_flag = g.credit_check_flag  
AND a.new_customer_ind = g.new_customer_ind  
AND a.web_order_flag = g.web_order_flag 
AND a.entry_date >= h.last_load AND a.entry_date < h.current_load ;    

-- Handle the four statuses: warehouse allocation, packing, shipping and receiving
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t2.allocate_date_sk allocate_date_sk,
       t1.quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       allocate_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'A' 
   and to_date(t1.status_date) = t2.allocate_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t2.packing_date_sk packing_date_sk,
       t1.quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       packing_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'P' 
   and to_date(t1.status_date) = t2.packing_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load; 
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t2.ship_date_sk ship_date_sk,
       t1.quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       ship_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'S' 
   and to_date(t1.status_date) = t2.ship_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t2.receive_date_sk receive_date_sk,
       t1.quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       receive_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'R' 
   and to_date(t1.status_date) = t2.receive_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

-- Update the last_load column of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;

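4. Test the regular load

(1) Modify the product table in the source database

Change the name of the product with product code 1 to 'Regular Hard Disk Drive', and add a new product 'High End Hard Disk Drive' (product code 5). Run the script below to make these changes.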

use source;  
  
update product set product_name = 'Regular Hard Disk Drive' where product_code=1;  
insert into product values (5, 'High End Hard Disk Drive', 'Storage');  
  
commit;

The product data after the change is shown in the figure below.

(2) Run the regular load

./regular_etl.sh

(3) Query the product_count_fact table to confirm that the regular load ran correctly

use dw;
select c.product_sk psk,
       c.product_code pc,
       b.product_launch_date_sk plsk,
       b.product_launch_date pld
  from product_count_fact a,
       product_launch_date_dim b,
       product_dim c
 where a.product_launch_date_sk = b.product_launch_date_sk
   and a.product_sk = c.product_sk
cluster by pc, pld;

The query result is shown in the figure below.
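
After this test, product code 1 has two versions in product_dim, but only its first version should appear in the fact table. One way to check that no product was counted twice is sketched below; the query is an illustrative addition rather than part of the original test, and it assumes date_dim holds one row per date so that the load keeps a single row per product code.

```sql
use dw;
-- product codes that appear more than once in the fact table;
-- an empty result means every product was counted exactly once
select c.product_code product_code,
       count(*) fact_rows
  from product_count_fact a
  join product_dim c
    on a.product_sk = c.product_sk
 group by c.product_code
having count(*) > 1;
```

If the query returns rows, the partitioning in the load script or the uniqueness of dates in date_dim is worth checking first.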
