基于Hadoop生态圈的数据仓库实践 —— ETL(三)

三、使用Oozie定期自动执行ETL
1. Oozie简介

(1)Oozie是什么
        Oozie是一个管理Hadoop作业、可伸缩、可扩展、可靠的工作流调度系统,其工作流作业是由一系列动作构成的有向无环图(DAGs),协调器作业是按时间频率周期性触发的Oozie工作流作业。Oozie支持的作业类型有Java map-reduce、Streaming map-reduce、Pig、 Hive、Sqoop和Distcp,及其Java程序和shell脚本等特定的系统作业。
        第一版Oozie是一个基于工作流引擎的服务器,通过执行Hadoop Map/Reduce和Pig作业的动作运行工作流作业。第二版Oozie是一个基于协调器引擎的服务器,按时间和数据触发工作流执行。它可以基于时间(如每小时执行一次)或数据可用性(如等待输入数据完成后再执行)连续运行工作流。第三版Oozie是一个基于Bundle引擎的服务器。它提供更高级别的抽象,批量处理一系列协调器应用。用户可以在bundle级别启动、停止、挂起、继续、重做协调器作业,这样可以更好地简化操作控制。
(2)为什么需要Oozie

  • 在Hadoop中执行的任务有时候需要把多个Map/Reduce作业连接到一起执行,或者需要多个作业并行处理。Oozie可以把多个Map/Reduce作业组合到一个逻辑工作单元中,从而完成更大型的任务。
  • 从调度的角度看,如果使用crontab的方式调用多个工作流作业,可能需要编写大量的脚本,还要通过脚本来控制好各个工作流作业的执行时序问题,不但脚本不好维护,而且监控也不方便。基于这样的背景,Oozie提出了Coordinator的概念,它能够将每个工作流作业作为一个动作来运行,相当于工作流定义中的一个执行节点,这样就能够将多个工作流作业组成一个称为Coordinator Job的作业,并指定触发时间和频率,还可以配置数据集、并发数等。

(3)Oozie的体系结构(摘自http://www.infoq.com/cn/articles/introductionOozie/
        Oozie的体系结构如下图所示。

Oozie是一种Java Web应用程序,它运行在Java servlet容器——即Tomcat——中,并使用数据库来存储以下内容:

  • 工作流定义
  • 当前运行的工作流实例,包括实例的状态和变量

Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。我们会使用hPDL(一种XML流程定义语言)来描述这个图。
        hPDL是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end和fail节点)以及控制工作流执行路径的机制(decision、fork和join节点)。动作节点是一些机制,通过它们工作流会触发执行计算或者处理任务。Oozie为以下类型的动作提供支持: Hadoop map-reduce、Hadoop文件系统、Pig、Java和Oozie的子工作流(SSH动作已经从Oozie schema 0.2之后的版本中移除了)。
        所有由动作节点触发的计算和处理任务都不在Oozie之中——它们是由Hadoop的Map/Reduce框架执行的。这种方法让Oozie可以支持现存的Hadoop用于负载平衡、灾难恢复的机制。这些任务主要是异步执行的(只有文件系统动作例外,它是同步处理的)。这意味着对于大多数工作流动作触发的计算或处理任务的类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到计算或处理任务结束了之后才能够继续。Oozie可以通过两种不同的方式来检测计算或处理任务是否完成,也就是回调和轮询。当Oozie启动了计算或处理任务的时候,它会为任务提供唯一的回调URL,然后任务会在完成的时候发送通知给特定的URL。在任务无法触发回调URL的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调URL的时候,Oozie有一种机制,可以对计算或处理任务进行轮询,从而保证能够完成任务。
        Oozie工作流可以参数化(在工作流定义中使用像${inputDir}之类的变量)。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化(比方说,使用不同的输出目录),那么多个同样的工作流操作可以并发。
        一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段和(或)数据可用性和(或)外部事件来运行它们。Oozie协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie协调程序让我们可以以谓词的方式对工作流执行触发器进行建模,那可以指向数据、事件和(或)外部事件。工作流作业会在谓词得到满足的时候启动。
        经常我们还需要连接定时运行、但时间间隔不同的工作流操作。多个随后运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie协调程序支持创建这样的数据应用管道。

(4)CDH 5.7.0中的Oozie

