参考http://lxw1234.com/archives/2015/08/473.htm
测试数据
order_2015-08-21
1 2015-08-18 2015-08-18 创建
2 2015-08-18 2015-08-18 创建
3 2015-08-19 2015-08-21 支付
4 2015-08-19 2015-08-21 完成
5 2015-08-19 2015-08-20 支付
6 2015-08-20 2015-08-20 创建
7 2015-08-20 2015-08-21 支付
8 2015-08-21 2015-08-21 创建
order_2015-08-22
1 2015-08-18 2015-08-22 创建
2 2015-08-18 2015-08-22 创建
3 2015-08-19 2015-08-21 支付
4 2015-08-19 2015-08-21 完成
5 2015-08-19 2015-08-20 支付
6 2015-08-20 2015-08-22 创建
7 2015-08-20 2015-08-21 支付
8 2015-08-21 2015-08-22 创建
9 2015-08-22 2015-08-22 创建
10 2015-08-22 2015-08-22 支付
order_2015-08-23
1 2015-08-18 2015-08-23 完成
2 2015-08-18 2015-08-22 创建
3 2015-08-19 2015-08-23 完成
4 2015-08-19 2015-08-21 完成
5 2015-08-19 2015-08-23 完成
6 2015-08-20 2015-08-22 创建
7 2015-08-20 2015-08-21 支付
8 2015-08-21 2015-08-23 完成
9 2015-08-22 2015-08-22 创建
10 2015-08-22 2015-08-22 支付
11 2015-08-23 2015-08-23 创建
12 2015-08-23 2015-08-23 创建
13 2015-08-23 2015-08-23 支付
-------------------------------------------------------------------------
步骤
--建立目录
hdfs dfs -mkdir /user/hive_remote/warehouse/demoData
--上传测试数据到本地
rz--选择文件order_2015-08-21.txt
rz--选择文件order_2015-08-22.txt
rz--选择文件order_2015-08-23.txt
--上传测试数据到hdfs
hdfs dfs -put ./order_2015-08-21.txt /user/hive_remote/warehouse/demoData
hdfs dfs -put ./order_2015-08-22.txt /user/hive_remote/warehouse/demoData
hdfs dfs -put ./order_2015-08-23.txt /user/hive_remote/warehouse/demoData
--启动hive
./bin/hive
--创建表数据库demo
DROP DATABASE IF EXISTS demo;
create schema demo;
--使用数据库demo
use demo;
--------------------------------------
--源系统中订单表
CREATE external TABLE orders (
orderid INT,
createtime STRING,
modifiedtime STRING,
status STRING
) PARTITIONED BY (day STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘ stored AS textfile;
--初始化订单表order数据
load data inpath ‘/user/hive_remote/warehouse/demoData/order_2015-08-21.txt‘ into table orders PARTITION (day = ‘2015-08-21‘);
load data inpath ‘/user/hive_remote/warehouse/demoData/order_2015-08-22.txt‘ into table orders PARTITION (day = ‘2015-08-22‘);
load data inpath ‘/user/hive_remote/warehouse/demoData/order_2015-08-23.txt‘ into table orders PARTITION (day = ‘2015-08-23‘);
--验证导入是否成功
show partitions orders;
------------------------------------------------
select * from orders where day = ‘2015-08-21‘;
select * from orders where day = ‘2015-08-22‘;
select * from orders where day = ‘2015-08-23‘;
-------------------------------------------------
--ODS层,有一张订单的增量数据表,按天分区,存放每天的增量数据:
DROP TABLE IF EXISTS t_ods_orders_inc;
CREATE external TABLE t_ods_orders_inc (
orderid INT,
createtime STRING,
modifiedtime STRING,
status STRING
) PARTITIONED BY (day STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘ stored AS textfile;
--DW层,有一张订单的历史数据拉链表,存放订单的历史状态数据:
DROP TABLE IF EXISTS t_dw_orders_his;
CREATE external TABLE t_dw_orders_his (
orderid INT,
createtime STRING,
modifiedtime STRING,
status STRING,
dw_start_date STRING,
dw_end_date STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘ stored AS textfile;
--全量初始化
--在数据从源业务系统每天正常抽取和刷新到DW订单历史表之前,需要做一次全量的初始化,就是从源订单表中昨天以前的数据全部抽取到ODW,并刷新到DW。
--第一步,抽取全量数据到ODS:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-20‘)
SELECT orderid,createtime,modifiedtime,status
FROM orders
WHERE createtime <= ‘2015-08-20‘ and day = ‘2015-08-21‘;
--验证抽取是否成功
select * from t_ods_orders_inc where day = ‘2015-08-20‘;
-----------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-20
2 2015-08-18 2015-08-18 创建 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-20
5 2015-08-19 2015-08-20 支付 2015-08-20
6 2015-08-20 2015-08-20 创建 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-20
-----------------------------------------------------------
--第二步,从ODS刷新到DW:
INSERT overwrite TABLE t_dw_orders_his
SELECT orderid,createtime,modifiedtime,status,
createtime AS dw_start_date,
‘9999-12-31‘ AS dw_end_date
FROM t_ods_orders_inc
WHERE day = ‘2015-08-20‘;
--验证刷新是否成功
select * from t_dw_orders_his;
----------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 9999-12-31
----------------------------------------------------------
--增量抽取
--增量刷新历史数据
--从2015-08-22开始,需要每天正常刷新前一天(2015-08-21)的增量数据到历史表。
--将2015-08-21的数据抽取到ODS 2015-08-21:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-21‘)
SELECT orderid,createtime,modifiedtime,status FROM orders WHERE (createtime = ‘2015-08-21‘ OR modifiedtime = ‘2015-08-21‘) and day = ‘2015-08-21‘;
--验证抽取2015-08-21的增量数据是否成功
select * from t_ods_orders_inc where day = ‘2015-08-21‘;
------------------------------------------------------------------------
3 2015-08-19 2015-08-21 支付 2015-08-21
4 2015-08-19 2015-08-21 完成 2015-08-21
7 2015-08-20 2015-08-21 支付 2015-08-21
8 2015-08-21 2015-08-21 创建 2015-08-21
------------------------------------------------------------------------
--将2015-08-22的数据抽取到ODS 2015-08-22:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-22‘)
SELECT orderid,createtime,modifiedtime,status FROM orders WHERE (createtime = ‘2015-08-22‘ OR modifiedtime = ‘2015-08-22‘) and day = ‘2015-08-22‘;
--验证抽取2015-08-22的增量数据是否成功
select * from t_ods_orders_inc where day = ‘2015-08-22‘;
-------------------------------------------------------------
1 2015-08-18 2015-08-22 创建 2015-08-22
2 2015-08-18 2015-08-22 创建 2015-08-22
6 2015-08-20 2015-08-22 创建 2015-08-22
8 2015-08-21 2015-08-22 创建 2015-08-22
9 2015-08-22 2015-08-22 创建 2015-08-22
10 2015-08-22 2015-08-22 支付 2015-08-22
-------------------------------------------------------------
--将2015-08-23的数据抽取到ODS 2015-08-23:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-23‘)
SELECT orderid,createtime,modifiedtime,status FROM orders WHERE day = ‘2015-08-23‘ and (createtime = ‘2015-08-23‘ OR modifiedtime = ‘2015-08-23‘);
--验证抽取2015-08-22的增量数据是否成功
select * from t_ods_orders_inc where day = ‘2015-08-23‘;
--secc
--通过DW历史数据(数据日期为2015-08-20),和ODS增量数据(2015-08-21),刷新历史表:
DROP TABLE IF EXISTS t_dw_orders_his_tmp;
CREATE TABLE t_dw_orders_his_tmp AS
SELECT orderid,
createtime,
modifiedtime,
status,
dw_start_date,
dw_end_date
FROM (
SELECT a.orderid,
a.createtime,
a.modifiedtime,
a.status,
a.dw_start_date,
CASE WHEN b.orderid IS NOT NULL AND a.dw_end_date > ‘2015-08-21‘ THEN ‘2015-08-20‘ ELSE a.dw_end_date END AS dw_end_date
FROM t_dw_orders_his a
left outer join (SELECT * FROM t_ods_orders_inc WHERE day = ‘2015-08-21‘) b
ON (a.orderid = b.orderid)
UNION ALL
SELECT orderid,
createtime,
modifiedtime,
status,
modifiedtime AS dw_start_date,
‘9999-12-31‘ AS dw_end_date
FROM t_ods_orders_inc
WHERE day = ‘2015-08-21‘
) x
ORDER BY orderid,dw_start_date;
--验证是否成功
select * from t_dw_orders_his_tmp order by orderid,dw_start_date;
-----------------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 创建 2015-08-21 9999-12-31
-----------------------------------------------------------------
--其中:
--UNION ALL的两个结果集中,第一个是用历史表left outer join 日期为 ${yyy-MM-dd} 的增量,能关联上的,并且dw_end_date > ${yyy-MM-dd},说明状态有变化,则把原来的dw_end_date置为(${yyy-MM-dd} – 1), 俗称闭链 。关联不上的,说明状态无变化,dw_end_date无变化。
--第二个结果集是直接将增量数据插入历史表。
dw_end_date 改为9999-12-31俗称开链
--最后把临时表中数据插入历史表:
INSERT overwrite TABLE t_dw_orders_his SELECT * FROM t_dw_orders_his_tmp;
--验证是否成功
select * from t_dw_orders_his order by orderid,dw_start_date;
---------------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 创建 2015-08-21 9999-12-31
---------------------------------------------------------------
------------------------------------------
--将2015-08-22号的增量数据刷新到历史表中:
DROP TABLE IF EXISTS t_dw_orders_his_tmp;
CREATE TABLE t_dw_orders_his_tmp AS
SELECT orderid,
createtime,
modifiedtime,
status,
dw_start_date,
dw_end_date
FROM (
SELECT a.orderid,
a.createtime,
a.modifiedtime,
a.status,
a.dw_start_date,
CASE WHEN b.orderid IS NOT NULL AND a.dw_end_date > ‘2015-08-22‘ THEN ‘2015-08-21‘ ELSE a.dw_end_date END AS dw_end_date
FROM t_dw_orders_his a
left outer join (SELECT * FROM t_ods_orders_inc WHERE day = ‘2015-08-22‘) b
ON (a.orderid = b.orderid)
UNION ALL
SELECT orderid,
createtime,
modifiedtime,
status,
modifiedtime AS dw_start_date,
‘9999-12-31‘ AS dw_end_date
FROM t_ods_orders_inc
WHERE day = ‘2015-08-22‘
) x
ORDER BY orderid,dw_start_date;
--最后把临时表中数据插入历史表:
INSERT overwrite TABLE t_dw_orders_his SELECT * FROM t_dw_orders_his_tmp;
--验证刷新是否成功
select * from t_dw_orders_his;
----------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-18 2015-08-21
1 2015-08-18 2015-08-22 创建 2015-08-22 9999-12-31
2 2015-08-18 2015-08-18 创建 2015-08-18 2015-08-21
2 2015-08-18 2015-08-22 创建 2015-08-22 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 2015-08-21
6 2015-08-20 2015-08-22 创建 2015-08-22 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 创建 2015-08-21 2015-08-21
8 2015-08-21 2015-08-22 创建 2015-08-22 9999-12-31
9 2015-08-22 2015-08-22 创建 2015-08-22 9999-12-31
10 2015-08-22 2015-08-22 支付 2015-08-22 9999-12-31
----------------------------------------------------------
--2015-08-21 的快照
select * from t_dw_orders_his where dw_start_date <= ‘2015-08-21‘ and dw_end_date >= ‘2015-08-21‘;
----------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-18 2015-08-21
2 2015-08-18 2015-08-18 创建 2015-08-18 2015-08-21
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 2015-08-21
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 创建 2015-08-21 2015-08-21
----------------------------------------------------------
--2015-08-22 的快照
select * from t_dw_orders_his where dw_start_date <= ‘2015-08-22‘ and dw_end_date >= ‘2015-08-22‘;
--将2015-08-23号的增量数据刷新到历史表中:
DROP TABLE IF EXISTS t_dw_orders_his_tmp;
CREATE TABLE t_dw_orders_his_tmp AS
SELECT orderid,
createtime,
modifiedtime,
status,
dw_start_date,
dw_end_date
FROM (
SELECT a.orderid,
a.createtime,
a.modifiedtime,
a.status,
a.dw_start_date,
CASE WHEN b.orderid IS NOT NULL AND a.dw_end_date > ‘2015-08-23‘ THEN ‘2015-08-22‘ ELSE a.dw_end_date END AS dw_end_date
FROM t_dw_orders_his a
left outer join (SELECT * FROM t_ods_orders_inc WHERE day = ‘2015-08-23‘) b
ON (a.orderid = b.orderid)
UNION ALL
SELECT orderid,
createtime,
modifiedtime,
status,
modifiedtime AS dw_start_date,
‘9999-12-31‘ AS dw_end_date
FROM t_ods_orders_inc
WHERE day = ‘2015-08-23‘
) x
ORDER BY orderid,dw_start_date;
--验证
select * from t_dw_orders_his_tmp order by orderid,dw_start_date;
----------------------------------------------------------
1 2015-08-18 2015-08-18 创建 2015-08-18 2015-08-21
1 2015-08-18 2015-08-22 创建 2015-08-22 2015-08-22
1 2015-08-18 2015-08-23 完成 2015-08-23 9999-12-31
2 2015-08-18 2015-08-18 创建 2015-08-18 2015-08-21
2 2015-08-18 2015-08-22 创建 2015-08-22 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 2015-08-22
3 2015-08-19 2015-08-23 完成 2015-08-23 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 2015-08-22
5 2015-08-19 2015-08-23 完成 2015-08-23 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 2015-08-21
6 2015-08-20 2015-08-22 创建 2015-08-22 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 创建 2015-08-21 2015-08-21
8 2015-08-21 2015-08-22 创建 2015-08-22 2015-08-22
8 2015-08-21 2015-08-23 完成 2015-08-23 9999-12-31
9 2015-08-22 2015-08-22 创建 2015-08-22 9999-12-31
10 2015-08-22 2015-08-22 支付 2015-08-22 9999-12-31
11 2015-08-23 2015-08-23 创建 2015-08-23 9999-12-31
12 2015-08-23 2015-08-23 创建 2015-08-23 9999-12-31
13 2015-08-23 2015-08-23 支付 2015-08-23 9999-12-31
----------------------------------------------------------
--最后把临时表中数据插入历史表:
INSERT overwrite TABLE t_dw_orders_his SELECT * FROM t_dw_orders_his_tmp;
--2015-08-21 的快照
select * from t_dw_orders_his where dw_start_date <= ‘2015-08-21‘ and dw_end_date >= ‘2015-08-21‘;
--2015-08-22 的快照
select * from t_dw_orders_his where dw_start_date <= ‘2015-08-22‘ and dw_end_date >= ‘2015-08-22‘;
--2015-08-23 的快照
select * from t_dw_orders_his where dw_start_date <= ‘2015-08-23‘ and dw_end_date >= ‘2015-08-23‘;
实战过程中可以分三层
一层 从上游系统中原样抽取到 hive 的数据库中 如src
二层 从src中抽取每日增量数据 到 hive 的数据库中 如ods
三层 从ods中抽取数据 到 hive 的数据库中 如his,即拉链表。