基于Hadoop生态圈的数据仓库实践 —— ETL(一)

一、使用Sqoop抽取数据

1. Sqoop简介

Sqoop是一个在Hadoop与结构化数据存储(如关系数据库)之间高效传输大批量数据的工具。它在2012年3月被成功孵化,现在已是Apache的顶级项目。Sqoop有Sqoop1和Sqoop2两代,Sqoop1最后的稳定版本是1.4.6,Sqoop2最后版本是1.99.6。需要注意的是,1.99.6与1.4.6并不兼容,而且截止目前为止,1.99.6并不完善,不推荐在生产环境中部署。

(1)Sqoop1

Sqoop1的架构图如下所示。

第一代Sqoop的设计目标很简单:

  • 在企业级数据仓库、关系数据库、文档系统和Hive、HDFS之间导入导出数据。
  • 基于客户端的模型。
  • 连接器使用厂商提供的驱动。
  • 没有集中的元数据存储。
  • 只有Map任务,没有Reduce任务。数据传输和转化都由Mappers提供。
  • 可以使用Oozie调度和管理Sqoop作业。

Sqoop1是用Java开发的,完全客户端驱动,严重依赖于JDBC,可以使用简单的命令行命令导入导出数据。例如:

# 导入

sqoop import --connect jdbc:mysql://localhost/testdb --table PERSON --username test --password ****

上面这条命令形成一系列任务:

  • 生成SQL代码。
  • 执行SQL代码。
  • 生成Map作业。
  • 执行Map作业。
  • 数据传输到HDFS。

# 导出

sqoop export --connect jdbc:mysql://localhost/testdb --table CLIENTS_INTG --username test --password **** --export-dir /user/localadmin/CLIENTS

上面这条命令形成一系列任务:

  • 生成Map作业。
  • 执行Map作业。
  • 从HDFS的/user/localadmin/CLIENTS路径传输数据。
  • 生成SQL代码。
  • 向CLIENTS_INTG插入数据。

Sqoop1有许多简单易用的特性,如可以在命令行指定直接导入至Hive、HDFS或HBase。连接器可以连接大部分流行的数据库:Oracle、SQLServer、MySQL、Teradata、PostgreSQL等。

Sqoop1的主要问题包括:

  • 繁多的命令行参数。
  • 不安全的连接方式(直接在命令行写密码等)。
  • 没有元数据存储,只能本地配置和管理,使复用受限。

(2)Sqoop2

Sqoop2的架构图如下所示。

Sqoop2体系结构比Sqoop1复杂得多,被设计用来解决Sqoop1的问题。

易用性

Sqoop1需要客户端的安装和配置,而Sqoop2是在服务器端安装和配置。这意味着连接器只在一个地方统一配置,由管理员角色管理,操作员角色使用。类似地,只需要在一台服务器上配置JDBC驱动和数据库连接。Sqoop2还有一个基于Web的服务:前端是命令行接口(CLI)和浏览器,后端是一个元数据知识库。用户可以通过Web接口进行导入导出,避免的错误选项和繁冗的步骤。此外,Sqoop2还在服务器端整合了Hive和HBase。Oozie通过REST API管理Sqoop任务,这样当安装一个新的Sqoop连接器后,无需在Oozie中安装它。

可扩展性

在Sqoop2中,连接器不再受限于JDBC词汇(必须指定database、table等),它甚至可以定义自己使用的词汇。例如,Couchbase不需要指定表名,只需在充填或卸载操作时重载它。通用的功能将从连接器中抽取出来,使之只负责数据传输。在Reduce阶段实现通用功能,确保连接器可以从将来的功能性开发中受益。连接器不再需要提供与其它系统整合等下游功能,因此,连接器的开发者不再需要了解所有Sqoop支持的特性。

安全性

当前,用户是通过执行‘sqoop’命令运行Sqoop。Sqoop作业的安全性主要由对执行Sqoop的用户信任所决定。Sqoop2将作为基于应用的服务,通过按不同角色连接对象,支持对外部系统的安全访问。为了进一步安全,Sqoop2不再允许生成代码、请求直接访问Hive或HBase,也不对运行的作业开放访问所有客户端的权限。Sqoop2将连接作为一级对象。包含证书的连接一旦生成,可以被不同的导入导出作业多次使用。连接由管理员生成,被操作员使用,因此避免了最终用户的权限泛滥。此外,连接还可以被限制只能进行某些基本操作(如导入导出)。通过限制同一时间打开连接的总数和一个禁止连接的选项来管理资源。

2. CDH 5.7.0中的Sqoop