CDH 5.7.0中,Oozie的版本是4.1.0,元数据存储使用MySQL。关于CDH 5.7.0中Oozie的属性,参考以下链接:
https://www.cloudera.com/documentation/enterprise/latest/topics/cm_props_cdh570_oozie.html

2. 建立定期装载工作流
(1)修改资源配置
        需要将以下两个参数的值调高:

yarn.nodemanager.resource.memory-mb = 2000
yarn.scheduler.maximum-allocation-mb = 2000

否则会在执行工作流作业时报类似下面的错误:
org.apache.oozie.action.ActionExecutorException: JA009: org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=1500
        具体做法是,从CDH Web控制台修改相关参数,保存更改并重启集群。
        yarn.nodemanager.resource.memory-mb参数在YARN服务的NodeManager范围里,如下图所示。

yarn.scheduler.maximum-allocation-mb参数在YARN服务的ResourceManager范围里,如下图所示。

从Web控制台重启集群的界面如下图所示。

(2)启用Oozie Web Console
        缺省配置时,Oozie Web Console是禁用的,为了后面方便监控Oozie作业的执行,需要将其改为启用。“启用 Oozie 服务器 Web 控制台”参数在Oozie服务的主要范围里,如下图所示。

具体的做法是:

  • 下载安装ext-2.2。
  • 从CDH Web控制台修改相关参数,保存更改并重启Oozie服务。

详细步骤参考以下链接:http://www.cloudera.com/documentation/enterprise/5-5-x/topics/admin_oozie_console.html

(3)启动sqoop的share metastore service
        定期装载工作流需要用Oozie调用Sqoop执行,这需要开启Sqoop元数据共享存储,命令如下:

sqoop metastore > /tmp/sqoop_metastore.log 2>&1 &

关于Oozie无法运行Sqoop Job的问题,参考以下链接:http://www.lamborryan.com/oozie-sqoop-fail/

(4)连接metastore重建sqoop job
        前面建立的sqoop job,其元数据并没有存储在share metastore里,所以需要使用以下的命令重建。

sqoop job --show myjob_incremental_import | grep incremental.last.value
sqoop job --delete myjob_incremental_import
sqoop job --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop --create myjob_incremental_import -- import --connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" --table sales_order --columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" --hive-import --hive-table rds.sales_order --incremental append --check-column order_number --last-value 116

其中last-value是上次ETL执行后的值,用第一条命令可以看到该值。

(5)定义工作流
        建立内容如下的workflow.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="regular_etl">
    <start to="fork-node"/>
    <fork name="fork-node">
        <path start="sqoop-customer" />
        <path start="sqoop-product" />
        <path start="sqoop-sales_order" />
    </fork>
    <action name="sqoop-customer">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>mypassword</arg>
            <arg>--table</arg>
            <arg>customer</arg>
            <arg>--hive-import</arg>
            <arg>--hive-table</arg>
            <arg>rds.customer</arg>
            <arg>--hive-overwrite</arg>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
	<action name="sqoop-product">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:mysql://cdh1:3306/source?useSSL=false</arg>
            <arg>--username</arg>
            <arg>root</arg>
            <arg>--password</arg>
            <arg>mypassword</arg>
            <arg>--table</arg>
            <arg>product</arg>
            <arg>--hive-import</arg>
            <arg>--hive-table</arg>
            <arg>rds.product</arg>
            <arg>--hive-overwrite</arg>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <action name="sqoop-sales_order">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>job --exec myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop</command>
            <file>/tmp/hive-site.xml#hive-site.xml</file>
            <archive>/tmp/mysql-connector-java-5.1.38-bin.jar#mysql-connector-java-5.1.38-bin.jar</archive>
        </sqoop>
        <ok to="joining"/>
        <error to="fail"/>
    </action>
    <join name="joining" to="hive-node"/>
    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <job-xml>/tmp/hive-site.xml</job-xml>
            <script>/tmp/regular_etl.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

其DAG如下图所示。

该工作流包括9个节点,其中有5个控制节点,4个动作节点:工作流的起点(start)、终点(end)、失败处理节点(fail,DAG图中未显示),两个执行路径控制节点(fork-node和joining,fork与join节点必须成对出现),三个并行处理的Sqoop行动节点(sqoop-customer、sqoop-product、sqoop-sales_order)用作数据抽取,一个Hive行动节点(hive-node)用作数据转换与装载。
(6)部署工作流

