【甘道夫】Sqoop 1.4.4: Importing Incremental Data from Oracle 10g into Hive 0.13.1 and Updating the Master Table in Hive

Requirement

Import the incremental data of a business base table from Oracle into Hive, and merge it with the current full table to produce the latest full table.



***Reposting is welcome; please cite the source***

http://blog.csdn.net/u010967382/article/details/38735381


Design

Three tables are involved:

  • Full table: holds the full base data as of the last synchronization
  • Incremental table: a temporary table holding the incremental data
  • Updated full table: the full data table after the update

Steps:

  1. Use Sqoop to import the Oracle table into Hive, simulating the full table and the incremental table

  2. Use Hive to merge the "full table + incremental table" into the "updated full table", overwriting the current full table

Step 1: Import the Oracle table into Hive with Sqoop, simulating the full table and the incremental table

To simulate the scenario, we need a full table and an incremental table. Since the data source is limited, both come from the Oracle table OMP_SERVICE: the full table contains all rows and is named service_all in Hive; the incremental table contains rows from a specific time window and is named service_tmp in Hive.


(1) Full table import: export all rows, keep only the needed columns, and load them into the specified Hive table

To enable import into Hive, first configure the environment variable for HCatalog (HCatalog is a submodule of Hive) by adding the following to /etc/profile:

export HCAT_HOME=/home/fulong/Hive/apache-hive-0.13.1-bin/hcatalog
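After saving /etc/profile, re-source it so the variable is visible to the shell that will run Sqoop (a minimal check, assuming the install path above):

source /etc/profile
echo $HCAT_HOME    # should print /home/fulong/Hive/apache-hive-0.13.1-bin/hcatalog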

Run the following command to import the data:

[email protected]:~/Sqoop/sqoop-1.4.4/bin$ ./sqoop import \
> --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK  --username SP --password fulong \
> --table OMP_SERVICE \
> --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" \
> --hive-import --hive-table SERVICE_ALL

Note: the username must be in uppercase.


(2) Incremental table import: export only the rows within the required time range, keep only the needed columns, and load them into the specified Hive table

Import the data with the following command:

[email protected]:~/Sqoop/sqoop-1.4.4/bin$ ./sqoop import \
> --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK  --username SP --password fulong \
> --table OMP_SERVICE \
> --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" \
> --where "CREATE_TIME > to_date('2012/12/4 17:00:00','yyyy-mm-dd hh24:mi:ss') and CREATE_TIME < to_date('2012/12/4 18:00:00','yyyy-mm-dd hh24:mi:ss')" \
> --hive-import --hive-overwrite --hive-table SERVICE_TMP

Notes:

  1. Because the --hive-overwrite option is used, this command can be run repeatedly; each run overwrites service_tmp with the latest incremental data.
  2. Sqoop can also import the result of an arbitrary SQL query; for details see the "7.2.3. Free-form Query Imports" section of http://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html (a rough sketch follows below).
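For illustration only, a free-form query import could look roughly like the sketch below. The query, split column, and target directory are hypothetical and not part of this post's setup; Sqoop requires the literal token $CONDITIONS in the WHERE clause so it can partition the query across mappers, and --target-dir is mandatory when --query is used:

# hypothetical free-form query import, run from the Sqoop bin directory as above
./sqoop import \
--connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK --username SP --password fulong \
--query "SELECT SERVICE_CODE, SERVICE_NAME, CREATE_TIME FROM OMP_SERVICE WHERE IF_DEL = '0' AND \$CONDITIONS" \
--split-by SERVICE_CODE \
--target-dir /user/fulong/omp_service_query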

(3) Verify the import: list all tables, count the rows, and inspect the table schemas

hive> show tables;
OK
searchlog
searchlog_tmp
service_all
service_tmp
Time taken: 0.04 seconds, Fetched: 4 row(s)

hive> select count(*) from service_all;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1407233914535_0013, Tracking URL = http://FBI003:8088/proxy/application_1407233914535_0013/
Kill Command = /home/fulong/Hadoop/hadoop-2.2.0/bin/hadoop job  -kill job_1407233914535_0013
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2014-08-21 16:51:47,389 Stage-1 map = 0%,  reduce = 0%
2014-08-21 16:51:59,816 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU 1.36 sec
2014-08-21 16:52:01,996 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 2.45 sec
2014-08-21 16:52:07,877 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.96 sec
2014-08-21 16:52:17,639 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.29 sec
MapReduce Total cumulative CPU time: 5 seconds 290 msec
Ended Job = job_1407233914535_0013
MapReduce Jobs Launched:
Job 0: Map: 3  Reduce: 1   Cumulative CPU: 5.46 sec   HDFS Read: 687141 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 460 msec
OK
6803
Time taken: 59.386 seconds, Fetched: 1 row(s)

hive> select count(*) from service_tmp;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1407233914535_0014, Tracking URL = http://FBI003:8088/proxy/application_1407233914535_0014/
Kill Command = /home/fulong/Hadoop/hadoop-2.2.0/bin/hadoop job  -kill job_1407233914535_0014
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2014-08-21 16:53:03,951 Stage-1 map = 0%,  reduce = 0%
2014-08-21 16:53:15,189 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 2.17 sec
2014-08-21 16:53:16,236 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.38 sec
2014-08-21 16:53:57,935 Stage-1 map = 100%,  reduce = 22%, Cumulative CPU 3.78 sec
2014-08-21 16:54:01,811 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.34 sec
MapReduce Total cumulative CPU time: 5 seconds 340 msec
Ended Job = job_1407233914535_0014
MapReduce Jobs Launched:
Job 0: Map: 3  Reduce: 1   Cumulative CPU: 5.66 sec   HDFS Read: 4720 HDFS Write: 3 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 660 msec
OK
13
Time taken: 75.856 seconds, Fetched: 1 row(s)

hive> describe service_all;
OK
service_code            string
service_name            string
service_process         string
create_time             string
enable_org              string
enable_platform         string
if_del                  string
Time taken: 0.169 seconds, Fetched: 7 row(s)

hive> describe service_tmp;
OK
service_code            string
service_name            string
service_process         string
create_time             string
enable_org              string
enable_platform         string
if_del                  string
Time taken: 0.117 seconds, Fetched: 7 row(s)


Step 2: Use Hive to merge the "full table + incremental table" into the "updated full table", overwriting the current full table


The logic for building the merged table is as follows:

  • Every row of the tmp table goes into the final table
  • Rows of the all table whose service_code does not appear in the tmp table also go into the new table

Running the following SQL statement produces the updated full table:

hive> select * from service_tmp union all select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;

We need to write the query result directly back into the full table:

hive> insert overwrite table service_all select * from service_tmp union all select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;

Note that there are two kinds of syntax for inserting query results into a table:

  • INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
  • INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

INSERT OVERWRITE replaces the existing data; since this scenario needs to refresh the full table, overwrite mode is used.

INSERT INTO does not replace existing data; it appends to the table.


At this point, the service_all table in Hive has been updated to the latest data!
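As a quick sanity check (a sketch only; the expected number depends on your own data), the row count of service_all should now equal the number of rows in service_tmp plus the number of old rows whose service_code was not in service_tmp:

# re-count the refreshed full table from the command line
hive -e "select count(*) from service_all;"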

In a real-world scenario, this process would be scheduled to run periodically with a shell script plus cron, as sketched below.
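A minimal sketch of such a wrapper script follows. The script path, log path, and the hourly time-window rule are assumptions for illustration; the Sqoop and Hive statements are the ones used earlier in this post:

#!/bin/bash
# sync_service.sh -- hypothetical wrapper: import the latest increment, then merge it into service_all
SQOOP_BIN=/home/fulong/Sqoop/sqoop-1.4.4/bin

# assumed increment rule: everything created since the previous hourly run (GNU date, as on Ubuntu)
START=$(date -d '1 hour ago' '+%Y/%m/%d %H:%M:%S')
END=$(date '+%Y/%m/%d %H:%M:%S')

# 1. overwrite service_tmp with the latest increment from Oracle
$SQOOP_BIN/sqoop import \
  --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK --username SP --password fulong \
  --table OMP_SERVICE \
  --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" \
  --where "CREATE_TIME > to_date('$START','yyyy/mm/dd hh24:mi:ss') and CREATE_TIME <= to_date('$END','yyyy/mm/dd hh24:mi:ss')" \
  --hive-import --hive-overwrite --hive-table SERVICE_TMP

# 2. merge the increment into the full table (same statement as above)
hive -e "insert overwrite table service_all select * from service_tmp union all select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;"

The script could then be registered in crontab to run, for example, at the top of every hour (the script and log paths here are hypothetical):

0 * * * * /home/fulong/scripts/sync_service.sh >> /home/fulong/logs/sync_service.log 2>&1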
