基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十一)

2019-05-25 19:40:29 浏览数 (1)

十一、多重星型模式 从“进阶技术”开始,已经通过增加列和表扩展了数据仓库,在进阶技术(五) “快照”里增加了第二个事实表,month_end_sales_order_fact表。这之后数据仓库模式就有了两个事实表(第一个是在开始建立数据仓库时创建的sales_order_fact表)。有了这两个事实表的数据仓库就是一个标准的双星型模式。 本节将在现有的维度数据仓库上再增加一个新的星型结构。与现有的与销售关联的星型结构不同,新的星型结构关注的是产品业务领域。新的星型结构有一个事实表和一个维度表,用于存储数据仓库中的产品数据。 1. 一个新的星型模式 下图显示了扩展后的数据仓库模式。

模式中有三个星型结构。sales_order_fact表是第一个星型结构的事实表,与其相关的维度表是customer_dim、product_dim、date_dim和sales_order_attribute_dim表。month_end_sales_order_fact表是第二个星型结构的事实表。product_dim和month_dim是其对应的维度表。第一个和第二个星型结构共享product_dim维度表。第二个星型结构的事实表和月份维度数据分别来自于第一个星型结构的事实表和date_dim维度表。它们不从源数据获得数据。第三个星型模式的事实表是新建的production_fact表。它的维度除了存储在已有的date_dim和product_dim表,还有一个新的factory_dim表。第三个星型结构的数据来自源数据。 执行下面的脚本建立第三个星型模式中的新表和对应的源数据表。

代码语言:javascript复制
-- 在MySQL源库上建立工厂表和每日产品表
USE source;  
CREATE TABLE factory_master (  
    factory_code INT,  
    factory_name CHAR(30),  
    factory_street_address CHAR(50),  
    factory_zip_code INT(5),  
    factory_city CHAR(30),  
    factory_state CHAR(2)  
); 
alter table factory_master add primary key (factory_code);

CREATE TABLE daily_production (  
    product_code INT,  
    production_date DATE,  
    factory_code INT,  
    production_quantity INT  
); 

ALTER TABLE daily_production ADD FOREIGN KEY (factory_code)   
REFERENCES factory_master(factory_code)  ON DELETE CASCADE ON UPDATE CASCADE ;  
ALTER TABLE daily_production ADD FOREIGN KEY (product_code)   
REFERENCES product(product_code)  ON DELETE CASCADE ON UPDATE CASCADE ;  
alter table daily_production add primary key (product_code,production_date,factory_code);

-- 在Hive的rds库上建立相应的过渡表
USE rds;  
CREATE TABLE factory_master (  
    factory_code INT,  
    factory_name VARCHAR(30),  
    factory_street_address VARCHAR(50),  
    factory_zip_code INT,  
    factory_city VARCHAR(30),  
    factory_state VARCHAR(2)  
);  

CREATE TABLE daily_production (  
    product_code INT,  
    production_date DATE,  
    factory_code INT,  
    production_quantity INT  
); 

-- 在Hive的dw库上建立相应的维度表和事实表
USE dw;  
CREATE TABLE factory_dim (  
    factory_sk INT,  
    factory_code INT,  
    factory_name VARCHAR(30),  
    factory_street_address VARCHAR(50),  
    factory_zip_code INT,  
    factory_city VARCHAR(30),  
    factory_state VARCHAR(2),  
    version int,  
    effective_date DATE,  
    expiry_date DATE  
)
clustered by (factory_sk) into 8 buckets      
stored as orc tblproperties ('transactional'='true');  
  
CREATE TABLE production_fact (  
  product_sk INT  
, production_date_sk INT  
, factory_sk INT  
, production_quantity INT  
);

2. 新建定期导入脚本文件 (1)新建抽取作业

代码语言:javascript复制
# 建立增量抽取每日产品表的作业,以production_date作为检查列,初始值是'1900-01-01'
last_value='1900-01-01'
sqoop job --delete myjob_incremental_import_daily_production --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop 
sqoop job 
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop 
--create myjob_incremental_import_daily_production 
-- 
import 
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" 
--table daily_production 
--columns "product_code,production_date,factory_code,production_quantity" 
--hive-import 
--hive-table rds.daily_production 
--incremental append 
--check-column production_date 
--last-value $last_value

新建定期装载每日产品脚本文件regular_etl_daily_production.sh,内容如下。

代码语言:javascript复制
#!/bin/bash
# 全量抽取工厂表
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table factory_master --hive-impo
rt --hive-table rds.factory_master --hive-overwrite
# 增量抽取每日产品表
sqoop job --exec myjob_incremental_import_daily_production --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
# 调用regular_etl_daily_production.sql文件执行定期装载
beeline -u jdbc:hive2://cdh2:10000/dw -f regular_etl_daily_production.sql

为了和其它定期装载脚本共用环境和时间窗口设置,新建一个set_time.sql脚本,内容如下。

代码语言:javascript复制
-- 设置变量以支持事务    
set hive.support.concurrency=true;    
set hive.exec.dynamic.partition.mode=nonstrict;    
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;    
set hive.compactor.initiator.on=true;    
set hive.compactor.worker.threads=1;    
    
