Data Warehouse Practice on the Hadoop Ecosystem: Advanced Techniques (4)

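IV. Role-Playing Dimensions
A role-playing dimension is needed when a fact table references the same dimension table more than once. For example, a sales order has an order date and also a requested delivery date, so the date dimension table must be referenced twice.
This section shows two ways to implement role-playing dimensions: table aliases and database views. Both rely on standard Hive features. With table aliases, a SQL statement references the dimension table several times and gives each reference its own alias. With database views, you create one view over the dimension table for each time the fact table needs to reference it.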
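To see why the date dimension has to be referenced twice, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for Hive (an assumption for illustration only; the tables keep just the columns needed). Joining date_dim once forces both keys onto the same row, so an order whose two dates differ disappears, while two aliases let the same table play both roles.

```python
import sqlite3

# Illustrative data only: one fact row whose two date keys point at different rows.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE date_dim (date_sk INTEGER PRIMARY KEY, date TEXT);
INSERT INTO date_dim VALUES (1, '2016-07-17'), (2, '2016-07-20');
CREATE TABLE sales_order_fact (order_sk INTEGER, order_date_sk INTEGER,
                               request_delivery_date_sk INTEGER);
INSERT INTO sales_order_fact VALUES (126, 1, 2);
""")

# A single reference to date_dim: both keys must match the same row, so the order is lost.
single = conn.execute("""
SELECT f.order_sk, d.date
  FROM sales_order_fact f, date_dim d
 WHERE f.order_date_sk = d.date_sk
   AND f.request_delivery_date_sk = d.date_sk
""").fetchall()

# Two aliases: date_dim plays the "order date" role and the "delivery date" role.
aliased = conn.execute("""
SELECT f.order_sk, od.date AS order_date, rd.date AS request_delivery_date
  FROM sales_order_fact f, date_dim od, date_dim rd
 WHERE f.order_date_sk = od.date_sk
   AND f.request_delivery_date_sk = rd.date_sk
""").fetchall()

print(single)   # []
print(aliased)  # [(126, '2016-07-17', '2016-07-20')]
```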
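1. Modify the database schema
Use the script below to modify the database schema. It adds a request_delivery_date_sk column to the fact table sales_order_fact in the data warehouse, and a request_delivery_date column both to the sales order table sales_order in the source database and to the Hive table rds.sales_order that Sqoop imports it into.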

-- in hive
USE dw; 

-- sales_order_fact is stored as ORC, so adding a column means rebuilding the table and reloading its data
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;
CREATE TABLE sales_order_fact (
    order_sk INT comment 'order surrogate key',
    customer_sk INT comment 'customer surrogate key',
    product_sk INT comment 'product surrogate key',
    order_date_sk INT comment 'order date surrogate key',
    request_delivery_date_sk INT comment 'request delivery date surrogate key',
    order_amount DECIMAL(10, 2) comment 'order amount',
    order_quantity INT COMMENT 'order quantity'
)
CLUSTERED BY (order_sk) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
INSERT INTO sales_order_fact
SELECT order_sk, customer_sk, product_sk, order_date_sk, NULL, order_amount, order_quantity
  FROM sales_order_fact_old;
DROP TABLE sales_order_fact_old;  

USE rds;
ALTER TABLE sales_order ADD COLUMNS (request_delivery_date DATE COMMENT 'request delivery date');

-- in mysql
USE source;
ALTER TABLE sales_order ADD request_delivery_date DATE AFTER order_date ;

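The modified source database schema is shown in the figure below.

The modified DW database schema is shown in the figure below.

Unlike MySQL, whose ALTER TABLE ... ADD can place a new column with AFTER, Hive's ADD COLUMNS cannot specify the position of a new column: new columns are always appended at the end of the table.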
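The rename, create, copy, drop sequence used for sales_order_fact can be tried out on a small scale. The sketch below uses sqlite3 as a stand-in and a cut-down sales_order_fact; it does not reproduce Hive's ORC, bucketing or transactional behavior, only the order of the steps and the NULL backfill for rows that existed before the new column.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE sales_order_fact (order_sk INTEGER, order_date_sk INTEGER, order_amount REAL);
INSERT INTO sales_order_fact VALUES (1, 20160701, 100.0), (2, 20160702, 200.0);

-- 1. Keep the old data under a new name.
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;
-- 2. Recreate the table with the new column where it should go.
CREATE TABLE sales_order_fact (
    order_sk INTEGER,
    order_date_sk INTEGER,
    request_delivery_date_sk INTEGER,
    order_amount REAL
);
-- 3. Copy the old rows; they have no delivery date, so the new key is NULL.
INSERT INTO sales_order_fact
SELECT order_sk, order_date_sk, NULL, order_amount FROM sales_order_fact_old;
-- 4. Drop the old copy.
DROP TABLE sales_order_fact_old;
""")

# PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk) per column.
columns = [row[1] for row in conn.execute("PRAGMA table_info(sales_order_fact)")]
rows = conn.execute("SELECT * FROM sales_order_fact ORDER BY order_sk").fetchall()
print(columns)  # ['order_sk', 'order_date_sk', 'request_delivery_date_sk', 'order_amount']
print(rows)     # [(1, 20160701, None, 100.0), (2, 20160702, None, 200.0)]
```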
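2. Rebuild the Sqoop job
Use the script below to rebuild the Sqoop job so that it also imports the request_delivery_date column. The script saves the job's current incremental.last.value, deletes the job, and recreates it with that value so the incremental import resumes where it left off.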

last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
sqoop job --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop --create myjob_incremental_import -- import --connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" --table sales_order --columns "order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity, request_delivery_date" --hive-import --hive-table rds.sales_order --incremental append --check-column order_number --last-value $last_value
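As a reading aid for the first line, here is a Python sketch of what the grep/awk pipeline extracts. It assumes the relevant line has the form `key = value`, which is what awk's `$3` relies on (field 1 is the key, field 2 is `=`, field 3 is the value); the sample text is a hypothetical excerpt, not captured Sqoop output.

```python
from typing import Optional


def extract_last_value(show_output: str) -> Optional[str]:
    """Return field 3 of the first line containing 'incremental.last.value'.

    grep would print every matching line; this sketch stops at the first one.
    """
    for line in show_output.splitlines():
        if "incremental.last.value" in line:   # grep incremental.last.value
            fields = line.split()              # awk's default whitespace splitting
            return fields[2] if len(fields) >= 3 else None   # awk '{print $3}'
    return None


# Hypothetical excerpt in the "key = value" form the awk command expects.
SAMPLE = """verbose = false
incremental.last.value = 125
"""
print(extract_last_value(SAMPLE))  # 125
```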

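Note that the column order in the --columns value (the columns of source.sales_order in MySQL) must match the column order of rds.sales_order, since the imported fields are mapped to the Hive table's columns by position. Because Hive appended request_delivery_date to the end of rds.sales_order, it is listed last here, even though in MySQL it comes right after order_date.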
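One way to check this rule is to compare the --columns list with the target column order position by position. The helper below is hypothetical; the column lists are copied by hand from the Sqoop command and the MySQL script in this section (the rds.sales_order order assumes ADD COLUMNS appended request_delivery_date last), not read from a live metastore.

```python
from typing import List, Tuple


def column_order_mismatches(sqoop_columns: str,
                            target_columns: List[str]) -> List[Tuple[int, str, str]]:
    """Return (position, sqoop_column, target_column) for every position that differs."""
    sqoop = [c.strip() for c in sqoop_columns.split(",")]
    mismatches = []
    for i in range(max(len(sqoop), len(target_columns))):
        s = sqoop[i] if i < len(sqoop) else "<missing>"
        t = target_columns[i] if i < len(target_columns) else "<missing>"
        if s != t:
            mismatches.append((i, s, t))
    return mismatches


SQOOP_COLUMNS = ("order_number, customer_number, product_code, order_date, "
                 "entry_date, order_amount, order_quantity, request_delivery_date")
# rds.sales_order after ADD COLUMNS: the new column is last.
HIVE_COLUMNS = ["order_number", "customer_number", "product_code", "order_date",
                "entry_date", "order_amount", "order_quantity", "request_delivery_date"]
# source.sales_order in MySQL: request_delivery_date was added AFTER order_date.
MYSQL_COLUMNS = ["order_number", "customer_number", "product_code", "order_date",
                 "request_delivery_date", "entry_date", "order_amount", "order_quantity"]

print(column_order_mismatches(SQOOP_COLUMNS, HIVE_COLUMNS))        # []
print(len(column_order_mismatches(SQOOP_COLUMNS, MYSQL_COLUMNS)))  # 4
```

The second comparison shows why the MySQL physical order cannot simply be reused in --columns.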
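3. Modify the regular load file regular_etl.sql
The regular load HiveQL script must now handle the requested delivery date column. The main change is in the fact table load, which joins date_dim twice (aliases e and f) to look up order_date_sk and request_delivery_date_sk. The modified script is shown below.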

-- Set properties needed for transaction support
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;  

USE dw;  

-- Set the SCD effective date and expiry date
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);

-- Set the upper bound of the CDC window
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;  

-- Load the customer dimension
-- Expire deleted records and SCD2 changes on the address columns; the <=> operator handles NULL values.
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
   FROM (SELECT customer_sk,
                customer_number,
                customer_street_address,
                customer_zip_code,
                customer_city,
                customer_state,
                shipping_address,
                shipping_zip_code,
                shipping_city,
                shipping_state
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.customer b ON a.customer_number = b.customer_number
          WHERE b.customer_number IS NULL OR
          (  !(a.customer_street_address <=> b.customer_street_address)
          OR !(a.customer_zip_code <=> b.customer_zip_code)
          OR !(a.customer_city <=> b.customer_city)
          OR !(a.customer_state <=> b.customer_state)
          OR !(a.shipping_address <=> b.shipping_address)
          OR !(a.shipping_zip_code <=> b.shipping_zip_code)
          OR !(a.shipping_city <=> b.shipping_city)
          OR !(a.shipping_state <=> b.shipping_state)
          ));   

-- Insert new rows for SCD2 changes on the address columns
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.customer_zip_code customer_zip_code,
    t2.customer_city customer_city,
    t2.customer_state customer_state,
    t2.shipping_address shipping_address,
    t2.shipping_zip_code shipping_zip_code,
    t2.shipping_city shipping_city,
    t2.shipping_state shipping_state,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
 FROM customer_dim t1
INNER JOIN rds.customer t2
   ON t1.customer_number = t2.customer_number
  AND t1.expiry_date = ${hivevar:pre_date}
 LEFT JOIN customer_dim t3
   ON t1.customer_number = t3.customer_number
  AND t3.expiry_date = ${hivevar:max_date}
WHERE (!(t1.customer_street_address <=> t2.customer_street_address)
   OR  !(t1.customer_zip_code <=> t2.customer_zip_code)
   OR  !(t1.customer_city <=> t2.customer_city)
   OR  !(t1.customer_state <=> t2.customer_state)
   OR  !(t1.shipping_address <=> t2.shipping_address)
   OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)
   OR  !(t1.shipping_city <=> t2.shipping_city)
   OR  !(t1.shipping_state <=> t2.shipping_state)
   )
  AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;  

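-- Handle SCD1 on the customer_name column
-- Hive's UPDATE does not yet support subqueries in the SET clause, so a temporary table holds the records to update, and DELETE followed by INSERT replaces UPDATE
-- SCD1 keeps no history, so every row whose customer_name changed is updated, not only the current version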
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.shipping_address,
    a.shipping_zip_code,
    a.shipping_city,
    a.shipping_state,
    a.version,
    a.effective_date,
    a.expiry_date
  FROM customer_dim a, rds.customer b
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);
INSERT INTO customer_dim SELECT * FROM tmp;  

-- Handle new customer records
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
 WHERE t2.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;  

-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;
INSERT INTO pa_customer_dim
SELECT
  customer_sk
, customer_number
, customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
, shipping_address
, shipping_zip_code
, shipping_city
, shipping_state
, version
, effective_date
, expiry_date
FROM customer_dim
WHERE customer_state = 'PA' ;

-- Load the product dimension
-- Expire deleted records and SCD2 changes on the product_name and product_category columns
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN
(SELECT a.product_sk
   FROM (SELECT product_sk,product_code,product_name,product_category
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
                rds.product b ON a.product_code = b.product_code
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));  

-- Insert new rows for SCD2 changes on the product_name and product_category columns
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.product_code product_code,
    t2.product_name product_name,
    t2.product_category product_category,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
 FROM product_dim t1
INNER JOIN rds.product t2
   ON t1.product_code = t2.product_code
  AND t1.expiry_date = ${hivevar:pre_date}
 LEFT JOIN product_dim t3
   ON t1.product_code = t3.product_code
  AND t3.expiry_date = ${hivevar:max_date}
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;  

-- Handle new product records
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
 WHERE t2.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;  

-- Load the order dimension
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
  FROM
(
SELECT
    order_number order_number,
    1 version,
    order_date effective_date,
    '2200-01-01' expiry_date
  FROM rds.sales_order, rds.cdc_time
 WHERE entry_date >= last_load AND entry_date < current_load ) t1
CROSS JOIN
(SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;  

-- Load the sales order fact table
INSERT INTO sales_order_fact
SELECT
    order_sk,
    customer_sk,
    product_sk,
    e.date_sk,
    f.date_sk,
    order_amount,
    order_quantity
  FROM
    rds.sales_order a,
    order_dim b,
    customer_dim c,
    product_dim d,
    date_dim e,
    date_dim f,
    rds.cdc_time g
 WHERE
    a.order_number = b.order_number
AND a.customer_number = c.customer_number
AND a.order_date >= c.effective_date
AND a.order_date < c.expiry_date
AND a.product_code = d.product_code
AND a.order_date >= d.effective_date
AND a.order_date < d.expiry_date
AND to_date(a.order_date) = e.date
AND to_date(a.request_delivery_date) = f.date
AND a.entry_date >= g.last_load AND a.entry_date < g.current_load ;  

-- Update the last_load field of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
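In the fact table load above, both roles of date_dim are inner joins, so an order whose request_delivery_date is NULL would match no row in f and would not be loaded. The test data in this section always has a delivery date, so this does not show up there. If source orders may lack one, a LEFT JOIN on the delivery-date role keeps them with a NULL key; in Hive this would mean rewriting that part of the comma-style join as an explicit LEFT OUTER JOIN. The sqlite3 sketch below compares the two forms on simplified tables; order 129 is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE date_dim (date_sk INTEGER, date TEXT);
INSERT INTO date_dim VALUES (1, '2016-07-17'), (2, '2016-07-20');
-- Simplified staging rows; order 129 has no requested delivery date.
CREATE TABLE sales_order (order_number INTEGER, order_date TEXT,
                          request_delivery_date TEXT);
INSERT INTO sales_order VALUES
    (126, '2016-07-17', '2016-07-20'),
    (129, '2016-07-17', NULL);
""")

# Same shape as the load above: date_dim joined twice, both as inner joins.
inner = conn.execute("""
SELECT a.order_number, e.date_sk, f.date_sk
  FROM sales_order a
  JOIN date_dim e ON a.order_date = e.date
  JOIN date_dim f ON a.request_delivery_date = f.date
 ORDER BY a.order_number
""").fetchall()

# Variant: outer join only the delivery-date role so such orders are kept.
left = conn.execute("""
SELECT a.order_number, e.date_sk, f.date_sk
  FROM sales_order a
  JOIN date_dim e ON a.order_date = e.date
  LEFT JOIN date_dim f ON a.request_delivery_date = f.date
 ORDER BY a.order_number
""").fetchall()

print(inner)  # [(126, 1, 2)]
print(left)   # [(126, 1, 2), (129, 1, None)]
```

A NULL key in the fact table would then need the same treatment in downstream queries, which is why some designs map missing dates to a dedicated "unknown" row in date_dim instead.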

4. Testing
(1) Run the SQL script below to add three sales orders that have a requested delivery date.

USE source;
/***
Add 3 orders with an order date of July 17, 2016.
***/
SET @start_date := unix_timestamp('2016-07-17');
SET @end_date := unix_timestamp('2016-07-18');
SET @request_delivery_date := '2016-07-20';
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;     

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (126, 1, 1, @order_date, @request_delivery_date, @order_date, @amount, @quantity);    

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (127, 2, 2, @order_date, @request_delivery_date, @order_date, @amount, @quantity);    

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (128, 3, 3, @order_date, @request_delivery_date, @order_date, @amount, @quantity);    

INSERT INTO sales_order
SELECT NULL,customer_number,product_code,order_date,request_delivery_date,entry_date,order_amount,order_quantity FROM temp_sales_order_data ORDER BY order_date;      

COMMIT ;
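For reference, the MySQL expressions above draw an order time uniformly from [2016-07-17, 2016-07-18), an amount from 1000 to 9999 and a quantity from 10 to 99, then insert the rows in order_date order. The Python sketch below mirrors them so the ranges can be checked; it assumes UTC, whereas MySQL's unix_timestamp and from_unixtime use the session time zone.

```python
import random
from datetime import date, datetime, timezone

START = datetime(2016, 7, 17, tzinfo=timezone.utc).timestamp()  # unix_timestamp('2016-07-17'), UTC assumed
END = datetime(2016, 7, 18, tzinfo=timezone.utc).timestamp()    # unix_timestamp('2016-07-18'), UTC assumed
REQUEST_DELIVERY_DATE = date(2016, 7, 20)


def make_order(rng, order_number, customer_number, product_code):
    """Build one row in the column order of source.sales_order."""
    ts = START + rng.random() * (END - START)  # @start_date + rand() * (@end_date - @start_date)
    order_date = datetime.fromtimestamp(ts, tz=timezone.utc)
    return (order_number, customer_number, product_code, order_date,
            REQUEST_DELIVERY_DATE,
            order_date,                         # entry_date is set to order_date, as in the script
            int(1000 + rng.random() * 9000),    # floor(1000 + rand() * 9000): 1000..9999
            int(10 + rng.random() * 90))        # floor(10 + rand() * 90): 10..99


rng = random.Random(42)  # fixed seed so the sketch is repeatable
orders = [make_order(rng, n, i, i) for i, n in enumerate((126, 127, 128), start=1)]
to_insert = sorted(orders, key=lambda o: o[3])  # INSERT ... ORDER BY order_date
for row in to_insert:
    print(row)
```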

The modified sales order source data is shown in the figure below; the last three rows have a requested delivery date.

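(2) Set the value of rds.cdc_time so that the next regular load starts its CDC window at 2016-07-17 and picks up the new orders.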

USE rds;
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-17', '2016-07-17' FROM rds.cdc_time;
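Setting last_load works because regular_etl.sql first sets current_load to the current date, selects rows with entry_date >= last_load AND entry_date < current_load, and finally copies current_load into last_load. A Python sketch of one such cycle follows; it treats DATE values as midnight when comparing them with entry times, and the entry times and run date are hypothetical.

```python
from datetime import date, datetime, time
from typing import List, Tuple


def run_cdc_cycle(last_load: date, run_date: date,
                  entry_dates: List[datetime]) -> Tuple[List[datetime], date]:
    """One regular_etl run: select rows in [last_load, current_load), then advance last_load."""
    current_load = run_date                      # SELECT last_load, cur_date
    lo = datetime.combine(last_load, time.min)   # DATE treated as midnight (assumption)
    hi = datetime.combine(current_load, time.min)
    picked = [d for d in entry_dates if lo <= d < hi]  # entry_date >= last_load AND entry_date < current_load
    return picked, current_load                  # SELECT current_load, current_load


entries = [datetime(2016, 7, 16, 23, 0),
           datetime(2016, 7, 17, 9, 30),
           datetime(2016, 7, 17, 18, 5)]
picked, last_load = run_cdc_cycle(date(2016, 7, 17), date(2016, 7, 18), entries)
print(picked)     # the two entries from 2016-07-17
print(last_load)  # 2016-07-18
again, _ = run_cdc_cycle(last_load, date(2016, 7, 18), entries)
print(again)      # [] (a second run on the same day selects nothing new)
```

The half-open window is what keeps consecutive runs from loading the same row twice.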

(3) Run the regular load and check the results.
Run the regular load with the command below.

./regular_etl.sh

Use the query below to verify the results.

use dw;
select a.order_sk, request_delivery_date_sk, c.date
  from sales_order_fact a, date_dim b, date_dim c
 where a.order_date_sk = b.date_sk
   and a.request_delivery_date_sk = c.date_sk ;

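The query results are shown in the figure below. Only the three new sales orders have a request_delivery_date_sk value, and it corresponds to July 20, 2016. The rows that existed before the rebuild were given NULL for this column, so the inner join with date_dim c leaves them out.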

5. Querying with the role-playing dimension

-- Query using table aliases
USE dw;  

SELECT
    order_date_dim.date order_date,
    request_delivery_date_dim.date request_delivery_date,
    SUM(order_amount),
    COUNT(*)
FROM
    sales_order_fact a,
    date_dim order_date_dim,
    date_dim request_delivery_date_dim
WHERE
    a.order_date_sk = order_date_dim.date_sk
        AND a.request_delivery_date_sk = request_delivery_date_dim.date_sk
GROUP BY order_date_dim.date , request_delivery_date_dim.date
CLUSTER BY order_date_dim.date , request_delivery_date_dim.date;

-- Query using views
USE dw;  

CREATE VIEW order_date_dim
(order_date_sk, order_date, month, month_name,  quarter, year, promo_ind)
AS SELECT * FROM date_dim;  

CREATE VIEW request_delivery_date_dim
(request_delivery_date_sk, request_delivery_date, month, month_name, quarter, year, promo_ind)
AS SELECT * FROM date_dim;

SELECT
    order_date,
    request_delivery_date,
    SUM(order_amount),
    COUNT(*)
FROM
    sales_order_fact a,
    order_date_dim b,
    request_delivery_date_dim c
WHERE
    a.order_date_sk = b.order_date_sk
        AND a.request_delivery_date_sk = c.request_delivery_date_sk
GROUP BY order_date , request_delivery_date
CLUSTER BY order_date , request_delivery_date;

Both queries above return the same result, as shown in the figure below:
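The equivalence of the two approaches can also be checked on toy data. The sketch below uses sqlite3 as a stand-in for Hive, creates the two views with the same column lists as above, and uses ORDER BY in place of Hive's CLUSTER BY, which sqlite does not have. The date_dim rows and fact amounts are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE date_dim (date_sk INTEGER, date TEXT, month INTEGER, month_name TEXT,
                       quarter INTEGER, year INTEGER, promo_ind TEXT);
INSERT INTO date_dim VALUES
    (1, '2016-07-17', 7, 'July', 3, 2016, 'N'),
    (2, '2016-07-20', 7, 'July', 3, 2016, 'N');
CREATE TABLE sales_order_fact (order_sk INTEGER, order_date_sk INTEGER,
                               request_delivery_date_sk INTEGER, order_amount REAL);
INSERT INTO sales_order_fact VALUES (126, 1, 2, 1000.0), (127, 1, 2, 2000.0), (128, 2, 2, 500.0);

-- One view per role, renaming the columns as in the article.
CREATE VIEW order_date_dim
  (order_date_sk, order_date, month, month_name, quarter, year, promo_ind)
  AS SELECT * FROM date_dim;
CREATE VIEW request_delivery_date_dim
  (request_delivery_date_sk, request_delivery_date, month, month_name, quarter, year, promo_ind)
  AS SELECT * FROM date_dim;
""")

alias_rows = conn.execute("""
SELECT od.date, rd.date, SUM(order_amount), COUNT(*)
  FROM sales_order_fact a, date_dim od, date_dim rd
 WHERE a.order_date_sk = od.date_sk
   AND a.request_delivery_date_sk = rd.date_sk
 GROUP BY od.date, rd.date
 ORDER BY od.date, rd.date
""").fetchall()

view_rows = conn.execute("""
SELECT order_date, request_delivery_date, SUM(order_amount), COUNT(*)
  FROM sales_order_fact a, order_date_dim b, request_delivery_date_dim c
 WHERE a.order_date_sk = b.order_date_sk
   AND a.request_delivery_date_sk = c.request_delivery_date_sk
 GROUP BY order_date, request_delivery_date
 ORDER BY order_date, request_delivery_date
""").fetchall()

print(alias_rows == view_rows)  # True
print(view_rows)  # [('2016-07-17', '2016-07-20', 3000.0, 2), ('2016-07-20', '2016-07-20', 500.0, 1)]
```

The view approach moves the role names into the schema, so report queries can use unqualified column names such as order_date; the alias approach needs no extra objects.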

Time: 2024-08-06 00:52:47