hdfs dfs -put -f workflow.xml /user/root/
hdfs dfs -put /etc/hive/conf.cloudera.hive/hive-site.xml /tmp/
hdfs dfs -put /root/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /tmp/
hdfs dfs -put /root/regular_etl.sql /tmp/

(7)建立作业属性文件
        建立内容如下的job.properties文件:

nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}

(8)运行工作流

oozie job -oozie http://cdh2:11000/oozie -config /root/job.properties -run

此时从Oozie Web Console可以看到正在运行的作业,如下图所示。

点击作业所在行,可以打开作业的详细信息窗口,如下图所示。

点击动作所在行,可以打开动作的详细信息窗口,如下图所示。

可以点击Console URL右侧的图标,可以打开Map/Reduce作业的跟踪窗口,如下图所示。

当Oozie作业执行完,可以在“All Jobs”标签页看到,Status列已经从RUNNING变为SUCCEEDED,如下图所示。

此时查看cdc_time表的数据,可以看到日期已经改为当前日期,如下图所示。

3. 建立协调作业定期自动执行工作流
(1)建立协调作业属性文件
        建立内容如下的job-coord.properties文件:

nameNode=hdfs://cdh2:8020
jobTracker=cdh2:8032
queueName=default
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}
timezone=UTC
start=2016-07-11T06:00Z
end=2020-12-31T07:15Z
workflowAppUri=${nameNode}/user/${user.name}

(2)建立协调作业配置文件
        建立内容如下的coordinator.xml文件:

<coordinator-app name="regular_etl-coord" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

(3)部署协调作业

hdfs dfs -put -f coordinator.xml /user/root/

(4)运行协调作业

oozie job -oozie http://cdh2:11000/oozie -config /root/job-coord.properties -run

此时从Oozie Web Console可以看到准备运行的协调作业,作业的状态为PREP,如下图所示。

此协调作业自2016年7月11日开始,每天14点执行一次。结束日期非常晚,这里设置的是2020年12月31日。需要注意一下时区的设置。Oozie默认的时区是UTC,而且即便在属性文件中设置了timezone=GMT+0800也不起作用,所以start属性设置的是06:00,实际就是北京时间14:00。
        当时间到达14:00时,协调作业开始运行,状态由PREP变为RUNNING,如下图所示。

点击作业所在行,可以打开协调作业的详细信息窗口,如下图所示。

点击协调作业所在行,可以打开工作流作业的详细信息窗口,如下图所示。

点击动作所在行,可以打开动作的详细信息窗口,如下图所示。

可以点击Console URL右侧的图标,可以打开Map/Reduce作业的跟踪窗口,如下图所示。

至此介绍了使用Oozie定期自动执行ETL的一般方法。Oozie 4.1.0的官方文档链接地址如下:http://oozie.apache.org/docs/4.1.0/index.html

时间: 2024-10-10 07:16:14

基于Hadoop生态圈的数据仓库实践 —— ETL(三)的相关文章

基于Hadoop生态圈的数据仓库实践 —— ETL

使用Hive转换.装载数据 1. Hive简介 (1)Hive是什么         Hive是一个数据仓库软件,使用SQL读.写.管理分布式存储上的大数据集.它建立在Hadoop之上,具有以下功能和特点: 通过SQL方便地访问数据,适合执行ETL.报表.数据分析等数据仓库任务. 提供一种机制,给各种各样的数据格式加上结构. 直接访问HDFS的文件,或者访问如HBase的其它数据存储. 可以通过MapReduce.Spark或Tez等多种计算框架执行查询.         Hive提供标准的SQ

基于Hadoop生态圈的数据仓库实践 —— ETL(一)

一.使用Sqoop抽取数据 1. Sqoop简介 Sqoop是一个在Hadoop与结构化数据存储(如关系数据库)之间高效传输大批量数据的工具.它在2012年3月被成功孵化,现在已是Apache的顶级项目.Sqoop有Sqoop1和Sqoop2两代,Sqoop1最后的稳定版本是1.4.6,Sqoop2最后版本是1.99.6.需要注意的是,1.99.6与1.4.6并不兼容,而且截止目前为止,1.99.6并不完善,不推荐在生产环境中部署. (1)Sqoop1 Sqoop1的架构图如下所示. 第一代Sq