USE dw;    
      
-- 设置SCD的生效时间和过期时间    
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);    
SET hivevar:max_date = CAST('2200-01-01' AS DATE);    
      
-- 设置CDC的上限时间    
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;

新建regular_etl_daily_production.sql脚本文件内容如下。

代码语言:javascript复制
-- 设置环境与时间窗口
!run /root/set_time.sql

-- 工厂信息很少修改,一般不需要保留历史,所以使用SCD1
drop table if exists tmp;
create table tmp as 
select a.factory_sk,
       a.factory_code,
       b.factory_name,
       b.factory_street_address,
       b.factory_zip_code,
       b.factory_city,
       b.factory_state,
       a.version,
       a.effective_date,
       a.expiry_date
  from factory_dim a,rds.factory_master b
 where a.factory_code = b.factory_code and 
     !(a.factory_name <=> b.factory_name 
   and a.factory_street_address <=> b.factory_street_address
   and a.factory_zip_code <=> b.factory_zip_code
   and a.factory_city <=> b.factory_city
   and a.factory_state <=> b.factory_state);

delete from factory_dim where factory_dim.factory_sk in (select factory_sk from tmp);
insert into factory_dim select * from tmp;

-- 添加新的工厂信息
INSERT INTO factory_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.factory_code)   t2.sk_max,    
    t1.factory_code,    
    t1.factory_name,    
    t1.factory_street_address,    
    t1.factory_zip_code,    
    t1.factory_city,    
    t1.factory_state,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.factory_master t1 LEFT JOIN factory_dim t2 ON t1.factory_code = t2.factory_code      
 WHERE t2.factory_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(factory_sk),0) sk_max FROM factory_dim) t2; 

-- 装载每日产品事实表
INSERT INTO production_fact  
SELECT  
  b.product_sk  
, c.date_sk  
, d.factory_sk  
, production_quantity  
FROM  
  rds.daily_production a  
, product_dim b  
, date_dim c  
, factory_dim d  
WHERE  
    production_date = ${hivevar:pre_date} 
AND a.product_code = b.product_code  
AND a.production_date >= b.effective_date  
AND a.production_date <= b.expiry_date  
AND a.production_date = c.date  
AND a.factory_code = d.factory_code ;

3. 测试 到目前为止已经讨论了第三个星型结构里的所有表,现在做一些测试。首先需要一些工厂信息。执行下面的语句向源数据库的factory_master表中装载四个工厂信息。

代码语言:javascript复制
USE source;  
INSERT INTO factory_master VALUES  
  ( 1, 'First Factory', '11111 Lichtman St.', 17050,  
       'Mechanicsburg', 'PA' )  
, ( 2, 'Second Factory', '22222 Stobosky Ave.', 17055, 'Pittsburgh',  
       'PA' )  
, ( 3, 'Third Factory', '33333 Fritze Rd.', 17050, 'Mechanicsburg',  
       'PA' )  
, ( 4, 'Fourth Factory', '44444 Jenzen Blvd.', 17055, 'Pittsburgh',  
       'PA' );  
  
COMMIT ;

执行下面的语句向源数据库的daily_production表添加数据。

代码语言:javascript复制
USE source;  
set @yesterday:=date_sub(current_date, interval 1 day);
INSERT INTO daily_production VALUES  
  (1, @yesterday, 4, 100 )  
, (2, @yesterday, 3, 200 )  
, (3, @yesterday, 2, 300 )  
, (4, @yesterday, 1, 400 )  
, (1, @yesterday, 1, 400 )  
, (2, @yesterday, 2, 300 )  
, (3, @yesterday, 3, 200 )  
, (4, @yesterday, 4, 100 );  
  
COMMIT;

现在已经做好了测试产品定期装载的准备,使用下面的命令执行定期装载作业。

代码语言:javascript复制
./regular_etl_daily_production.sh

使用下面的SQL语句查询production_fact表,确认每天产品数据的定期装载是正确的。

代码语言:javascript复制
select product_sk product_sk,
       production_date_sk date_sk,
       factory_sk factory_sk,
       production_quantity quantity
  from dw.production_fact;

查询结果如下图所示,可以看到已经正确装载了8条每日产品记录。

为了确认工厂维度上成功应用了SCD1,使用下面的语句查询factory_dim表。

代码语言:javascript复制
select factory_sk sk,
       factory_code c,
       factory_name name,
       factory_street_address address,
       factory_zip_code zip,
       factory_city city,
       factory_state state,
       version ver,
       effective_date,
       expiry_date   
  from dw.factory_dim a;

查询结果如下图所示。

使用下面的语句修改源库中factory_master的数据。

代码语言:javascript复制
use source;
update factory_master set factory_street_address= (case when factory_code=2 then '24242 Bunty La.' else '37373 Burbank Dr.' end)
 where factory_code in (2,3);
commit;

再次执行定期装载作业。

代码语言:javascript复制
./regular_etl_daily_production.sh

再次查询factory_dim表,查询结果如下图所示。

可以看到第二和第三个工厂已经正确修改了地址。

0 人点赞