(四)定期装载 初始装载只在开始数据仓库使用前执行一次,然而,必须要按时调度定期执行装载源数据的过程。本篇说明执行定期装载的步骤,包括识别源数据与装载类型、使用SQL和Kettle两种方法开发和测试定期装载过程。 从源抽取数据导入数据仓库有两种方式,可以从源把数据抓取出来(拉),也可以请求源把数据发送(推)到数据仓库。影响选择数据抽取方式的一个重要因素是源数据的可用性和数据量,这基于是抽取整个源数据还是仅仅抽取自最后一次抽取以来的变化。考虑以下两个问题:
- 需要抽取哪部分源数据加载到数据仓库?有两种方式,完全抽取和变化数据捕获。
- 数据抽取的方向是什么?有两种方式,拉模式(用数据仓库去拉)和推模式(通过源去推)。
完全抽取和变化数据捕获(CDC) 如果你的数据量很小是并且易处理,一般来说采取完全源数据抽取(所有的文件记录或所有的数据库表)。这种方式适合引用类型的源数据,比如邮政编码。引用型源数据通常是维度表的源。如果源数据量很大,抽取全部数据是不可行的,那么只能抽取变化的源数据(自最后一次抽取以来变化的数据)。这种数据抽取模式称为变化数据捕获(CDC),通常被用于抽取操作型事务数据,比如销售订单。
CDC大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓侵入式的是指CDC操作会给源系统带来性能的影响。只要CDC操作以任何一种方式执行了SQL语句,就可以认为是侵入式的CDC。常用的四种CDC方法中有三种是侵入性的,这四种方法是:基于时间戳的CDC、基于触发器的CDC、基于快照的CDC、基于日志的CDC。表(四)- 1总结了四种CDC方案的特点。
时间戳方式 | 快照方式 | 触发器方式 | 日志方式 | |
---|---|---|---|---|
能区分插入/更新 | 否 | 是 | 是 | 是 |
周期内,检测到多次更新 | 否 | 否 | 是 | 是 |
能检测到删除 | 否 | 是 | 是 | 是 |
不具有侵入性 | 否 | 否 | 否 | 是 |
支持实时 | 否 | 否 | 是 | 是 |
需要DBA | 否 | 否 | 是 | 是 |
不依赖数据库 | 是 | 是 | 否 | 否 |
表(四)- 1
从源拉数据或源来推数据
如果想让数据源只是简单的等待数据仓库来抽取,那么可以使用拉模式。但是必须确认,在数据仓库抽取数据时,源数据必须是可用的而且已经准备好了数据。如果抽取数据的及时性非常重要,或者希望数据源一旦准备好数据就立即发送,那么应该使用由数据源推数据的抽取模式。如果数据源是受到保护并且是禁止访问的,则只能使用数据源推数据的方式。 识别源数据与装载类型 定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。表(四)- 2里汇总了本示例的这些信息。
源数据 | 数据仓库表 | 抽取模式 | 维度历史装载类型 |
---|---|---|---|
customer | customer_dim | 整体、拉取 | address列上SCD2 name列上SCD1 |
product | product_dim | 整体、拉取 | SCD2 |
sales_order | order_dim | CDC(每天)、拉取 | 唯一订单号 |
sales_order_fact | CDC(每天)、拉取 | 每日销售订单 | |
n/a | date_dim | n/a | 预装载 |
本示例中order_dim维度表和sales_order_fact使用基于时间戳的CDC抽取模式。为此建立一个名为cdc_time的时间戳表,这个表里有两个字段,一个是last_load,一个是current_load。之所以需要两个字段,是因为在装载过程中,可能会有新的数据被插入或更新,为了避免脏读和死锁的情况,最好给时间戳设定一个上限条件,即current_load字段。本示例的时间粒度为每天,所以时间戳只要保留日期部分即可。这两个字段的初始值是“初始加载”执行的日期,本示例中为'2015-03-01'。当开始装载时,current_load设置为当前日期。在开始定期装载实验前,先使用清单(四)- 1里的脚本建立时间戳表。
代码语言:javascript复制
USE dw;
DROP TABLE IF EXISTS cdc_time ;
CREATE TABLE cdc_time
(
last_load date,
current_load date
);
INSERT INTO cdc_time VALUES ('2015-03-01', '2015-03-01') ;
COMMIT ;
清单(四)- 1
使用清单(四)- 2里的SQL脚本用于完成定期装载过程。
代码语言:javascript复制USE dw;
-- 设置SCD的截止时间和生效时间
SET @pre_date = SUBDATE(CURRENT_DATE,1) ;
-- 设置CDC的上限时间
UPDATE cdc_time SET current_load = CURRENT_DATE ;
-- 装载客户维度
TRUNCATE TABLE customer_stg;
INSERT INTO customer_stg
SELECT
customer_number
, customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
FROM source.customer ;
/* 在 customer_street_addresses 列上 SCD2 */
/* 置过期 */
UPDATE
customer_dim a
, customer_stg b
SET
expiry_date = @pre_date
WHERE
a.customer_number = b.customer_number
AND a.customer_street_address <> b.customer_street_address
AND expiry_date = '2200-01-01' ;
/* 加新行 */
INSERT INTO customer_dim
SELECT
NULL
, b.customer_number
, b.customer_name
, b.customer_street_address
, b.customer_zip_code
, b.customer_city
, b.customer_state
, a.version 1
, @pre_date
, '2200-01-01'
FROM
customer_dim a
, customer_stg b
WHERE
a.customer_number = b.customer_number
AND (a.customer_street_address <> b.customer_street_address)
AND EXISTS(
SELECT *
FROM customer_dim x
WHERE
b.customer_number=x.customer_number
AND a.expiry_date = @pre_date )
AND NOT EXISTS (
SELECT *
FROM customer_dim y
WHERE
b.customer_number = y.customer_number
AND y.expiry_date = '2200-01-01') ;
/* 在 customer_name 列上 SCD1 */
UPDATE customer_dim a, customer_stg b
SET a.customer_name = b.customer_name
WHERE a.customer_number = b.customer_number
AND a.customer_name <> b.customer_name ;
/* 新增的客户 */
INSERT INTO customer_dim
SELECT
NULL
, customer_number
, customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
, 1
, @pre_date
,'2200-01-01'
FROM customer_stg
WHERE customer_number NOT IN(
SELECT y.customer_number
FROM customer_dim x, customer_stg y
WHERE x.customer_number = y.customer_number) ;
/* 装载产品维度 */
TRUNCATE TABLE product_stg ;
INSERT INTO product_stg
SELECT
product_code
, product_name
, product_category
FROM source.product ;
/* 在 product_name 和 product_category 列上 SCD2 */
/* 置过期 */
UPDATE
product_dim a
, product_stg b
SET
expiry_date = @pre_date
WHERE
a.product_code = b.product_code
AND ( a.product_name <> b.product_name
OR a.product_category <> b.product_category)
AND expiry_date = '2200-01-01';
/* 加新行 */
INSERT INTO product_dim
SELECT
NULL
, b.product_code
, b.product_name
, b.product_category
, a.version 1
, @pre_date
,'2200-01-01'
FROM
product_dim a
, product_stg b
WHERE
a.product_code = b.product_code
AND ( a.product_name <> b.product_name
OR a.product_category <> b.product_category)
AND EXISTS(
SELECT *
FROM product_dim x
WHERE b.product_code = x.product_code
AND a.expiry_date = @pre_date)
AND NOT EXISTS (
SELECT *
FROM product_dim y
WHERE b.product_code = y.product_code
AND y.expiry_date = '2200-01-01') ;
/* 新增的产品 */
INSERT INTO product_dim
SELECT
NULL
, product_code
, product_name
, product_category
, 1
, @pre_date
, '2200-01-01'
FROM product_stg
WHERE product_code NOT IN(
SELECT y.product_code
FROM product_dim x, product_stg y
WHERE x.product_code = y.product_code) ;
-- 装载订单维度,新增前一天的订单号
INSERT INTO order_dim (
order_number
, effective_date
, expiry_date)
SELECT
order_number
, order_date
, '2200-01-01'
FROM source.sales_order, cdc_time
WHERE entry_date >= last_load AND entry_date < current_load ;
-- 装载事实表,新增前一天的订单
INSERT INTO sales_order_fact
SELECT
order_sk
, customer_sk
, product_sk
, date_sk
, order_amount
FROM
source.sales_order a
, order_dim b
, customer_dim c
, product_dim d
, date_dim e
, cdc_time f
WHERE
a.order_number = b.order_number
AND a.customer_number = c.customer_number
AND a.order_date >= c.effective_date
AND a.order_date < c.expiry_date
AND a.product_code = d.product_code
AND a.order_date >= d.effective_date
AND a.order_date < d.expiry_date
AND a.order_date = e.date
AND a.entry_date >= f.last_load AND a.entry_date < f.current_load ;
-- 更新时间戳表的last_load字段
UPDATE cdc_time SET last_load = current_load ;
COMMIT ;
- customer和product表分别通过customer_stg和product_stg表导入customer_dim和product_dim表。
- 客户地址、产品名称和产品分类使用SCD2,客户姓名使用SCD1。
- 只有前一天的销售订单被装载到order_dim和sales_order_fact表。
测试步骤: 1. 执行清单(四)- 3里的SQL脚本准备准备客户、产品和销售订单测试数据。 2. 设置系统日期为2015年3月2日。 3. 执行清单(四)- 2里的SQL脚本或图(四)- 1到图(四)- 34的Kettle步骤进行定期装载。
代码语言:javascript复制USE source;
/***
客户数据的改变如下:
客户6的街道号改为7777 Ritter Rd。(原来是7070 Ritter Rd)
客户7的姓名改为Distinguished Agencies。(原来是Distinguished Partners)
新增第八个客户。
***/
UPDATE customer SET customer_street_address = '7777 Ritter Rd.' WHERE customer_number = 6 ;
UPDATE customer SET customer_name = 'Distinguished Agencies' WHERE customer_number = 7 ;
INSERT INTO customer
(customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)
VALUES
('Subsidiaries', '10000 Wetline Blvd.', 17055, 'Pittsburgh', 'PA') ;
/***
产品数据的改变如下:
产品3的名称改为Flat Panel。(原来是LCD Panel)
新增第四个产品。
***/
UPDATE product SET product_name = 'Flat Panel' WHERE product_code = 3 ;
INSERT INTO product
(product_name, product_category)
VALUES
('Keyboard', 'Peripheral') ;
/***
销售订单事务 假设你的数据仓库从2015年3月2日开始使用(首次运行定期导入的日期)。新增订单日期为2015年3月1日的16条订单。
***/
INSERT INTO sales_order VALUES
(22, 1, 1, '2015-03-01', '2015-03-01', 1000)
, (23, 2, 2, '2015-03-01', '2015-03-01', 2000)
, (24, 3, 3, '2015-03-01', '2015-03-01', 3000)
, (25, 4, 4, '2015-03-01', '2015-03-01', 4000)
, (26, 5, 2, '2015-03-01', '2015-03-01', 1000)
, (27, 6, 2, '2015-03-01', '2015-03-01', 3000)
, (28, 7, 3, '2015-03-01', '2015-03-01', 5000)
, (29, 8, 4, '2015-03-01', '2015-03-01', 7000)
, (30, 1, 1, '2015-03-01', '2015-03-01', 1000)
, (31, 2, 2, '2015-03-01', '2015-03-01', 2000)
, (32, 3, 3, '2015-03-01', '2015-03-01', 4000)
, (33, 4, 4, '2015-03-01', '2015-03-01', 6000)
, (34, 5, 1, '2015-03-01', '2015-03-01', 2500)
, (35, 6, 2, '2015-03-01', '2015-03-01', 5000)
, (36, 7, 3, '2015-03-01', '2015-03-01', 7500)
, (37, 8, 4, '2015-03-01', '2015-03-01', 1000) ;
COMMIT ;
清单(四)- 3
图(四)- 1
图(四)- 2
图(四)- 3
图(四)- 4
图(四)- 5
图(四)- 6
图(四)- 7
图(四)- 8
图(四)- 9
图(四)- 10
图(四)- 11
图(四)- 12
图(四)- 13
图(四)- 14
图(四)- 15
图(四)- 16
图(四)- 17
图(四)- 18
图(四)- 19
图(四)- 20
图(四)- 21
图(四)- 22
图(四)- 23
图(四)- 24
图(四)- 25
图(四)- 26
图(四)- 27
图(四)- 28
图(四)- 29
图(四)- 30
图(四)- 31
图(四)- 32
图(四)- 33
图(四)- 34 验证结果应该如下所示: mysql> select * from customer_dim G *************************** 1. row *************************** customer_sk: 1 customer_number: 1 customer_name: Really Large Customers customer_street_address: 7500 Louise Dr. customer_zip_code: 17050 customer_city: Mechanicsburg customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 2. row *************************** customer_sk: 2 customer_number: 2 customer_name: Small Stores customer_street_address: 2500 Woodland St. customer_zip_code: 17055 customer_city: Pittsburgh customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 3. row *************************** customer_sk: 3 customer_number: 3 customer_name: Medium Retailers customer_street_address: 1111 Ritter Rd. customer_zip_code: 17055 customer_city: Pittsburgh customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 4. row *************************** customer_sk: 4 customer_number: 4 customer_name: Good Companies customer_street_address: 9500 Scott St. customer_zip_code: 17050 customer_city: Mechanicsburg customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 5. row *************************** customer_sk: 5 customer_number: 5 customer_name: Wonderful Shops customer_street_address: 3333 Rossmoyne Rd. customer_zip_code: 17050 customer_city: Mechanicsburg customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 6. row *************************** customer_sk: 6 customer_number: 6 customer_name: Loyal Clients customer_street_address: 7070 Ritter Rd. customer_zip_code: 17055 customer_city: Pittsburgh customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2015-03-01 *************************** 7. row *************************** customer_sk: 7 customer_number: 7 customer_name: Distinguished Agencies customer_street_address: 9999 Scott St. customer_zip_code: 17050 customer_city: Mechanicsburg customer_state: PA version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 8. row *************************** customer_sk: 8 customer_number: 6 customer_name: Loyal Clients customer_street_address: 7777 Ritter Rd. customer_zip_code: 17055 customer_city: Pittsburgh customer_state: PA version: 2 effective_date: 2015-03-01 expiry_date: 2200-01-01 *************************** 9. row *************************** customer_sk: 9 customer_number: 8 customer_name: Subsidiaries customer_street_address: 10000 Wetline Blvd. customer_zip_code: 17055 customer_city: Pittsburgh customer_state: PA version: 1 effective_date: 2015-03-01 expiry_date: 2200-01-01 9 rows in set (0.00 sec) 客户6的街道地址变更使用了SCD2,客户7的姓名变更使用了SCD1,新增了客户8。注意客户6第一个版本的到期日期和第二个版本的生效日期同为'2015-03-01',这是因为任何一个SCD的有效期是一个“左闭右开”的区间,以客户6为例,其第一个版本的有效期大于等于'2013-03-01',小于'2015-03-01',即为'2013-03-01'到'2013-02-28'。 mysql> select * from product_dim G *************************** 1. row *************************** product_sk: 1 product_code: 1 product_name: Hard Disk Drive product_category: Storage version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 2. row *************************** product_sk: 2 product_code: 2 product_name: Floppy Drive product_category: Storage version: 1 effective_date: 2013-03-01 expiry_date: 2200-01-01 *************************** 3. row *************************** product_sk: 3 product_code: 3 product_name: LCD Panel product_category: Monitor version: 1 effective_date: 2013-03-01 expiry_date: 2015-03-01 *************************** 4. row *************************** product_sk: 4 product_code: 3 product_name: Flat Panel product_category: Monitor version: 2 effective_date: 2015-03-01 expiry_date: 2200-01-01 *************************** 5. row *************************** product_sk: 5 product_code: 4 product_name: Keyboard product_category: Peripheral version: 1 effective_date: 2015-03-01 expiry_date: 2200-01-01 5 rows in set (0.00 sec) 产品3的名称变更使用了SCD2,新增了产品4。 mysql> select * from order_dim ; ---------- -------------- --------- ---------------- ------------- | order_sk | order_number | version | effective_date | expiry_date | ---------- -------------- --------- ---------------- ------------- | 1 | 3 | 1 | 2013-03-01 | 2200-01-01 | | 2 | 4 | 1 | 2013-04-15 | 2200-01-01 | | 3 | 5 | 1 | 2013-05-20 | 2200-01-01 | | 4 | 6 | 1 | 2013-07-30 | 2200-01-01 | | 5 | 7 | 1 | 2013-09-01 | 2200-01-01 | | 6 | 8 | 1 | 2013-11-10 | 2200-01-01 | | 7 | 9 | 1 | 2014-01-05 | 2200-01-01 | | 8 | 10 | 1 | 2014-02-10 | 2200-01-01 | | 9 | 11 | 1 | 2014-03-15 | 2200-01-01 | | 10 | 12 | 1 | 2014-04-20 | 2200-01-01 | | 11 | 13 | 1 | 2014-05-30 | 2200-01-01 | | 12 | 14 | 1 | 2014-06-01 | 2200-01-01 | | 13 | 15 | 1 | 2014-07-15 | 2200-01-01 | | 14 | 16 | 1 | 2014-08-30 | 2200-01-01 | | 15 | 17 | 1 | 2014-09-05 | 2200-01-01 | | 16 | 18 | 1 | 2014-10-05 | 2200-01-01 | | 17 | 19 | 1 | 2015-01-10 | 2200-01-01 | | 18 | 20 | 1 | 2015-02-20 | 2200-01-01 | | 19 | 21 | 1 | 2015-02-28 | 2200-01-01 | | 20 | 22 | 1 | 2015-03-01 | 2200-01-01 | | 21 | 23 | 1 | 2015-03-01 | 2200-01-01 | | 22 | 24 | 1 | 2015-03-01 | 2200-01-01 | | 23 | 25 | 1 | 2015-03-01 | 2200-01-01 | | 24 | 26 | 1 | 2015-03-01 | 2200-01-01 | | 25 | 27 | 1 | 2015-03-01 | 2200-01-01 | | 26 | 28 | 1 | 2015-03-01 | 2200-01-01 | | 27 | 29 | 1 | 2015-03-01 | 2200-01-01 | | 28 | 30 | 1 | 2015-03-01 | 2200-01-01 | | 29 | 31 | 1 | 2015-03-01 | 2200-01-01 | | 30 | 32 | 1 | 2015-03-01 | 2200-01-01 | | 31 | 33 | 1 | 2015-03-01 | 2200-01-01 | | 32 | 34 | 1 | 2015-03-01 | 2200-01-01 | | 33 | 35 | 1 | 2015-03-01 | 2200-01-01 | | 34 | 36 | 1 | 2015-03-01 | 2200-01-01 | | 35 | 37 | 1 | 2015-03-01 | 2200-01-01 | ---------- -------------- --------- ---------------- ------------- 35 rows in set (0.00 sec) 有35个订单,19个是“初始导入”装载的,16个是本次定期装载的。 mysql> select * from sales_order_fact; ---------- ------------- ------------ --------------- -------------- | order_sk | customer_sk | product_sk | order_date_sk | order_amount | ---------- ------------- ------------ --------------- -------------- | 1 | 3 | 3 | 4809 | 4000.00 | | 2 | 4 | 1 | 4854 | 4000.00 | | 3 | 5 | 2 | 4889 | 6000.00 | | 4 | 6 | 3 | 4960 | 6000.00 | | 5 | 7 | 1 | 4993 | 8000.00 | | 6 | 1 | 2 | 5063 | 8000.00 | | 7 | 2 | 3 | 5119 | 1000.00 | | 8 | 3 | 1 | 5155 | 1000.00 | | 9 | 4 | 2 | 5188 | 2000.00 | | 10 | 5 | 3 | 5224 | 2500.00 | | 11 | 6 | 1 | 5264 | 3000.00 | | 12 | 7 | 2 | 5266 | 3500.00 | | 13 | 1 | 3 | 5310 | 4000.00 | | 14 | 2 | 1 | 5356 | 4500.00 | | 15 | 3 | 2 | 5362 | 1000.00 | | 16 | 4 | 3 | 5392 | 1000.00 | | 17 | 5 | 1 | 5489 | 4000.00 | | 18 | 6 | 2 | 5530 | 4000.00 | | 19 | 7 | 3 | 5538 | 4000.00 | | 20 | 1 | 1 | 5539 | 1000.00 | | 21 | 2 | 2 | 5539 | 2000.00 | | 22 | 3 | 4 | 5539 | 3000.00 | | 23 | 4 | 5 | 5539 | 4000.00 | | 24 | 5 | 2 | 5539 | 1000.00 | | 25 | 8 | 2 | 5539 | 3000.00 | | 26 | 7 | 4 | 5539 | 5000.00 | | 27 | 9 | 5 | 5539 | 7000.00 | | 28 | 1 | 1 | 5539 | 1000.00 | | 29 | 2 | 2 | 5539 | 2000.00 | | 30 | 3 | 4 | 5539 | 4000.00 | | 31 | 4 | 5 | 5539 | 6000.00 | | 32 | 5 | 1 | 5539 | 2500.00 | | 33 | 8 | 2 | 5539 | 5000.00 | | 34 | 7 | 4 | 5539 | 7500.00 | | 35 | 9 | 5 | 5539 | 1000.00 | ---------- ------------- ------------ --------------- -------------- 35 rows in set (0.00 sec) 2015年3月1日的16个销售订单被添加,产品3的代理键是4而不是3,客户6的代理键是8而不是6 mysql> select * from cdc_time; ------------ -------------- | last_load | current_load | ------------ -------------- | 2015-03-02 | 2015-03-02 | ------------ -------------- 1 row in set (0.00 sec) 时间戳表的最后装载日期已经更新。 定期装载调度 一旦数据仓库开始使用,你就需要定期从源给数据仓库提供新数据。为了确保数据流的稳定,需要使用所在平台上可用的任务调度器来调度定期装载。本篇最后说明如何在Linux下调度每天执行定期装载任务。假定/root/data-integration为SQL和Kettle的运行目录。 1. 创建定期装载SQL脚本文件,将清单(四)- 2里的SQL脚本保存为文件 /root/data-integration/dw_regular.sql 2. 创建可执行shell文件 使用SQL方式,创建执行SQL脚本的shell文件 /root/data-integration/dw_regular_load_sql.sh,内容如清单(四)- 4所示
代码语言:javascript复制#!/bin/bash
cd /root/data-integration
mysql -uroot -p123456 < dw_regular.sql
清单(四)- 4 使用Kettle方式,创建执行Kettle JOB的shell文件 /root/data-integration/dw_regular_load_kettle.sh,内容如清单(四)- 5所示
代码语言:javascript复制#!/bin/bash
cd /root/data-integration
./kitchen.sh -rep kettle -user admin -pass admin -dir / -job regular_load -level=basic > regular_load.log
清单(四)- 5 使用下面的命令修改shell文件为可执行模式: chmod 755 /root/data-integration/dw_regular_load_sql.sh chmod 755 /root/data-integration/dw_regular_load_kettle.sh 3. 创建crontab定时执行任务,例如每天2点执行定期装载任务 crontab -e 使用SQL方式,内容如下: 0 2 * * * /root/data-integration/dw_regular_load_sql.sh 使用Kettle方式,内容如下: 0 2 * * * /root/data-integration/dw_regular_load_kettle.sh > /dev/null 2>&1