Big Data in Practice【千亿级数仓】Stage 3


A note before we start: I'm a beginner in big data; my handle combines Alice from Alice in Wonderland with my own nickname. As a newcomer to the industry, I write this blog partly to record my own learning journey and partly in the hope of helping other beginners who are starting out like me. My knowledge is limited, so mistakes are bound to slip in; corrections from more experienced readers are very welcome! Personal site: http://alices.ibilibili.xyz/ , blog home: https://alice.blog.csdn.net/ . I may not measure up to the experts yet, but I still want to keep improving, because a single day is a whole life in miniature. In my best years, I want to be my best self.

In this post, I'll walk you through Stage 3 of the Big Data in Practice【千亿级数仓】project.

First, let's see exactly what Stage 3 requires you to master:

  • Learn data warehouse theory, create the data warehouse, and load data into it
  • Solve the slowly changing dimension problem

For what a slowly changing dimension (SCD) is, the options for handling SCDs, and a basic introduction to zipper tables (拉链表), see 《通俗易懂讲数据仓库之【缓慢变化维】》.

For data warehouse theory, see 《一文带你认清数据仓库【维度模型设计】与【分层架构】》.

This post wraps up the stage by putting the zipper table to real use in our【千亿级数仓】project.


Using a Zipper Table to Solve the Goods SCD Problem

1. Create the DW-layer table

-- Create the dimension table in the DW layer
DROP TABLE IF EXISTS `itcast_dw`.`dim_goods`;
CREATE TABLE `itcast_dw`.`dim_goods`(
  goodsId bigint,
  goodsSn string,
  productNo string,
  goodsName string,
  goodsImg string,
  shopId bigint,
  goodsType bigint,
  marketPrice double,
  shopPrice double,
  warnStock bigint,
  goodsStock bigint,
  goodsUnit string,
  goodsTips string,
  isSale bigint,
  isBest bigint,
  isHot bigint,
  isNew bigint,
  isRecom bigint,
  goodsCatIdPath string,
  goodsCatId bigint,
  shopCatId1 bigint,
  shopCatId2 bigint,
  brandId bigint,
  goodsDesc string,
  goodsStatus bigint,
  saleNum bigint,
  saleTime string,
  visitNum bigint,
  appraiseNum bigint,
  isSpec bigint,
  gallery string,
  goodsSeoKeywords string,
  illegalRemarks string,
  dataFlag bigint,
  createTime string,
  isFreeShipping bigint,
  goodsSerachKeywords string,  -- (sic: column name spelled as in the source system)
  modifyTime string,
  dw_start_date string,
  dw_end_date string
)
STORED AS PARQUET;

2. Detailed steps

Let's review the steps of the zipper-table design:

1. Initial full load

Load all of the ODS data into the zipper history table.

2. Incremental load (for a given day, e.g. 2019-09-09)

  • Load that day's increment into an ODS partition
  • Merge it with the historical data

The merge updates history through a join query; the sketch below shows its effect on a single row.
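To make the mechanics concrete, here is a minimal sketch of how one goods row evolves in the zipper table (the id and dates are hypothetical examples, other columns omitted):

-- After the full load: one open-ended row per goods record
-- goodsId | dw_start_date | dw_end_date
-- 100134  | 2019-09-08    | 9999-12-31
--
-- After the 2019-09-09 increment touches that goods record:
-- the old row is closed out as of the previous day,
-- and a new open-ended row is appended
-- goodsId | dw_start_date | dw_end_date
-- 100134  | 2019-09-08    | 2019-09-08
-- 100134  | 2019-09-09    | 9999-12-31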

2.1 Full load
  • Load every goods record created or modified on or before September 8, 2019 into the zipper history table

Steps:

1. Use Kettle to extract all records dated 20190908 or earlier into ODS

-- Runs against the MySQL source database (note the MySQL DATE_FORMAT syntax)
SELECT *
FROM itcast_goods
WHERE DATE_FORMAT(createtime, '%Y%m%d') <= '20190908' OR DATE_FORMAT(modifyTime, '%Y%m%d') <= '20190908';
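The Spark SQL steps that follow read from `itcast_ods`.`itcast_goods`, partitioned by the extraction date dt. The original post never shows that table's DDL, so purely as an assumption, a minimal sketch of what it might look like (the same business columns as dim_goods, minus the dw_* columns, partitioned by dt) is:

-- Hypothetical ODS-layer DDL (not from the original post)
DROP TABLE IF EXISTS `itcast_ods`.`itcast_goods`;
CREATE TABLE `itcast_ods`.`itcast_goods`(
  goodsId bigint,
  goodsSn string,
  productNo string,
  goodsName string,
  goodsImg string,
  shopId bigint,
  goodsType bigint,
  marketPrice double,
  shopPrice double,
  warnStock bigint,
  goodsStock bigint,
  goodsUnit string,
  goodsTips string,
  isSale bigint,
  isBest bigint,
  isHot bigint,
  isNew bigint,
  isRecom bigint,
  goodsCatIdPath string,
  goodsCatId bigint,
  shopCatId1 bigint,
  shopCatId2 bigint,
  brandId bigint,
  goodsDesc string,
  goodsStatus bigint,
  saleNum bigint,
  saleTime string,
  visitNum bigint,
  appraiseNum bigint,
  isSpec bigint,
  gallery string,
  goodsSeoKeywords string,
  illegalRemarks string,
  dataFlag bigint,
  createTime string,
  isFreeShipping bigint,
  goodsSerachKeywords string,
  modifyTime string
)
PARTITIONED BY (dt string)  -- extraction date, e.g. 20190908
STORED AS PARQUET;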

2. Use Spark SQL to load the full data set into the DW-layer dimension table

set spark.sql.shuffle.partitions=1; -- number of partitions used during shuffles; the default is 200
-- Load the full data set into the DW-layer dimension table
insert overwrite table `itcast_dw`.`dim_goods`
select
  goodsId,
  goodsSn,
  productNo,
  goodsName,
  goodsImg,
  shopId,
  goodsType,
  marketPrice,
  shopPrice,
  warnStock,
  goodsStock,
  goodsUnit,
  goodsTips,
  isSale,
  isBest,
  isHot,
  isNew,
  isRecom,
  goodsCatIdPath,
  goodsCatId,
  shopCatId1,
  shopCatId2,
  brandId,
  goodsDesc,
  goodsStatus,
  saleNum,
  saleTime,
  visitNum,
  appraiseNum,
  isSpec,
  gallery,
  goodsSeoKeywords,
  illegalRemarks,
  dataFlag,
  createTime,
  isFreeShipping,
  goodsSerachKeywords,
  modifyTime,
    -- Chain start date: the day the record was last changed (modifyTime if set, otherwise createTime)
    case when modifyTime is not null
      then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
      else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') 
      end as dw_start_date,
   '9999-12-31' as dw_end_date
from
  `itcast_ods`.`itcast_goods` t
where dt='20190908';
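As a quick sanity check (my own addition, not part of the original steps), every row should be open-ended right after the full load:

-- After the initial full load, all rows should carry the open-ended end date
select dw_end_date, count(*) as cnt
from `itcast_dw`.`dim_goods`
group by dw_end_date;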
2.2 Incremental load
  • Load every record created or modified on September 9, 2019 into the history zipper table

Steps:

1. Use Kettle to extract the records created or modified on 20190909 into ODS

-- ${dt} is a Kettle named parameter, e.g. 20190909
SELECT *
FROM itcast_goods
WHERE DATE_FORMAT(createtime, '%Y%m%d') = '${dt}' OR DATE_FORMAT(modifyTime, '%Y%m%d') = '${dt}';

2. Write Spark SQL to update the historical data. This query is shown for illustration; its output is not stored on its own, but is folded into the merge in step 4.

-- Update historical records: close out open rows whose goods appear in today's increment
select
  dw.goodsId,
  dw.goodsSn,
  dw.productNo,
  dw.goodsName,
  dw.goodsImg,
  dw.shopId,
  dw.goodsType,
  dw.marketPrice,
  dw.shopPrice,
  dw.warnStock,
  dw.goodsStock,
  dw.goodsUnit,
  dw.goodsTips,
  dw.isSale,
  dw.isBest,
  dw.isHot,
  dw.isNew,
  dw.isRecom,
  dw.goodsCatIdPath,
  dw.goodsCatId,
  dw.shopCatId1,
  dw.shopCatId2,
  dw.brandId,
  dw.goodsDesc,
  dw.goodsStatus,
  dw.saleNum,
  dw.saleTime,
  dw.visitNum,
  dw.appraiseNum,
  dw.isSpec,
  dw.gallery,
  dw.goodsSeoKeywords,
  dw.illegalRemarks,
  dw.dataFlag,
  dw.createTime,
  dw.isFreeShipping,
  dw.goodsSerachKeywords,
  dw.modifyTime,
  dw.dw_start_date,
  -- Close out still-open rows for goods that appear in today's increment; leave other rows unchanged
  case when dw.dw_end_date = '9999-12-31' and ods.goodsId is not null
      then '2019-09-08'
      else dw.dw_end_date
      end as dw_end_date
from
  `itcast_dw`.`dim_goods` dw
  left join 
  (select * from `itcast_ods`.`itcast_goods` where dt='20190909') ods
   on dw.goodsId = ods.goodsId;
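Before wiring this into the merge, you can preview exactly which rows the increment would close out; a helper query of my own (not in the original post) along these lines:

-- Preview: open-ended rows whose goods appear in today's (20190909) increment
select dw.goodsId, dw.dw_start_date, dw.dw_end_date
from `itcast_dw`.`dim_goods` dw
join (select goodsId from `itcast_ods`.`itcast_goods` where dt = '20190909') ods
  on dw.goodsId = ods.goodsId
where dw.dw_end_date = '9999-12-31';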

3. Write Spark SQL to fetch the current day's data

-- Today's new and changed records, opened with the end date 9999-12-31
select
  goodsId,
  goodsSn,
  productNo,
  goodsName,
  goodsImg,
  shopId,
  goodsType,
  marketPrice,
  shopPrice,
  warnStock,
  goodsStock,
  goodsUnit,
  goodsTips,
  isSale,
  isBest,
  isHot,
  isNew,
  isRecom,
  goodsCatIdPath,
  goodsCatId,
  shopCatId1,
  shopCatId2,
  brandId,
  goodsDesc,
  goodsStatus,
  saleNum,
  saleTime,
  visitNum,
  appraiseNum,
  isSpec,
  gallery,
  goodsSeoKeywords,
  illegalRemarks,
  dataFlag,
  createTime,
  isFreeShipping,
  goodsSerachKeywords,
  modifyTime,
  case when modifyTime is not null
      then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
      else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') 
      end as dw_start_date,
   '9999-12-31' as dw_end_date
from
  `itcast_ods`.`itcast_goods`
where dt = '20190909';

4. Merge the historical and current-day data into a temporary table

-- Merge the historical data and the current day's data into a temporary table
drop table if exists `itcast_dw`.`tmp_dim_goods_history`;
create table `itcast_dw`.`tmp_dim_goods_history`
as
select
  dw.goodsId,
  dw.goodsSn,
  dw.productNo,
  dw.goodsName,
  dw.goodsImg,
  dw.shopId,
  dw.goodsType,
  dw.marketPrice,
  dw.shopPrice,
  dw.warnStock,
  dw.goodsStock,
  dw.goodsUnit,
  dw.goodsTips,
  dw.isSale,
  dw.isBest,
  dw.isHot,
  dw.isNew,
  dw.isRecom,
  dw.goodsCatIdPath,
  dw.goodsCatId,
  dw.shopCatId1,
  dw.shopCatId2,
  dw.brandId,
  dw.goodsDesc,
  dw.goodsStatus,
  dw.saleNum,
  dw.saleTime,
  dw.visitNum,
  dw.appraiseNum,
  dw.isSpec,
  dw.gallery,
  dw.goodsSeoKeywords,
  dw.illegalRemarks,
  dw.dataFlag,
  dw.createTime,
  dw.isFreeShipping,
  dw.goodsSerachKeywords,
  dw.modifyTime,
  dw.dw_start_date,
  -- Close out rows still valid on or after today when the goods record appears in today's increment
  case when dw.dw_end_date >= '2019-09-09' and ods.goodsId is not null
      then '2019-09-08'
      else dw.dw_end_date
      end as dw_end_date
from
  `itcast_dw`.`dim_goods` dw
  left join 
  (select * from `itcast_ods`.`itcast_goods` where dt='20190909') ods
   on dw.goodsId = ods.goodsId
union
select
  goodsId,
  goodsSn,
  productNo,
  goodsName,
  goodsImg,
  shopId,
  goodsType,
  marketPrice,
  shopPrice,
  warnStock,
  goodsStock,
  goodsUnit,
  goodsTips,
  isSale,
  isBest,
  isHot,
  isNew,
  isRecom,
  goodsCatIdPath,
  goodsCatId,
  shopCatId1,
  shopCatId2,
  brandId,
  goodsDesc,
  goodsStatus,
  saleNum,
  saleTime,
  visitNum,
  appraiseNum,
  isSpec,
  gallery,
  goodsSeoKeywords,
  illegalRemarks,
  dataFlag,
  createTime,
  isFreeShipping,
  goodsSerachKeywords,
  modifyTime,
  case when modifyTime is not null
      then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
      else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') 
      end as dw_start_date,
   '9999-12-31' as dw_end_date
from
  `itcast_ods`.`itcast_goods`
where dt = '20190909';
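A note on the union: in Spark SQL, UNION is equivalent to UNION DISTINCT, so duplicate rows across the two branches are removed; UNION ALL would also work, and avoids the deduplication shuffle, assuming the history branch and the today branch never emit identical rows. Either way, before overwriting the dimension table you can check that the merge left at most one open-ended row per goods record (a sanity check of my own, not from the original post):

-- Any goodsId returned here has more than one open-ended row, which indicates a merge problem
select goodsId, count(*) as open_rows
from `itcast_dw`.`tmp_dim_goods_history`
where dw_end_date = '9999-12-31'
group by goodsId
having count(*) > 1;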

5. Load the merged data into the history zipper table

-- Load the merged historical and current-day data into the history zipper table
insert overwrite table `itcast_dw`.`dim_goods`
select * from `itcast_dw`.`tmp_dim_goods_history`;

-- Fetch the goods records that were valid on 2019-09-09
select * from `itcast_dw`.`dim_goods` where dw_start_date <= '2019-09-09' and dw_end_date >= '2019-09-09' limit 10;
2.3 Testing

Steps:

1. In MySQL, change one record's modify date to 2019-09-10; as an example, we modify the record with id 100134

2. Set the Kettle named parameter (dt = 20190910) and re-extract that one record into the 20190910 partition

3. Re-run the Spark SQL script to load the data into the temporary table

-- Build the zipper data for 2019-09-10
-- Merge the historical data and the current day's data into a temporary table
drop table if exists `itcast_dw`.`tmp_dim_goods_history`;
create table `itcast_dw`.`tmp_dim_goods_history`
as
select
  dw.goodsId,
  dw.goodsSn,
  dw.productNo,
  dw.goodsName,
  dw.goodsImg,
  dw.shopId,
  dw.goodsType,
  dw.marketPrice,
  dw.shopPrice,
  dw.warnStock,
  dw.goodsStock,
  dw.goodsUnit,
  dw.goodsTips,
  dw.isSale,
  dw.isBest,
  dw.isHot,
  dw.isNew,
  dw.isRecom,
  dw.goodsCatIdPath,
  dw.goodsCatId,
  dw.shopCatId1,
  dw.shopCatId2,
  dw.brandId,
  dw.goodsDesc,
  dw.goodsStatus,
  dw.saleNum,
  dw.saleTime,
  dw.visitNum,
  dw.appraiseNum,
  dw.isSpec,
  dw.gallery,
  dw.goodsSeoKeywords,
  dw.illegalRemarks,
  dw.dataFlag,
  dw.createTime,
  dw.isFreeShipping,
  dw.goodsSerachKeywords,
  dw.modifyTime,
  dw.dw_start_date,
  case when dw.dw_end_date >= '2019-09-10' and ods.goodsId is not null
      then '2019-09-09'
      else dw.dw_end_date
      end as dw_end_date
from
  `itcast_dw`.`dim_goods` dw
  left join 
  (select * from `itcast_ods`.`itcast_goods` where dt='20190910') ods
   on dw.goodsId = ods.goodsId
union
select
  goodsId,
  goodsSn,
  productNo,
  goodsName,
  goodsImg,
  shopId,
  goodsType,
  marketPrice,
  shopPrice,
  warnStock,
  goodsStock,
  goodsUnit,
  goodsTips,
  isSale,
  isBest,
  isHot,
  isNew,
  isRecom,
  goodsCatIdPath,
  goodsCatId,
  shopCatId1,
  shopCatId2,
  brandId,
  goodsDesc,
  goodsStatus,
  saleNum,
  saleTime,
  visitNum,
  appraiseNum,
  isSpec,
  gallery,
  goodsSeoKeywords,
  illegalRemarks,
  dataFlag,
  createTime,
  isFreeShipping,
  goodsSerachKeywords,
  modifyTime,
  case when modifyTime is not null
      then from_unixtime(unix_timestamp(modifyTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
      else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') 
      end as dw_start_date,
   '9999-12-31' as dw_end_date
from
  `itcast_ods`.`itcast_goods`
where dt = '20190910';
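Notice that this script is the 20190909 merge with three date literals bumped forward one day. In production you would parameterize the dates rather than keep one script per day. A minimal sketch, assuming the script is run through the spark-sql CLI with Hive-style variable substitution (the variable and file names here are my own, not from the original post):

-- Hypothetical invocation:
--   spark-sql --hivevar dt=20190910 --hivevar today=2019-09-10 --hivevar yesterday=2019-09-09 -f merge_dim_goods.sql
-- Inside the script the literals become '${dt}', '${today}' and '${yesterday}', e.g.:
select * from `itcast_dw`.`dim_goods`
where dw_start_date <= '${today}' and dw_end_date >= '${today}'
limit 10;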

4. Reload the merged data into the history zipper table

-- Load the merged historical and current-day data into the history zipper table
insert overwrite table `itcast_dw`.`dim_goods`
select * from `itcast_dw`.`tmp_dim_goods_history`;

5. Inspect the zipper history for that goods id

select * from `itcast_dw`.`dim_goods` where goodsId = 100134;
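Assuming the record was created on 2019-09-08 and then modified on 2019-09-10, as in this test, the query should return two rows shaped roughly like this (other columns omitted, values illustrative):

-- goodsId | dw_start_date | dw_end_date
-- 100134  | 2019-09-08    | 2019-09-09
-- 100134  | 2019-09-10    | 9999-12-31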

Sure enough, we get two rows for id 100134: one is the earlier historical record, and the other is the new record created when our MySQL change was synced into the ODS layer as part of the increment.

Readers who saw my earlier post on slowly changing dimensions will recognize that the dw_end_date field is what tells the two rows apart: it marks which record is still valid. If SCDs are still unclear to you, I strongly recommend reading 《通俗易懂讲数据仓库之【缓慢变化维】》; I hope it helps!
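Concretely, to fetch only the currently valid version of a record, filter on the open-ended end date:

-- Only the currently valid row carries the open-ended end date
select * from `itcast_dw`.`dim_goods`
where goodsId = 100134 and dw_end_date = '9999-12-31';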

That wraps up Stage 3. Some readers may wonder why, reading my posts, each stage of a hands-on project seems to cover only so much material.

I should explain: it's not that each stage lacks content. As someone who has worked through the whole project, I simply cannot fit everything into a single post per stage. What I can do, as with this Stage 3 post, is break parts of the material out into their own articles instead of trying to cover everything, slowly changing dimensions, warehouse theory, layered architecture and so on, in one piece. Nothing gets left out this way: every technology and knowledge point the project touches is covered, and you can pick up the rest from the other posts I've already published.

Summary

That's the end of Stage 3 of the Big Data in Practice【千亿级数仓】project. Building on an understanding of data warehouse theory and layered architecture, make sure you are fluent with the zipper-table technique!

If you spot any mistakes or omissions above, corrections are very welcome.

If this post helped you, or you're interested in big data, a like and a follow are much appreciated.