CDH 5.7.0中的Sqoop既包含Sqoop1又包含Sqoop2,Sqoop1的版本是1.4.6,Sqoop2的版本是1.99.5。当前的Sqoop2还缺少Sqoop1的某些特性,因此Cloudera的建议是,只有当Sqoop2完全满足需要的特性时才使用它,否则继续使用Sqoop1。CDH 5.7.0中的Sqoop1和Sqoop2的特性区别如下表所示。


特性


Sqoop1


Sqoop2


所有主要RDBMS的连接器


支持


不支持

变通方案:使用的通用的JDBC连接器,它已经在Microsoft SQL Server、PostgreSQL、MySQL和Oracle数据库上测试过。

这个连接器应该可以在任何JDBC兼容的数据库上使用,但性能比不上Sqoop1的专用连接器。


Kerberos整合


支持


不支持


数据从RDBMS传输到Hive或Hbase


支持


不支持

变通方案:用下面两步方法。

1.       数据从RDBMS导入HDFS

2.       使用适当的工具或命令(如Hive的LOAD DATA语句)手工把数据导入Hive或Hbase。


数据从Hive或Hbase传输到RDBMS


不支持

变通方案:用下面两步方法。

1.       从Hive或Hbase抽出数据到HDFS(文本文件或Avro文件)

2.       使用Sqoop将上一步的输出导入RDBMS


不支持

变通方案如Sqoop1。

3. 使用Sqoop抽取数据

在本示例中使用Sqoop1从MySQL库抽取数据到Hive。

从源抽取数据导入数据仓库(本示例的RDS)有两种方式,可以从源把数据抓取出来(拉),也可以请求源把数据发送(推)到数据仓库。影响选择数据抽取方式的一个重要因素是操作型系统的可用性和数据量,这基于是抽取整个数据还是仅仅抽取自最后一次抽取以来的变化数据。考虑以下两个问题:

  • 需要抽取哪部分源数据加载到数据仓库?有两种方式,完全抽取和变化数据捕获。
  • 数据抽取的方向是什么?有两种方式,拉模式(从数据仓库去拉)和推模式(通过源去推)。

完全抽取和变化数据捕获(CDC)

如果数据量很小并且易处理,一般来说采取完全源数据抽取(将所有的文件记录或所有的数据库表数据抽取至数据仓库)。这种方式适合引用类型的源数据,比如邮政编码。引用型源数据通常是维度表的源。如果源数据量很大,抽取全部数据是不可行的,那么只能抽取变化的源数据(自最后一次抽取以来变化的数据)。这种数据抽取模式称为变化数据捕获(CDC),通常被用于抽取操作型系统的事务数据,比如销售订单。

CDC大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓侵入式的是指CDC操作会给源系统带来性能的影响。只要CDC操作以任何一种方式执行了SQL语句,就可以认为是侵入式的CDC。常用的四种CDC方法中有三种是侵入性的,这四种方法是:基于时间戳的CDC、基于触发器的CDC、基于快照的CDC、基于日志的CDC。下表总结了四种CDC方案的特点。


 


时间戳方式


快照方式


触发器方式


日志方式


能区分插入/更新






周期内,检测到多次更新






能检测到删除






不具有侵入性






支持实时






需要DBA






不依赖数据库





从源拉数据或源来推数据

如果想让数据源只是简单的等待数据仓库来抽取,那么可以使用拉模式。但是必须确认,在数据仓库抽取数据时,源数据必须是可用的而且已经准备好了数据。如果抽取数据的实时性非常重要,或者希望数据源一旦准备好数据就立即发送,那么应该使用由数据源推数据的抽取模式。如果数据源是受到保护并且是禁止访问的,则只能使用数据源推数据的方式。

下表中汇总了本示例中维度表和事实表用到的源数据表及其抽取模式。


 


时间戳方式


快照方式


触发器方式


日志方式


能区分插入/更新






周期内,检测到多次更新






能检测到删除






不具有侵入性






支持实时






需要DBA






不依赖数据库





(1)覆盖导入

对于customer、product这两个表采用整体拉取的方式抽数据。而ETL通常是按一个固定的时间间隔,周期性定时执行的,因此对于整体拉取的方式而言,每次导入的数据需要覆盖上次导入的数据。Sqoop中提供了hive-overwrite参数实现覆盖导入。hive-overwrite的另一个作用是提供了一个幂等操作的选择。所谓幂等操作指的是其任意多次执行所产生的影响均与一次执行的影响相同。这样在导入失败或修复bug后可以再次执行该操作,而不用担心重复执行会对系统造成改变。

具体命令如下:

sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table customer --hive-import --hive-table rds.customer --hive-overwrite
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table product --hive-import --hive-table rds.product --hive-overwrite

(2)增量导入

Sqoop提供增量导入模式用于只导入比已经导入行新的行。下表的参数用来控制增量导入。


参数


描述


--check-column (col)


在确定应该导入哪些行时,指定被检查的列。(列不应该是CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR数据类型)


--incremental (mode)


指定Sqoop怎样确定哪些行是新行。有效值是append和lastmodified。


--last-value (value)


指定已经导入数据的被检查列的最大值

Sqoop支持两种类型的增量导入:append和lastmodified。可以使用--incremental参数指定增量导入的类型。

当被导入表的新行具有连续递增的行id值时,应该使用append模式。指定行id为--check-column的列。Sqoop导入那些被检查列的值比--last-value给出的值大的数据行。

Sqoop支持的另一个表修改策略叫做lastmodified模式。当源表的数据行可能被修改,并且每次修改都会更新一个last-modified列为当前时间戳时,应该使用lastmodified模式。那些被检查列的时间戳比--last-value给出的时间戳新的数据行被导入。

在增量导入的最后,后续导入使用的--last-value会被打印出来。当执行后面的导入时,应该用这种方式指定--last-value参数的值,以确保只导入新的或修改过的数据。可以通过一个增量导入的保存作业自动执行这个过程,这是适合重复执行增量导入的方式。

有了对Sqoop增量导入的基本了解,下面看一下如何在本示例中使用它抽取数据。对于sales_order这个表采用基于时间戳的CDC拉取方式抽数据。这里假设源系统中销售订单记录一旦入库就不再改变,或者可以忽略改变。也就是说销售订单是一个随时间变化单向追加数据的表。sales_order表中有两个关于时间的字段,order_date表示订单时间,entry_date表示订单数据实际插入表里的时间,在后面讨论“迟到的事实”时就会看到两个时间可能不同。那么用哪个字段作为CDC的时间戳呢?设想这样的场景,一个销售订单的订单时间是2015年1月1日,实际插入表里的时间是2015年1月2日,ETL每天0点执行,抽取前一天的数据。如果按order_date抽取数据,条件为where
order_date >= ‘2015-01-02‘ AND order_date < ‘2015-01-03‘,则2015年1月3日0点执行的ETL不会捕获到这个新增的订单数据。所以应该以entry_date作为CDC的时间戳。

下面测试一下增量导入:

1)先执行初始的全量导入,作为增量的基础

sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table sales_order --hive-import --hive-table rds.sales_order --hive-overwrite

2)建立sqoop增量导入作业

sqoop job --create myjob_1 -- import --connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" --table sales_order --columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" --where "entry_date >= date_add(current_date(),interval -1 day) and entry_date < current_date()" --hive-import --hive-table rds.sales_order --incremental append --check-column entry_date --last-value '1900-01-01'

3)源库增加一条数据

insert into source.sales_order values(null,7,3,date_add(current_date(),interval -1 day),date_add(current_date(),interval -1 day),10000);
commit;

4)执行sqoop作业

sqoop job --exec myjob_1

5)在hive的rds库里查询

select * from sales_order order by order_number desc;

结果如下图所示,可以看到新增的数据已经追加到rds.sales_order表中。

至此介绍了使用Sqoop抽取数据的一般方法。Sqoop有众多的命令行参数,具体可参考官方文档,链接地址如下:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

时间: 2024-10-07 17:12:24

基于Hadoop生态圈的数据仓库实践 —— ETL(一)的相关文章

基于Hadoop生态圈的数据仓库实践 —— ETL

使用Hive转换.装载数据 1. Hive简介 (1)Hive是什么         Hive是一个数据仓库软件,使用SQL读.写.管理分布式存储上的大数据集.它建立在Hadoop之上,具有以下功能和特点: 通过SQL方便地访问数据,适合执行ETL.报表.数据分析等数据仓库任务. 提供一种机制,给各种各样的数据格式加上结构. 直接访问HDFS的文件,或者访问如HBase的其它数据存储. 可以通过MapReduce.Spark或Tez等多种计算框架执行查询.         Hive提供标准的SQ

基于Hadoop生态圈的数据仓库实践 —— ETL(三)

三.使用Oozie定期自动执行ETL1. Oozie简介(1)Oozie是什么        Oozie是一个管理Hadoop作业.可伸缩.可扩展.可靠的工作流调度系统,其工作流作业是由一系列动作构成的有向无环图(DAGs),协调器作业是按时间频率周期性触发的Oozie工作流作业.Oozie支持的作业类型有Java map-reduce.Streaming map-reduce.Pig. Hive.Sqoop和Distcp,及其Java程序和shell脚本等特定的系统作业.        第一版

