一、拉連結清單
資料倉庫的資料模型設計過程中,經常會遇到這樣的需求:
- 表中的部分字段會被update,例如:
- 使用者的位址,産品的描述資訊,品牌資訊等等;
- 需要檢視某一個時間點或者時間段的曆史快照資訊,例如:
- 檢視某一個産品在曆史某一時間點的狀态
- 檢視某一個使用者在過去某一段時間内,更新過幾次等等
- 變化的比例和頻率不是很大,例如:
- 總共有1000萬的會員,每天新增和發生變化的有10萬左右
商品曆史快照案例
需求:
有一個商品表:
列名 | 類型 | 說明 |
---|---|---|
goods_id | varchar(50) | 商品編号 |
goods_status | varchar(50) | 商品狀态(待稽核、待售、在售、已删除) |
createtime | varchar(50) | 商品建立日期 |
modifytime | varchar(50) | 商品修改日期 |
2019年12月20日的資料如下所示:
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
001 | 待稽核 | 2019-12-20 | 2019-12-20 |
002 | 待售 | 2019-12-20 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-20 | 2019-12-20 |
商品的狀态,會随着時間推移而變化,我們需要将商品的所有變化的曆史資訊都儲存下來。如何實作呢?
方案一:快照每一天的資料到數倉(備援資料)
1、方案介紹
該方案為:
- 每一天都儲存一份全量,将所有資料同步到數倉中
- 很多記錄都是重複儲存,沒有任何變化
12月20日(4條資料)
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
001 | 待稽核 | 2019-12-18 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
12月21日(10條資料)
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
以下為12月20日快照資料 | |||
001 | 待稽核 | 2019-12-18 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
以下為12月21日快照資料 | |||
001 | 待售(從待稽核到待售) | 2019-12-18 | 2019-12-21 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
005(新商品) | 待稽核 | 2019-12-21 | 2019-12-21 |
006(新商品) | 待稽核 | 2019-12-21 | 2019-12-21 |
12月22日(18條資料)
goods_id | goods_status | createtime | modifytime |
---|---|---|---|
以下為12月20日快照資料 | |||
001 | 待稽核 | 2019-12-18 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
以下為12月21日快照資料 | |||
001 | 待售(從待稽核到待售) | 2019-12-18 | 2019-12-21 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 在售 | 2019-12-20 | 2019-12-20 |
004 | 已删除 | 2019-12-15 | 2019-12-20 |
005 | 待稽核 | 2019-12-21 | 2019-12-21 |
006 | 待稽核 | 2019-12-21 | 2019-12-21 |
以下為12月22日快照資料 | |||
001 | 待售 | 2019-12-18 | 2019-12-21 |
002 | 待售 | 2019-12-19 | 2019-12-20 |
003 | 已删除(從在售到已删除) | 2019-12-20 | 2019-12-22 |
004 | 待稽核 | 2019-12-21 | 2019-12-21 |
005 | 待稽核 | 2019-12-21 | 2019-12-21 |
006 | 已删除(從待稽核到已删除) | 2019-12-21 | 2019-12-22 |
007 | 待稽核 | 2019-12-22 | 2019-12-22 |
008 | 待稽核 | 2019-12-22 | 2019-12-22 |
2、MySQL到Hive數倉代碼實作
MySQL&Hive初始化
1 在MySQL demo庫中 建立表
-- 建立資料庫
CREATE DATABASE demo DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
-- 建立商品表
create table if not exists `demo`.`t_product`(
goods_id varchar(50), -- 商品編号
goods_status varchar(50), -- 商品狀态
createtime varchar(50), -- 商品建立時間
modifytime varchar(50) -- 商品修改時間
);
2 在Hive中 demo庫建立表
-- 建立表
create database if not exists `demo`;
-- 建立ods層表
create table if not exists `demo`.`ods_product`(
goods_id string, -- 商品編号
goods_status string, -- 商品狀态
createtime string, -- 商品建立時間
modifytime string -- 商品修改時間
)
partitioned by (dt string)
row format delimited fields terminated by ',' stored as TEXTFILE;
-- 建立dw層表
create table if not exists `demo`.`dw_product`(
goods_id string, -- 商品編号
goods_status string, -- 商品狀态
createtime string, -- 商品建立時間
modifytime string -- 商品修改時間
)
partitioned by (dt string)
row format delimited fields terminated by ',' stored as TEXTFILE;
增量導入12月20日資料
1 MySQL資料庫導入12月20日資料(4條資料)
insert into `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) values
('001', '待稽核', '2019-12-18', '2019-12-20'),
('002', '待售', '2019-12-19', '2019-12-20'),
('003', '在售', '2019-12-20', '2019-12-20'),
('004', '已删除', '2019-12-15', '2019-12-20');
mysql圖示 |
---|
![]() |
2 使用Kettle将MySQL資料導出,并導入到分區HDFS位置
Kettle轉換流程圖 |
---|
|
建立Hive分區
-- 建立分區
alter table `demo`.`ods_product` add if not exists partition (dt='2019-12-20');
增加分區 |
---|
|
表輸入 |
|
Hadoop File output |
|
3 Hive中查詢資料
4 資料導入次元表
insert overwrite table `demo`.`dw_product` partition(dt='2019-12-20')
select
goods_id,
goods_status,
createtime,
modifytime
from `demo`.`ods_product` where dt='2019-12-20';
增量導入12月21日資料
1 MySQL資料庫導入12月21日資料(6條資料)
UPDATE `demo`.`t_product` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001';
INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES
('005', '待稽核', '2019-12-21', '2019-12-21'),
('006', '待稽核', '2019-12-21', '2019-12-21');
2 運作Kettle轉換,導入2019年12月21日資料
執行kettle轉換 |
---|
|
3 Hive查詢資料
mysql資料展示 |
---|
|
4 資料導入dw層表
insert overwrite table `demo`.`dw_product` partition(dt='2019-12-21')
select
goods_id,
goods_status,
createtime,
modifytime
from `demo`.`ods_product` where dt='2019-12-21';
最終資料展示 |
---|
|
增量導入12月22日資料
1 MySQL資料庫導入12月22日資料(6條資料)
UPDATE `demo`.`t_product` SET goods_status = '已删除', modifytime = '2019-12-22' WHERE goods_id = '003';
UPDATE `demo`.`t_product` SET goods_status = '已删除', modifytime = '2019-12-22' WHERE goods_id = '006';
INSERT INTO `demo`.`t_product`(goods_id, goods_status, createtime, modifytime) VALUES
('007', '待稽核', '2019-12-22', '2019-12-22'),
('008', '待稽核', '2019-12-22', '2019-12-22');
2 運作Kettle轉換,導入2019年12月22日資料
導入2019年12月22日資料 |
---|
|
3 Hive查詢資料
hive資料 |
---|
|
4 資料導入dw層表
insert overwrite table `demo`.`dw_product` partition(dt='2019-12-22')
select
goods_id,
goods_status,
createtime,
modifytime
from `demo`.`ods_product` where dt='2019-12-22';
2019-12-22資料 |
---|
|
從上述案例,可以看到:表每天保留一份全量,每次全量中會儲存很多不變的資訊,如果資料量很大的話,對存儲是極大的浪費。
可以将表設計為拉連結清單,既能滿足反應資料的曆史狀态,又可以最大限度地節省存儲空間
方案二:拉連結清單【使用拉連結清單儲存曆史快照】
1、拉連結清單方案介紹
拉連結清單
- 拉連結清單不存儲備援的資料,隻有某行的資料發生變化,才需要儲存下來,相比每次全量同步會節省存儲空間
- 能夠查詢到曆史快照
- 額外的增加了兩列(dw_start_date dw_end_date),為資料行的生命周期
12月20日商品拉連結清單的資料:
goods_id | goods_status | createtime | modifytime | dw_start_date | dw_end_date |
---|---|---|---|---|---|
001 | 待稽核 | 2019-12-18 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
002 | 待售 | 2019-12-19 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
003 | 在售 | 2019-12-20 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
004 | 已删除 | 2019-12-15 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
- 12月20日的資料是全新的資料導入到dw表
- dw_start_date表示某一條資料的生命周期起始時間,即資料從該時間開始有效(即生效日期)
- dw_end_date表示某一條資料的生命周期結束時間,即資料到這一天失效(即失效日期)
- dw_end_date為9999-12-31,表示目前這條資料是最新的資料,資料到9999-12-31才過期
12月21日商品拉連結清單的資料
goods_id | goods_status | createtime | modifytime | dw_start_date | dw_end_date |
---|---|---|---|---|---|
001 | 待稽核 | 2019-12-18 | 2019-12-20 | 2019-12-20 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
003 | 在售 | 2019-12-20 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
004 | 已删除 | 2019-12-15 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
001 | 待售 | 2019-12-18 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
005 | 待稽核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
006 | 待稽核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
- 拉連結清單中沒有存儲備援的資料,隻要資料沒有變化,無需同步
- 001編号的商品資料的狀态發生了變化(從待稽核 → 待售),需要将原有的dw_end_date變為2019-12-21,表示待稽核狀态,在2019/12/20(包含) - 2019/12/21有效
- 001編号新的狀态重新儲存了一條記錄,dw_start_date為2019/12/21,dw_end_date為9999/12/31
12月22日商品拉連結清單的資料
goods_id | goods_status | createtime | modifytime | dw_start_date | dw_end_date |
---|---|---|---|---|---|
001 | 待稽核 | 2019-12-18 | 2019-12-20 | 2019-12-20 | 2019-12-20 |
002 | 待售 | 2019-12-19 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
003 | 在售 | 2019-12-20 | 2019-12-20 | 2019-12-20 | 2019-12-21 |
004 | 已删除 | 2019-12-15 | 2019-12-20 | 2019-12-20 | 9999-12-31 |
001 | 待售 | 2019-12-18 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
005 | 待稽核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
006 | 待稽核 | 2019-12-21 | 2019-12-21 | 2019-12-21 | 9999-12-31 |
003 | 已删除 | 2019-12-20 | 2019-12-22 | 2019-12-22 | 9999-12-31 |
007 | 待稽核 | 2019-12-22 | 2019-12-22 | 2019-12-22 | 9999-12-31 |
008 | 待稽核 | 2019-12-22 | 2019-12-22 | 2019-12-22 | 9999-12-31 |
查詢拉連結清單
1 擷取2019-12-20日的曆史快照資料
2 擷取最新的商品快照資料
2、拉連結清單方案存儲曆史快照代碼實作
操作步驟:
- 在原有dw層表上,添加額外的兩列
- 生效日期(dw_start_date)
- 失效日期(dw_end_date)
- 隻同步當天修改的資料到ods層
- 拉連結清單算法實作
- 編寫SQL處理當天最新的資料
- 編寫SQL處理dw層曆史資料,重新計算之前的dw_end_date
- 拉連結清單的資料為:當天最新的資料 UNION ALL 曆史資料
- 拉連結清單的資料為:當天最新的資料 UNION ALL 曆史資料
代碼實作:
1 MySQL&Hive表初始化
MySQL建立商品表2
-- 建立資料庫
CREATE DATABASE demo DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
-- 建立商品表
create table if not exists `demo`.`t_product_2`(
goods_id varchar(50), -- 商品編号
goods_status varchar(50), -- 商品狀态
createtime varchar(50), -- 商品建立時間
modifytime varchar(50) -- 商品修改時間
)ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
Hive ODS層建表
-- 建立表
create database if not exists `demo`;
-- 建立ods層表
create table if not exists `demo`.`ods_product_2`(
goods_id string, -- 商品編号
goods_status string, -- 商品狀态
createtime string, -- 商品建立時間
modifytime string -- 商品修改時間
)
partitioned by (dt string) --按照天分區
row format delimited fields terminated by ',' stored as TEXTFILE;
Hive dw層建立拉連結清單
-- 建立拉連結清單
create table if not exists `demo`.`dw_product_2`(
goods_id string, -- 商品編号
goods_status string, -- 商品狀态
createtime string, -- 商品建立時間
modifytime string, -- 商品修改時間
dw_start_date string, -- 生效日期
dw_end_date string -- 失效日期
)
row format delimited fields terminated by ',' stored as TEXTFILE;
全量導入2019年12月20日資料
1 MySQL資料庫導入12月20日資料(4條資料)
insert into `demo`.`t_product_2`(goods_id, goods_status, createtime, modifytime) values
('001', '待稽核', '2019-12-18', '2019-12-20'),
('002', '待售', '2019-12-19', '2019-12-20'),
('003', '在售', '2019-12-20', '2019-12-20'),
('004', '已删除', '2019-12-15', '2019-12-20');
mysql中資料 |
---|
![]() |
2 使用Kettle進行全量同步MySQL資料到Hive ods層表
Kettle元件圖 |
---|
|
設定命名參數 |
|
建立Hive分區
-- 建立分區
alter table `demo`.`ods_product_2` add if not exists partition (dt='${dt}');
hive增加分區 |
---|
|
表輸入
SELECT
*
FROM t_product_2
where modifytime <= '${dt}'
表輸入元件 |
---|
|
Hadoop File Ouput |
|
3 編寫SQL從ods導入dw當天最新的資料
-- 從ods層導入dw當天最新資料
insert overwrite table `demo`.`dw_product_2`
select
goods_id, -- 商品編号
goods_status, -- 商品狀态
createtime, -- 商品建立時間
modifytime, -- 商品修改時間
modifytime as dw_start_date, -- 生效日期
'9999-12-31' as dw_end_date -- 失效日期
from
`demo`.`ods_product_2`
where
dt = '2019-12-20';
當天最新的資料 |
---|
|
增量導入2019年12月21日資料
1 MySQL資料庫導入12月21日資料(6條資料)
UPDATE `demo`.`t_product_2` SET goods_status = '待售', modifytime = '2019-12-21' WHERE goods_id = '001';
INSERT INTO `demo`.`t_product_2`(goods_id, goods_status, createtime, modifytime) VALUES
('005', '待稽核', '2019-12-21', '2019-12-21'),
('006', '待稽核', '2019-12-21', '2019-12-21');
2 使用Kettle開發增量同步MySQL資料到Hive ods層表
Hive建立分區
-- 建立分區
alter table `demo`.`ods_product_2` add if not exists partition (dt='${dt}');
表輸入讀取MySQL資料
SELECT
*
FROM t_product_2
where modifytime = '${dt}'
ods中資料 |
---|
|
3 編寫SQL處理dw層曆史資料,重新計算之前的dw_end_date
-- 重新計算dw層拉連結清單中的失效時間
select
t1.goods_id, -- 商品編号
t1.goods_status, -- 商品狀态
t1.createtime, -- 商品建立時間
t1.modifytime, -- 商品修改時間
t1.dw_start_date, -- 生效日期(生效日期無需重新計算)
case when (t2.goods_id is not null and t1.dw_end_date > '2019-12-21')
then '2019-12-21'
else t1.dw_end_date
end as dw_end_date -- 更新生效日期(需要重新計算)
from
`demo`.`dw_product_2` t1
left join
(select * from `demo`.`ods_product_2` where dt='2019-12-21') t2
on t1.goods_id = t2.goods_id
6 合并當天最新的資料和曆史資料到
insert overwrite table `demo`.`dw_product_2`
select
t1.goods_id, -- 商品編号
t1.goods_status, -- 商品狀态
t1.createtime, -- 商品建立時間
t1.modifytime, -- 商品修改時間
t1.dw_start_date, -- 生效日期(生效日期無需重新計算)
case when (t2.goods_id is not null and t1.dw_end_date > '2019-12-21')
then '2019-12-21'
else t1.dw_end_date
end as dw_end_date -- 更新生效日期(需要重新計算)
from
`demo`.`dw_product_2` t1
left join
(select * from `demo`.`ods_product_2` where dt='2019-12-21') t2
on t1.goods_id = t2.goods_id
union all
select
goods_id, -- 商品編号
goods_status, -- 商品狀态
createtime, -- 商品建立時間
modifytime, -- 商品修改時間
modifytime as dw_start_date, -- 生效日期
'9999-12-31' as dw_end_date -- 失效日期
from
`demo`.`ods_product_2` where dt='2019-12-21'
order by dw_start_date, goods_id;
最終資料 |
---|
|
參考資料:
圖解資料倉庫之拉連結清單,超簡單!
數倉中的全量表,增量表,拉連結清單,流水表,快照表