深入讲解Hadoop管道

Hadoop管道是Hadoop
MapReduce的C++接口的代称。与流不同,流使用标准输入和输出让map和reduce节点之间相互交流,管道使用sockets作为tasktracker与C++编写的map或者reduce函数的进程之间的通道。JNI未被使用。

我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用管道来运行它。例2-12显示了用C++语言编写的map函数和reduce函数的源代码。

例2-12:用C++语言编写的最高气温程序

1.  #include <algorithm>

2.  #include <limits>

3.  #include <string>

4.

5.  #include "hadoop/Pipes.hh"

6.  #include "hadoop/TemplateFactory.hh"

7.  #include "hadoop/StringUtils.hh"

8.

9.  class MaxTemperatureMapper : public HadoopPipes::Mapper {

10. public:

11.   MaxTemperatureMapper(HadoopPipes::TaskContext& context) {

12.   }

13.   void map(HadoopPipes::MapContext& context) {

14.     std::string line = context.getInputValue();

15.     std::string year = line.substr(15, 4);

16.     std::string airTemperature = line.substr(87, 5);

17.     std::string q = line.substr(92, 1);

18.     if (airTemperature != "+9999" &&

19.        (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {

20.       context.emit(year, airTemperature);

21.     }

22.   }

23. };

24.

25.

26. class MapTemperatureReducer : public HadoopPipes::Reducer {

27. public:

28.   MapTemperatureReducer(HadoopPipes::TaskContext& context) {

29.   }

30.   void reduce(HadoopPipes::ReduceContext& context) {

31.     int maxValue = INT_MIN;

32.     while (context.nextValue()) {

33.       maxValue = std::max(maxValue,

34.         HadoopUtils::toInt(context.getInputValue()));

35.     }

36.     context.emit(context.getInputKey(),

37.       HadoopUtils::toString(maxValue));

38.   }

39. };

40.

41. int main(int argc, char *argv[]) {

42.   return HadoopPipes::runTask(HadoopPipes::TemplateFactory

43.                         <MaxTemperatureMapper,

MapTemperatureReducer>());

44. }

此应用程序连接到Hadoop C++库,后者是一个用于与tasktracker子进程进行通信的轻量级的封装器。通过扩展在HadoopPipes命名空间的Mapper和Reducer类且提供map()和reduce()方法的实现,我们便可定义map和reduce函数。这些方法采用了一个上下文对象(MapContext类型或ReduceContext类型),后者提供读取输入和写入输出,通过JobConf类来访问作业配置信息。本例中的处理过程非常类似于Java的处理方式。

与Java接口不同,C++接口中的键和值是字节缓冲,表示为标准模板库(Standard Template Library,STL)的字符串。这使接口变得更简单,尽管它把更重的负担留给了应用程序的开发人员,因为开发人员必须将字符串convert to and from表示to和from两个逆操作。开发人员必须在字符串及其他类型之间进行转换。这一点在MapTemperatureReducer中十分明显,其中,我们必须把输入值转换为整数的输入值(使用HadoopUtils中的便利方法),在最大值被写出之前将其转换为字符串。在某些情况下,我们可以省略这个转化,如在MaxTemperatureMapper中,它的airTemperature值从来不用转换为整数,因为它在map()方法中从来不会被当作数字来处理。

main()方法是应用程序的入口点。它调用HadoopPipes::runTask,连接到从Mapper或Reducer连接到Java的父进程和marshals 数据。runTask()方法被传入一个Factory参数,使其可以创建Mapper或Reducer的实例。它创建的其中一个将受套接字连接中Java父进程控制。我们可以用重载模板factory方法来设置一个combiner(combiner)、partitioner(partitioner)、记录读取函数(record
reader)或记录写入函数(record writer)。

编译运行

现在我们可以用Makerfile编译连接例2-13的程序。

例2-13:C++版本的MapReduce程序的Makefile

1.  CC = g++

2.  CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

3.

4.  max_temperature: max_temperature.cpp

5.      $(CC) $(CPPFLAGS) $< -Wall

6.      -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \

7.      -lhadooputils -lpthread -g -O2 -o [email protected]

在Makefile中应当设置许多环境变量。除了HADOOP_INSTALL(如果遵循附录A的安装说明,应该已经设置好),还需要定义平台,指定操作系统、体系结构和数据模型(例如,32位或64位)。我在32位Linux系统的机器编译运行了如下内容:

1.  % export PLATFORM=Linux-i386-32

2.  % make

在成功完成之前,当前目录中便有max_temperature可执行文件。

要运行管道作业,我们需要在伪分布式(pseudo_distrinuted)模式下(其中所有守护进程运行在本地计算机上)运行Hadoop,其中的安装步骤参见附录A。管道不在独立模式(本地运行)中运行,因为它依赖于Hadoop的分布式缓存机制,仅在HDFS运行时才运行。

Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便它们启动map和reduce任务时,它能够被tasktracker取出:

1.  % hadoop fs -put max_temperature bin/max_temperature

示例数据也需要从本地文件系统复制到HDFS:

1.  % hadoop fs -put input/ncdc/sample.txt sample.txt

现在可以运行这个作业。为了使其运行,我们用Hadoop 管道命令,使用-program参数来传递在HDFS中可执行文件的URI。

1.  % hadoop pipes\

2.    -D hadoop.pipes.java.recordreader\

3.    -D hadoop.pipes.java.recordwriter\

4.    inpit sample.txt\

5.    output output

6.    program bin/max_temperature

我们使用-D选项来指定两个属性:hadoop.pipes.java.recordreader和hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并没有指定一个C+++记录读取函数或记录写入函数,但我们要使用默认的Java设置(用来设置文本输入和输出)。管道还允许你设置一个Java mapper,reducer,combiner或partitioner。事实上,在任何一个作业中,都可以混合使用Java类或C++类。

结果和用其他语言编写的程序所得的结果一样。

时间: 2024-10-08 15:33:13

深入讲解Hadoop管道的相关文章

[转载] 详细讲解Hadoop中的简单数据库HBase

转载自http://www.csdn.net/article/2010-11-28/282614 数据模型 HBase数据库使用了和Bigtable非常相似的数据模型.用户在表格里存储许多数据行.每个数据行都包括一个可排序的关键字,和任意数目的列.表格是稀疏的,所以同一个表格里的行可能有非常不同的列,只要用户喜欢这样做. 列名是“<族名>:<标签>”形式,其中<族名>和<标签>可以是任意字符串.一个表格的<族名>集合(又叫“列族”集合)是固定的,

Hadoop十年解读与发展预测

编者按:Hadoop于2006年1月28日诞生,至今已有10年,它改变了企业对数据的存储.处理和分析的过程,加速了大数据的发展,形成了自己的极其火爆的技术生态圈,并受到非常广泛的应用.在2016年Hadoop十岁生日之际,InfoQ策划了一个Hadoop热点系列文章,为大家梳理Hadoop这十年的变化,技术圈的生态状况,回顾以前,激励以后.本文是Cloudera资深工程师讲解Hadoop,让您一篇文章就能了解Hadoop的过去和未来. “昔我十年前,与君始相识.” ——白居易,<酬元九对新栽竹有

智传播客hadoop视频学习笔记(共2天)

第一天:1.答疑解惑•  就业前景•  学习hadoop要有什么基础•  hadoop会像塞班一样,热一阵子吗•  hadoop学习起来容易还是困难•  课堂上的学习方法(所有实验必须按照要求做,重原理.重实践)•  通过本课程能学到什么 2. 课程简介•  以真实的电信详单分析程序为主线,讲解Hadoop,Hbase,Hive在大数据处理的应用场景与过程•  通过此课程,你能     •  掌握Hadoop基本知识,进行HadoopHDFS和MapReduce应用开发,搭建Hadoop集群  

Hadoop之——HDFS操作实例

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45921443 本文通过两种方式来讲解hadoop中对HDFS文件系统的操作,第一种方式是命令行,第二种方式是通过java代码来实现. 一.命令行方式:hadoop fs xxx hadoop fs -ls  /    查看hdfs的根目录下的内容的 hadoop fs -lsr /    递归查看hdfs的根目录下的内容的 hadoop fs -mkdir /d1    在hdf

MongoDB基础教程系列--第七篇 MongoDB 聚合管道

在讲解聚合管道(Aggregation Pipeline)之前,我们先介绍一下 MongoDB 的聚合功能,聚合操作主要用于对数据的批量处理,往往将记录按条件分组以后,然后再进行一系列操作,例如,求最大值.最小值.平均值,求和等操作.聚合操作还能够对记录进行复杂的操作,主要用于数理统计和数据挖掘.在 MongoDB 中,聚合操作的输入是集合中的文档,输出可以是一个文档,也可以是多条文档. MongoDB 提供了非常强大的聚合操作,有三种方式: 聚合管道(Aggregation Pipeline)

Hadoop权威指南学习笔记二

MapReduce简单介绍 声明:本文是本人基于Hadoop权威指南学习的一些个人理解和笔记,仅供学习參考,有什么不到之处还望指出,一起学习一起进步. 转载请注明:http://blog.csdn.net/my_acm 上一篇介绍了什么是Hadoop.Hadoop的作用等.本篇相同基于Hadoop权威指南,结合迪伦的Hadoop的视频教程对MapReduce做一个介绍. 1. MapReduce是Hadoop的核心之中的一个.MapReduce分为两个部分,Mapper和Ruducer模块.简单

Hadoop发行版本介绍

前言 从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘.开源界及厂商,所有数据软件,无一不向Hadoop靠拢.Hadoop也从小众的高富帅领域,变成了大数据开发的标准.在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步. 目录 Hadoop的发展史 Hadoop的发行版本的选择和介绍 1. Hadoop发展史 1.1Hadoop产生背景 Hadoop 最早起源于Nutch .Nutch 是

Hadoop 2.x从零基础到挑战百万年薪第一季

鉴于目前大数据Hadoop 2.x被企业广泛使用,在实际的企业项目中需要更加深入的灵活运用,并且Hadoop 2.x是大数据平台处理 的框架的基石,尤其在海量数据的存储HDFS.分布式资源管理和任务调度YARN及分布式计算框架MapReduce.然而当前众多书籍和 视频教程资料中,没有一套完整的.深入浅出的.实战性操作强的一套资料,一此种情况下,结合鄙人多年实际项目经验,以项目中 使用为主线,编纂筹划此套Hadoop 2.x从零基础到项目实战的课程,带领大家从零基础开始上手,到如何理解HDFS.

hadoop任务执行过程中函数的调用层次

class Job extends JobContext class JobClient extends Configured implements MRConstants, Tool interface JobSubmissionProtocol extends VersionedProtocol class JobTracker implements MRConstants, InterTrackerProtocol,    JobSubmissionProtocol, TaskTracke