基于Hadoop生态圈的数据仓库实践 —— 环境搭建(三)

三.建立数据仓库示例模型 Hadoop及其相关服务安装配置好后,下面用一个小而完整的示例说明多维模型及其相关ETL技术在Hadoop上的具体实现. 1. 设计ERD 操作型系统是一个销售订单系统,初始时只有产品.客户.订单三个表,ERD如下图所示. 多维数据仓库包含有一个销售订单事实表,产品.客户.订单.日期四个维度表,ERD如下图所示. 作为示例,上面这些ERD里的属性都很简单,看属性名字便知其含义.维度表除了日期维度外,其它三个表都在源表的基础上增加了代理键.版本号.生效日期.过期日期四个属

基于Hadoop生态圈的数据仓库实践 —— 概述(二)

二.在Hadoop上实现数据仓库 (大部分翻译自<Big Data Warehousing>) 数据仓库技术出现很长时间了,现在为什么要从传统数据库工具转为使用Hadoop呢?答案就是最引人关注的流行词汇 -- 大数据.对许多组织来说,传统关系数据库已经不能够经济地处理他们所面临的数据量.而Hadoop生态圈就是为了能够廉价处理大量数据的目的应运而生的.下面看看大数据是怎么定义的. 1. 大数据的定义 虽然数据仓库技术自诞生之日起的二十多年里一直被用来处理大数据,但"大数据"

基于hadoop生态圈的数据仓库实践 —— OLAP与数据可视化(三)

三.Impala OLAP实例        本节使用前面销售订单的例子说明如何使用Impala做OLAP类型的查询,以及实际遇到的问题及解决方案.为了处理SCD和行级更新,我们前面的ETL使用了Hive ORCFile格式的表,可惜到目前为止,Impala还不支持ORCFile.用Impala查询ORCFile表时,错误信息如下图所示. 这是一个棘手的问题.如果我们再建一套和dw库中表结构一样的表,但使用Impala能够识别的文件类型,如Parquet,又会引入两个新的问题:一是CDH 5.7

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(三)

三.维度子集        有些需求不需要最细节的数据.例如更想要某个月而不是某天的记录.再比如相对于全部的销售数据,可能对某些特定状态的数据更感兴趣等.这些特定维度包含在从细节维度选择的行中,所以叫维度子集.维度子集比细节维度的数据少,因此更易使用,查询也更快.        本节中将准备两个特定维度,它们均取自现有的维度:月份维度(日期维度的子集),Pennsylvania州客户维度(客户维度的子集). 1. 建立月份维度表        执行下面的脚本建立月份维度表.注意月份维度不包含pr

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(四)

四.角色扮演维度        当一个事实表多次引用一个维度表时会用到角色扮演维度.例如,一个销售订单有一个是订单日期,还有一个交货日期,这时就需要引用日期维度表两次.        本节将说明两类角色扮演维度的实现,分别是表别名和数据库视图.这两种都使用了Hive的功能.表别名是在SQL语句里引用维度表多次,每次引用都赋予维度表一个别名.而数据库视图,则是按照事实表需要引用维度表的次数,建立相同数量的视图.1. 修改数据库模式        使用下面的脚本修改数据库模式.分别给数据仓库里的事实

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(一)

一.增加列         数据仓库最常碰到的扩展是给一个已经存在的维度表和事实表添加列.本节说明如何在客户维度表和销售订单事实表上添加列,并在新列上应用SCD2,以及对定时装载脚本所做的修改.假设需要在客户维度中增加送货地址属性,并在销售订单事实表中增加数量度量值.        先看一下增加列时模式发生的变化.        修改后源数据库模式如下图所示. 修改后DW数据库模式如下图所示. 1. 修改数据库模式        使用下面的SQL脚本修改源数据库模式. USE source; A

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(五)

五.快照        前面实验说明了处理维度的扩展.本节讨论两种事实表的扩展技术.        有些用户,尤其是管理者,经常要看某个特定时间点的数据.也就是说,他们需要数据的快照.周期快照和累积快照是两种常用的事实表扩展技术.        周期快照是在一个给定的时间对事实表进行一段时期的总计.例如,一个月销售订单周期快照汇总每个月底时总的销售订单金额.        累积快照用于跟踪事实表的变化.例如,数据仓库可能需要累积(存储)销售订单从下订单的时间开始,到订单中的商品被打包.运输和到达