基于Hadoop生态圈的数据仓库实践 —— 概述(二)

二.在Hadoop上实现数据仓库 (大部分翻译自<Big Data Warehousing>) 数据仓库技术出现很长时间了,现在为什么要从传统数据库工具转为使用Hadoop呢?答案就是最引人关注的流行词汇 -- 大数据.对许多组织来说,传统关系数据库已经不能够经济地处理他们所面临的数据量.而Hadoop生态圈就是为了能够廉价处理大量数据的目的应运而生的.下面看看大数据是怎么定义的. 1. 大数据的定义 虽然数据仓库技术自诞生之日起的二十多年里一直被用来处理大数据,但"大数据"

基于Hadoop生态圈的数据仓库实践 —— 环境搭建(三)

三.建立数据仓库示例模型 Hadoop及其相关服务安装配置好后,下面用一个小而完整的示例说明多维模型及其相关ETL技术在Hadoop上的具体实现. 1. 设计ERD 操作型系统是一个销售订单系统,初始时只有产品.客户.订单三个表,ERD如下图所示. 多维数据仓库包含有一个销售订单事实表,产品.客户.订单.日期四个维度表,ERD如下图所示. 作为示例,上面这些ERD里的属性都很简单,看属性名字便知其含义.维度表除了日期维度外,其它三个表都在源表的基础上增加了代理键.版本号.生效日期.过期日期四个属

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(一)

一.增加列         数据仓库最常碰到的扩展是给一个已经存在的维度表和事实表添加列.本节说明如何在客户维度表和销售订单事实表上添加列,并在新列上应用SCD2,以及对定时装载脚本所做的修改.假设需要在客户维度中增加送货地址属性,并在销售订单事实表中增加数量度量值.        先看一下增加列时模式发生的变化.        修改后源数据库模式如下图所示. 修改后DW数据库模式如下图所示. 1. 修改数据库模式        使用下面的SQL脚本修改源数据库模式. USE source; A

基于hadoop生态圈的数据仓库实践 —— OLAP与数据可视化(三)

三.Impala OLAP实例        本节使用前面销售订单的例子说明如何使用Impala做OLAP类型的查询,以及实际遇到的问题及解决方案.为了处理SCD和行级更新,我们前面的ETL使用了Hive ORCFile格式的表,可惜到目前为止,Impala还不支持ORCFile.用Impala查询ORCFile表时,错误信息如下图所示. 这是一个棘手的问题.如果我们再建一套和dw库中表结构一样的表,但使用Impala能够识别的文件类型,如Parquet,又会引入两个新的问题:一是CDH 5.7

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(四)

四.角色扮演维度        当一个事实表多次引用一个维度表时会用到角色扮演维度.例如,一个销售订单有一个是订单日期,还有一个交货日期,这时就需要引用日期维度表两次.        本节将说明两类角色扮演维度的实现,分别是表别名和数据库视图.这两种都使用了Hive的功能.表别名是在SQL语句里引用维度表多次,每次引用都赋予维度表一个别名.而数据库视图,则是按照事实表需要引用维度表的次数,建立相同数量的视图.1. 修改数据库模式        使用下面的脚本修改数据库模式.分别给数据仓库里的事实

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(三)

三.维度子集        有些需求不需要最细节的数据.例如更想要某个月而不是某天的记录.再比如相对于全部的销售数据,可能对某些特定状态的数据更感兴趣等.这些特定维度包含在从细节维度选择的行中,所以叫维度子集.维度子集比细节维度的数据少,因此更易使用,查询也更快.        本节中将准备两个特定维度,它们均取自现有的维度:月份维度(日期维度的子集),Pennsylvania州客户维度(客户维度的子集). 1. 建立月份维度表        执行下面的脚本建立月份维度表.注意月份维度不包含pr

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(五)

五.快照        前面实验说明了处理维度的扩展.本节讨论两种事实表的扩展技术.        有些用户,尤其是管理者,经常要看某个特定时间点的数据.也就是说,他们需要数据的快照.周期快照和累积快照是两种常用的事实表扩展技术.        周期快照是在一个给定的时间对事实表进行一段时期的总计.例如,一个月销售订单周期快照汇总每个月底时总的销售订单金额.        累积快照用于跟踪事实表的变化.例如,数据仓库可能需要累积(存储)销售订单从下订单的时间开始,到订单中的商品被打包.运输和到达