
Sqoop + MySQL + Hive + Oozie Data Warehouse Case

The MySQL source database script is:


/*==============================================================*/
/* DBMS name:      MySQL 5.0                                    */
/* Created on:     2018/11/23 1:09:10                           */
/*==============================================================*/
DROP DATABASE IF EXISTS sales_source;
CREATE DATABASE IF NOT EXISTS sales_source DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

USE sales_source;

DROP TABLE IF EXISTS customer;

DROP TABLE IF EXISTS product;

DROP TABLE IF EXISTS sales_order;

/*==============================================================*/
/* Table: customer                                              */
/*==============================================================*/
CREATE TABLE customer
(
   customer_number      INT(11) NOT NULL AUTO_INCREMENT,
   customer_name        VARCHAR(128) NOT NULL,
   customer_street_address VARCHAR(256) NOT NULL,
   customer_zip_code    INT(11) NOT NULL,
   customer_city        VARCHAR(32) NOT NULL,
   customer_state       VARCHAR(32) NOT NULL,
   PRIMARY KEY (customer_number)
);

/*==============================================================*/
/* Table: product                                               */
/*==============================================================*/
CREATE TABLE product
(
   product_code         INT(11) NOT NULL AUTO_INCREMENT,
   product_name         VARCHAR(128) NOT NULL,
   product_category     VARCHAR(256) NOT NULL,
   PRIMARY KEY (product_code)
);

/*==============================================================*/
/* Table: sales_order                                           */
/*==============================================================*/
CREATE TABLE sales_order
(
   order_number         INT(11) NOT NULL AUTO_INCREMENT,
   customer_number      INT(11) NOT NULL,
   product_code         INT(11) NOT NULL,
   order_date           DATETIME NOT NULL,
   entry_date           DATETIME NOT NULL,
   order_amount         DECIMAL(18,2) NOT NULL,
   PRIMARY KEY (order_number)
);

/*==============================================================*/
/* insert data                                        */
/*==============================================================*/

INSERT INTO customer
( customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
 )
VALUES
  ('Big Customers', '7500 Louise Dr.', '17050','Mechanicsburg', 'PA')
, ( 'Small Stores', '2500 Woodland St.', '17055', 'Pittsburgh', 'PA')
, ('Medium Retailers', '1111 Ritter Rd.', '17055','Pittsburgh', 'PA')
,  ('Good Companies', '9500 Scott St.', '17050','Mechanicsburg', 'PA')
, ('Wonderful Shops', '3333 Rossmoyne Rd.', '17050','Mechanicsburg', 'PA')
, ('Loyal Clients', '7070 Ritter Rd.', '17055','Pittsburgh', 'PA');
       
       
INSERT INTO product(product_name,product_category) VALUES
('Hard Disk','Storage'),
('Floppy Drive','Storage'),
('lcd panel','monitor');



DROP PROCEDURE  IF EXISTS usp_generate_order_data;
DELIMITER //
CREATE PROCEDURE usp_generate_order_data()
BEGIN

    -- Build the data in a temporary copy of sales_order, then swap it in
    DROP TABLE IF EXISTS tmp_sales_order;
    CREATE TABLE tmp_sales_order AS SELECT * FROM sales_order WHERE 1=0;
    SET @start_date := UNIX_TIMESTAMP('2018-01-01');
    SET @end_date := UNIX_TIMESTAMP('2018-11-23');
    SET @i := 1;
    -- Generate 10,000 orders with random customer, product, order date and amount
    WHILE @i <= 10000 DO
        SET @customer_number := FLOOR(1 + RAND() * 6);
        SET @product_code := FLOOR(1 + RAND() * 3);
        SET @order_date := FROM_UNIXTIME(@start_date + RAND() * (@end_date - @start_date));
        SET @amount := FLOOR(1000 + RAND() * 9000);
        INSERT INTO tmp_sales_order VALUES (@i, @customer_number, @product_code, @order_date, @order_date, @amount);
        SET @i := @i + 1;
    END WHILE;
    -- Replace the contents of sales_order with the generated rows
    TRUNCATE TABLE sales_order;
    INSERT INTO sales_order
    SELECT NULL, customer_number, product_code, order_date, entry_date, order_amount
    FROM tmp_sales_order;
    COMMIT;
    DROP TABLE IF EXISTS tmp_sales_order;
END;
//
DELIMITER ;
CALL usp_generate_order_data();      

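A couple of quick sanity checks on the generated source data (illustrative queries, not part of the original script):

-- Row count and date range of the generated orders
SELECT COUNT(*) AS order_cnt,
       MIN(order_date) AS first_order,
       MAX(order_date) AS last_order
FROM sales_order;

-- Orders per customer
SELECT customer_number, COUNT(*) AS orders_per_customer
FROM sales_order
GROUP BY customer_number;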

The ODS (RDS) layer Hive script is:

/*==============================================================*/
/* DBMS name:      Hive                                         */
/* Created on:     2018/11/23 1:09:10                           */
/*==============================================================*/

CREATE DATABASE IF NOT EXISTS rds;

USE rds;

DROP TABLE IF EXISTS rds.customer;

DROP TABLE IF EXISTS rds.product;

DROP TABLE IF EXISTS rds.sales_order;

/*==============================================================*/
/* Table: customer                                              */
/*==============================================================*/
CREATE TABLE rds.customer
(
   customer_number      INT ,
   customer_name        VARCHAR(128)  ,
   customer_street_address VARCHAR(256)  ,
   customer_zip_code    INT  ,
   customer_city        VARCHAR(32)  ,
   customer_state       VARCHAR(32)  
);

/*==============================================================*/
/* Table: product                                               */
/*==============================================================*/
CREATE TABLE rds.product
(
   product_code         INT,
   product_name         VARCHAR(128)  ,
   product_category     VARCHAR(256)  
);

/*==============================================================*/
/* Table: sales_order                                           */
/*==============================================================*/
CREATE TABLE rds.sales_order
(
   order_number         INT ,
   customer_number      INT,
   product_code         INT ,
   order_date           timestamp  ,
   entry_date           timestamp  ,
   order_amount         DECIMAL(18,2)  
);      
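
The daily ETL script later in this post reads and rewrites an rds.cdc_time timestamp table that is not created above. A minimal sketch of how it could be defined and seeded, with the column names taken from that script and the seed date chosen arbitrarily:

-- CDC timestamp table used by schedule_daily_etl.sql (sketch; the seed date is illustrative)
CREATE TABLE IF NOT EXISTS rds.cdc_time
(
   last_load            date,
   current_load         date
);
INSERT INTO rds.cdc_time VALUES (CAST('2018-11-23' AS DATE), CAST('2018-11-23' AS DATE));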

The DW (data warehouse) layer Hive script is:

create database if not exists dw;
use dw;
create table dim_product
(
   product_sk           int   ,
   product_code         int ,
   product_name         varchar(128),
   product_category     varchar(256),
   version              varchar(32),
   effective_date       date,
   expiry_date          date
)
clustered by (product_sk ) into 8 buckets
stored as orc tblproperties('transactional'='true');


/*==============================================================*/
/* Table: dim_customer                                          */
/*==============================================================*/
create table dim_customer
(
   customer_sk          int   ,
   customer_number      int ,
   customer_name        varchar(128),
   customer_street_address varchar(256),
   customer_zip_code    int,
   customer_city        varchar(32),
   customer_state       varchar(32),
   version              varchar(32),
   effective_date       date,
   expiry_date          date
)
clustered by (customer_sk ) into 8 buckets
stored as orc tblproperties('transactional'='true');

/*==============================================================*/
/* Table: dim_date                                              */
/*==============================================================*/
create table dw.dim_date
(
   date_sk              int,
   `date`               date,
   `month`              tinyint,
   month_name           varchar(16),
   `quarter`            tinyint,
   `year`               int
) row format delimited fields terminated by ','
stored as textfile;

/*==============================================================*/
/* Table: dim_order                                             */
/*==============================================================*/
create table dim_order
(
   order_sk             int  ,
   order_number         int,
   version              varchar(32),
   effective_date       date,
   expiry_date          date
)
clustered by (order_sk ) into 8 buckets
stored as orc tblproperties('transactional'='true');

/*==============================================================*/
/* Table: fact_sales_order                                      */
/*==============================================================*/
create table fact_sales_order
(
   order_sk             int  ,
   customer_sk          int  ,
   product_sk           int  ,
   order_date_sk        int  ,
   order_amount         decimal(18,2)
)
partitioned by(order_date string)
clustered by (order_sk ) into 8 buckets
stored as orc tblproperties('transactional'='true');
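
The dimension and fact tables above are bucketed ORC tables with 'transactional'='true', so the UPDATE and DELETE statements used in the ETL scripts below require Hive ACID support. A typical set of properties, shown here as an assumption about the environment rather than something given in the original case:

-- Session-side settings (illustrative; adjust to your cluster)
SET hive.support.concurrency = true;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.exec.dynamic.partition.mode = nonstrict;
-- Compaction settings, normally configured in hive-site.xml on the metastore
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;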

Generating the dim_date data:

#!/bin/bash
# generate_dim_date.sh <start_date> <end_date>
date1="$1"
date2="$2"
tempdate=`date -d "$date1" +%F`
tempdateSec=`date -d "$date1" +%s`
enddateSec=`date -d "$date2" +%s`
min=1
max=`expr \( $enddateSec - $tempdateSec \) / \( 24 \* 60 \* 60 \) + 1`   # e.g. 14611 days for 2010-01-01 .. 2050-01-01
# start with an empty output file
cat /dev/null > ./dim_date.csv

while [ $min -le $max ]
do
    month=`date -d "$tempdate" +%m`
    month_name=`date -d "$tempdate" +%B`
    quarter=`echo $month | awk '{print int(($0-1)/3)+1}'`
    year=`date -d "$tempdate" +%Y`
    echo ${min}","${tempdate}","${month}","${month_name}","${quarter}","${year} >> ./dim_date.csv
    tempdate=`date -d "+$min day $date1" +%F`
    min=`expr $min + 1`
done
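
The script writes dim_date.csv to the current directory; one way to load it into the dw.dim_date table defined earlier (the local path is an assumption):

-- Load the generated CSV into the comma-delimited dim_date table (path is illustrative)
LOAD DATA LOCAL INPATH '/root/dim_date.csv' OVERWRITE INTO TABLE dw.dim_date;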

The init_dw_etl.sql Hive script (initial load):

USE dw;

-- Truncate the target tables
TRUNCATE TABLE dim_customer;
TRUNCATE TABLE dim_product;
TRUNCATE TABLE dim_order;
TRUNCATE TABLE fact_sales_order;

-- Load the customer dimension
INSERT INTO dim_customer (customer_sk,customer_number,customer_name,customer_street_address,customer_zip_code,customer_city,customer_state,`version`,effective_date,expiry_date)
SELECT
    row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    1,
    '2016-03-01',
    '2050-01-01'
FROM rds.customer t1
CROSS JOIN
    (SELECT COALESCE(MAX(customer_sk),0) sk_max
    FROM dim_customer) t2;
    
-- Load the product dimension
INSERT INTO dim_product (product_sk,product_code,product_name,product_category,`version`,effective_date,expiry_date)
SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
    product_code,
    product_name,
    product_category,
    1,
    '2016-03-01',
    '2050-01-01'
FROM rds.product t1
CROSS JOIN
    (SELECT COALESCE(MAX(product_sk),0) sk_max
    FROM dim_product) t2;
    
-- Load the order dimension
INSERT INTO dim_order(order_sk,order_number,`version`,effective_date,expiry_date)
SELECT row_number() over (ORDER BY t1.order_number) + t2.sk_max,
    order_number,
    1,
    to_date(order_date),
    '2050-01-01'
FROM rds.sales_order t1
CROSS JOIN
    (SELECT COALESCE(MAX(order_sk),0) sk_max
    FROM dim_order) t2;
    
-- Load the sales order fact table (dynamic partition on order_date)
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO fact_sales_order PARTITION (order_date)
SELECT b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk,
    a.order_amount,
    to_date(a.order_date)
FROM rds.sales_order a
JOIN dim_order b ON a.order_number = b.order_number
JOIN dim_customer c ON a.customer_number = c.customer_number
JOIN dim_product d ON a.product_code = d.product_code
JOIN dim_date e ON to_date(a.order_date) = e.`date`;
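
After the initial load, a simple star-schema query can be used to spot-check the result (illustrative, not part of the original scripts):

-- Total order amount per customer
SELECT c.customer_name, SUM(f.order_amount) AS total_amount
FROM dw.fact_sales_order f
JOIN dw.dim_customer c ON f.customer_sk = c.customer_sk
GROUP BY c.customer_name;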

The init_all_etl.sh script (initial extract and load):

#!/bin/bash
# Create the Sqoop incremental import job, using order_number as the check column
# with an initial last-value of 0
sqoop job --delete rds_incremental_import_job
sqoop job --create rds_incremental_import_job \
-- \
import \
--connect "jdbc:mysql://192.168.25.120:3306/sales_source?useSSL=false&user=root&password=123456" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value 0
# Initial extract: import the full customer and product tables into the RDS layer
sqoop import --connect "jdbc:mysql://192.168.25.120:3306/sales_source?useSSL=false" --username root --password 123456 --table customer --hive-import --hive-table rds.customer --hive-overwrite
sqoop import --connect "jdbc:mysql://192.168.25.120:3306/sales_source?useSSL=false" --username root --password 123456 --table product --hive-import --hive-table rds.product --hive-overwrite
beeline -u jdbc:hive2://cdh2:10000/dw -e "TRUNCATE TABLE rds.sales_order"
# Run the incremental import; since last-value starts at 0 this loads all existing rows
sqoop job --exec rds_incremental_import_job
# Run the initial load by executing init_dw_etl.sql
beeline -u jdbc:hive2://cdh2:10000/dw -f init_dw_etl.sql
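
After the initial extract and load, the RDS tables should mirror the source tables; a quick check (illustrative):

SELECT COUNT(*) FROM rds.customer;
SELECT COUNT(*) FROM rds.product;
SELECT COUNT(*) FROM rds.sales_order;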

The load_source_dim_date.sql script (a MySQL stored procedure that generates the dim_date data in the source database):

DROP PROCEDURE IF EXISTS USP_Load_Dim_Date;
DELIMITER //
CREATE PROCEDURE USP_Load_Dim_Date(dt_start DATE, dt_end DATE)
BEGIN
WHILE dt_start <= dt_end DO
    INSERT INTO dim_date (`date`,`month`,`month_name`,`quarter`,`year`)
    VALUES (dt_start, MONTH(dt_start), MONTHNAME(dt_start), QUARTER(dt_start), YEAR(dt_start));
    SET dt_start = ADDDATE(dt_start,1);
END WHILE;
COMMIT;
END;
//
DELIMITER ;

CALL USP_Load_Dim_Date('2010-01-01','2050-01-01');

SELECT * FROM dim_date;
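
The procedure assumes a dim_date table already exists in the source MySQL database. A minimal sketch of a matching definition (date_sk as an auto-increment surrogate key is an assumption; the other columns follow the INSERT above):

CREATE TABLE IF NOT EXISTS dim_date
(
   date_sk              INT NOT NULL AUTO_INCREMENT,
   `date`               DATE,
   `month`              TINYINT,
   month_name           VARCHAR(16),
   `quarter`            TINYINT,
   `year`               INT,
   PRIMARY KEY (date_sk)
);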

The schedule_daily_etl.sql daily scheduled ETL SQL script:

-- Set the SCD effective date and expiry date
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2050-01-01' AS DATE);

-- Set the CDC start and end dates for this run
INSERT overwrite TABLE rds.cdc_time
SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;

-- Load the customer dimension
-- Expire customers that were deleted from the source or whose address changed (set expiry_date to yesterday)
UPDATE dim_customer
SET expiry_date = ${hivevar:pre_date}
WHERE dim_customer.customer_sk IN(SELECT
                                    a.customer_sk
                                  FROM (SELECT
                                          customer_sk,
                                          customer_number,
                                          customer_street_address
                                        FROM dim_customer
                                        WHERE expiry_date = ${hivevar:max_date}) a
                                  LEFT JOIN rds.customer b ON a.customer_number = b.customer_number
                                  WHERE b.customer_number IS NULL
                                       OR a.customer_street_address <> b.customer_street_address);

-- Insert a new version for customers whose address changed; skip customers that already have a current (unexpired) row
INSERT INTO dim_customer
SELECT row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM(SELECT
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.customer_zip_code,
    t2.customer_city,
    t2.customer_state,
    t1.version + 1 `version`,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
FROM dim_customer t1
INNER JOIN rds.customer t2 ON t1.customer_number = t2.customer_number
                AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN dim_customer t3 ON t1.customer_number = t3.customer_number
            AND t3.expiry_date = ${hivevar:max_date}
WHERE t1.customer_street_address <> t2.customer_street_address 
    AND t3.customer_sk IS NULL
) t1
CROSS JOIN(SELECT 
        COALESCE(MAX(customer_sk),0) sk_max 
       FROM dim_customer) t2;
      

-- Handle SCD1 on the customer_name column (overwrite in place)
-- Instead of UPDATE, pull the rows whose name changed in the source into a temp table,
-- delete those rows from dim_customer,
-- then insert the rows from the temp table
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.version,
    a.effective_date,
    a.expiry_date
FROM dim_customer a 
JOIN rds.customer b ON a.customer_number = b.customer_number 
            AND(a.customer_name <> b.customer_name);
-- Delete the old rows
DELETE FROM dim_customer
WHERE dim_customer.customer_sk IN (SELECT customer_sk FROM tmp);

-- Insert the corrected rows from the temp table
INSERT INTO dim_customer 
SELECT * FROM tmp;



-- Handle newly added customer records
INSERT INTO dim_customer
SELECT row_number() over (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM( SELECT t1.* 
    FROM rds.customer t1 
    LEFT JOIN dim_customer t2 ON t1.customer_number = t2.customer_number
WHERE t2.customer_sk IS NULL ) t1
CROSS JOIN(SELECT 
        COALESCE(MAX(customer_sk),0) sk_max 
       FROM dim_customer) t2;




-- Load the product dimension
-- Expire the dimension rows for products that were deleted from the source or whose attributes changed
UPDATE dim_product
SET expiry_date = ${hivevar:pre_date}
WHERE dim_product.product_sk IN(SELECT a.product_sk
                FROM(SELECT product_sk,
                        product_code,
                        product_name,
                        product_category
                     FROM dim_product 
                     WHERE expiry_date = ${hivevar:max_date}) a 
                     LEFT JOIN rds.product b ON a.product_code = b.product_code
                     WHERE b.product_code IS NULL 
                        OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));
                    
-- Insert new SCD2 rows for changes to product_name or product_category
INSERT INTO dim_product
SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM( SELECT t2.product_code product_code,
        t2.product_name product_name,
        t2.product_category product_category,
        t1.version + 1 `version`,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
FROM dim_product t1
INNER JOIN rds.product t2 ON t1.product_code = t2.product_code
                AND t1.expiry_date = ${hivevar:pre_date}
LEFT JOIN dim_product t3 ON t1.product_code = t3.product_code 
                AND t3.expiry_date = ${hivevar:max_date}
WHERE(t1.product_name <> t2.product_name 
    OR t1.product_category <> t2.product_category) 
    AND t3.product_sk IS NULL
) t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max 
        FROM dim_product) t2;
        
-- Handle newly added product records
INSERT INTO dim_product
SELECT row_number() over (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM( SELECT t1.* 
    FROM rds.product t1 
    LEFT JOIN dim_product t2 ON t1.product_code = t2.product_code
    WHERE t2.product_sk IS NULL
    ) t1
CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max 
        FROM dim_product) t2;



-- Load the order dimension
INSERT INTO dim_order
SELECT row_number() over (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM(  SELECT order_number order_number,
        1 `version`,
        to_date(order_date) effective_date,
        '2050-01-01' expiry_date
    FROM rds.sales_order, rds.cdc_time
    WHERE entry_date >= last_load AND entry_date < current_load ) t1
    CROSS JOIN(    SELECT COALESCE(MAX(order_sk),0) sk_max
            FROM dim_order) t2;


-- Load the sales order fact table
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO fact_sales_order PARTITION (order_date)
SELECT b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk,
    a.order_amount,
    to_date(a.order_date)
FROM rds.sales_order a,
    dim_order b,
    dim_customer c,
    dim_product d,
    dim_date e,
    rds.cdc_time f
WHERE a.order_number = b.order_number
    AND a.customer_number = c.customer_number
    AND a.order_date >= c.effective_date
    AND a.order_date < c.expiry_date
    AND a.product_code = d.product_code
    AND a.order_date >= d.effective_date
    AND a.order_date < d.expiry_date
    AND to_date(a.order_date) = e.`date`
    AND a.entry_date >= f.last_load
    AND a.entry_date < f.current_load;



-- Update the last_load field of the timestamp table
INSERT overwrite TABLE rds.cdc_time 
SELECT current_load, current_load 
FROM rds.cdc_time;      
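
To verify the SCD2 handling after a daily run, the version history of a changed customer can be inspected (illustrative; customer_number 1 is arbitrary):

SELECT customer_sk, customer_number, customer_street_address,
       `version`, effective_date, expiry_date
FROM dw.dim_customer
WHERE customer_number = 1
ORDER BY customer_sk;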

The schedule_daily.sh daily scheduling shell script:

#!/bin/bash
# Full re-import of the customer and product tables
sqoop import --connect "jdbc:mysql://192.168.25.120:3306/sales_source?useSSL=false" --username root --password 123456 --table customer --hive-import --hive-table rds.customer --hive-overwrite
sqoop import --connect "jdbc:mysql://192.168.25.120:3306/sales_source?useSSL=false" --username root --password 123456 --table product --hive-import --hive-table rds.product --hive-overwrite
# Run the incremental import
sqoop job --exec rds_incremental_import_job
# Run the daily load by executing schedule_daily_etl.sql
beeline -u jdbc:hive2://cdh2:10000/dw -f schedule_daily_etl.sql

The same question, "mobile phone units sold per city in 2015", answered against a star schema (database test) and a snowflake schema (database snow) for comparison:

-- Star schema version
USE test;
SELECT SUM(Units_Sold), City
FROM Fact_Sales a
JOIN Dim_Store b ON a.Store_Id = b.id
JOIN Dim_Date c ON a.Date_Id = c.id
JOIN Dim_Product d ON a.Product_Id = d.id
WHERE c.Year = 2015 AND d.Product_Category = 'mobile'
GROUP BY City;
-- Snowflake schema version
USE snow;
SELECT SUM(Units_Sold), City
FROM Fact_Sales a
JOIN Dim_Store b ON a.Store_Id = b.id
JOIN Dim_Geography c ON b.Geography_Id = c.id
JOIN Dim_Product d ON a.Product_Id = d.Product_Id
JOIN Dim_Category e ON d.Category_Id = e.Category_Id
JOIN Dim_Date f ON a.Date_Id = f.id
WHERE e.Category_Name = 'mobile' AND f.Year = 2015
GROUP BY City;