Hadoop管道是Hadoop
MapReduce的C++接口的代称。与流不同,流使用标准输入和输出让map和reduce节点之间相互交流,管道使用sockets作为tasktracker与C++编写的map或者reduce函数的进程之间的通道。JNI未被使用。
我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用管道来运行它。例2-12显示了用C++语言编写的map函数和reduce函数的源代码。
例2-12:用C++语言编写的最高气温程序
1. #include <algorithm>
2. #include <limits>
3. #include <string>
4.
5. #include "hadoop/Pipes.hh"
6. #include "hadoop/TemplateFactory.hh"
7. #include "hadoop/StringUtils.hh"
8.
9. class MaxTemperatureMapper : public HadoopPipes::Mapper {
10. public:
11. MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
12. }
13. void map(HadoopPipes::MapContext& context) {
14. std::string line = context.getInputValue();
15. std::string year = line.substr(15, 4);
16. std::string airTemperature = line.substr(87, 5);
17. std::string q = line.substr(92, 1);
18. if (airTemperature != "+9999" &&
19. (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {
20. context.emit(year, airTemperature);
21. }
22. }
23. };
24.
25.
26. class MapTemperatureReducer : public HadoopPipes::Reducer {
27. public:
28. MapTemperatureReducer(HadoopPipes::TaskContext& context) {
29. }
30. void reduce(HadoopPipes::ReduceContext& context) {
31. int maxValue = INT_MIN;
32. while (context.nextValue()) {
33. maxValue = std::max(maxValue,
34. HadoopUtils::toInt(context.getInputValue()));
35. }
36. context.emit(context.getInputKey(),
37. HadoopUtils::toString(maxValue));
38. }
39. };
40.
41. int main(int argc, char *argv[]) {
42. return HadoopPipes::runTask(HadoopPipes::TemplateFactory
43. <MaxTemperatureMapper,
MapTemperatureReducer>());
44. }
此应用程序连接到Hadoop C++库,后者是一个用于与tasktracker子进程进行通信的轻量级的封装器。通过扩展在HadoopPipes命名空间的Mapper和Reducer类且提供map()和reduce()方法的实现,我们便可定义map和reduce函数。这些方法采用了一个上下文对象(MapContext类型或ReduceContext类型),后者提供读取输入和写入输出,通过JobConf类来访问作业配置信息。本例中的处理过程非常类似于Java的处理方式。
与Java接口不同,C++接口中的键和值是字节缓冲,表示为标准模板库(Standard Template Library,STL)的字符串。这使接口变得更简单,尽管它把更重的负担留给了应用程序的开发人员,因为开发人员必须将字符串convert to and from表示to和from两个逆操作。开发人员必须在字符串及其他类型之间进行转换。这一点在MapTemperatureReducer中十分明显,其中,我们必须把输入值转换为整数的输入值(使用HadoopUtils中的便利方法),在最大值被写出之前将其转换为字符串。在某些情况下,我们可以省略这个转化,如在MaxTemperatureMapper中,它的airTemperature值从来不用转换为整数,因为它在map()方法中从来不会被当作数字来处理。
main()方法是应用程序的入口点。它调用HadoopPipes::runTask,连接到从Mapper或Reducer连接到Java的父进程和marshals 数据。runTask()方法被传入一个Factory参数,使其可以创建Mapper或Reducer的实例。它创建的其中一个将受套接字连接中Java父进程控制。我们可以用重载模板factory方法来设置一个combiner(combiner)、partitioner(partitioner)、记录读取函数(record
reader)或记录写入函数(record writer)。
编译运行
现在我们可以用Makerfile编译连接例2-13的程序。
例2-13:C++版本的MapReduce程序的Makefile
1. CC = g++
2. CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
3.
4. max_temperature: max_temperature.cpp
5. $(CC) $(CPPFLAGS) $< -Wall
6. -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
7. -lhadooputils -lpthread -g -O2 -o [email protected]
在Makefile中应当设置许多环境变量。除了HADOOP_INSTALL(如果遵循附录A的安装说明,应该已经设置好),还需要定义平台,指定操作系统、体系结构和数据模型(例如,32位或64位)。我在32位Linux系统的机器编译运行了如下内容:
1. % export PLATFORM=Linux-i386-32
2. % make
在成功完成之前,当前目录中便有max_temperature可执行文件。
要运行管道作业,我们需要在伪分布式(pseudo_distrinuted)模式下(其中所有守护进程运行在本地计算机上)运行Hadoop,其中的安装步骤参见附录A。管道不在独立模式(本地运行)中运行,因为它依赖于Hadoop的分布式缓存机制,仅在HDFS运行时才运行。
Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便它们启动map和reduce任务时,它能够被tasktracker取出:
1. % hadoop fs -put max_temperature bin/max_temperature
示例数据也需要从本地文件系统复制到HDFS:
1. % hadoop fs -put input/ncdc/sample.txt sample.txt
现在可以运行这个作业。为了使其运行,我们用Hadoop 管道命令,使用-program参数来传递在HDFS中可执行文件的URI。
1. % hadoop pipes\
2. -D hadoop.pipes.java.recordreader\
3. -D hadoop.pipes.java.recordwriter\
4. inpit sample.txt\
5. output output
6. program bin/max_temperature
我们使用-D选项来指定两个属性:hadoop.pipes.java.recordreader和hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并没有指定一个C+++记录读取函数或记录写入函数,但我们要使用默认的Java设置(用来设置文本输入和输出)。管道还允许你设置一个Java mapper,reducer,combiner或partitioner。事实上,在任何一个作业中,都可以混合使用Java类或C++类。
结果和用其他语言编写的程序所得的结果一样。