如果Hadoop命令是以类名作为第一个参数,它就会启动一个JVM来运行这个类。使用命令比直接使用Java更方便,因为前者把类的路径(及其依赖关系)加入Hadoop的库中,并获得Hadoop的配置。要添加应用程序类的路径,我们需要定义一个HADOOP_CLASSPATH环境变量,Hadoop脚本会来执行相关操作。
注意:以本地(独立)模式运行时,本书所有程序希望都以这种方式来设置HADOOP_CLA-SSPATH。命令必须在示例代码所在的文件夹下被运行。
运行作业所得到的输出提供了一些有用的信息。(无法找到作业JAR文件的相关信息是意料之中的,因为我们是在本地模式下没有JAR的情况下运行的。在集群上运行时,不会看到此警告。)例如,我们可以看到,这个作业被给予了一个IDjob_local_0001,并且它运行了一个map任务和一个reduce任务(使用attempt_local_0001_m_000000_0和attempt_local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业和任务的ID是非常有用的。
输出的最后一部分叫"计数器"(Counter),显示了在Hadoop上运行的每个作业产生的统计信息。这些对检查处理的数据量是否符合预期非常有用。例如,我们可以遵循整个系统中记录的数目:5个map输入产生了5个map的输出,然后5个reduce输入产生两个reduce输出。
输出被写入output目录,其中每个reducer包括一个输出文件。作业包含一个reducer,所以我们只能找到一个文件,名为part-00000:
1.%catoutput/part-00000
2.1949 111
3.1950 22
这个结果和我们之前手动寻找的结果一样。我们可以把前面这个结果解释为在1949年的最高气温记录为11.1℃,而在1950年为2.2℃。
新的JavaMapreduce API
Hadoop最新版JavaMapReduce Release 0.20.0的API包括一个全新的MapReduce
JavaAPI,有时也称为"contextobject"(上下文对象),旨在使API在未来更容易扩展。新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用。
新的API和旧的API之间有下面几个明显的区别。
新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展(用于配置守护进程,请参见5.1节)。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。
作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。
例2-6使用新API重写了MaxTemperature的代码,不同之处用黑体字突出显示。
例2-6:使用新的context object(上下文对象)MapReduce
API在气象数据集中查找最高气温
1.public class NewMaxTemperature {
2.static class NewMaxTemperatureMapper
3.extends Mapper<LongWritable,Text, Text, IntWritable>
{
4.
5.private static final int
MISSING =
9999;
6.
7.public void map(LongWritable key, Text value,Context context)
8.throws IOException, InterruptedException {
9.
10. String line =value.toString();
11. String year =line.substring(15, 19);
12. int airTemperature;
13. if (line.charAt(87) == ‘+‘) { // parseInt doesn‘t like
14. leading plus signs
15. airTemperature =Integer.parseInt(line.substring(88, 92));
16. } else {
17. airTemperature =Integer.parseInt(line.substring(87, 92));
18. }
19. String quality =line.substring(92, 93);
20. if (airTemperature !=MISSING &&quality.matches("[01459]")){
21. context.write(new Text(year), new
22. IntWritable(airTemperature));
23. }
24. }
25.}
26.
27.static class NewMaxTemperatureReducer
28. extends Reducer<Text, IntWritable, Text,IntWritable>
{
29.
30. public void reduce(Text key, Iterable<IntWritable> values,
31. Context context)
32. throws IOException, InterruptedException {
33.
34. int maxValue =
Integer.MIN_VALUE;
35. for (IntWritable value : values) {
36. maxValue =Math.max(maxValue, value.get());
37. }
38. context.write(key, new IntWritable(maxValue));
39. }
40.}
41.
42.public static void main(String[] args) throws Exception {
43. if (args.length != 2) {
44. System.err.println("Usage: NewMaxTemperature
<input path>
45. <output path>");
46. System.exit(-1);
47. }
48.
49. Job job =new Job();
50. job.setJarByClass(NewMaxTemperature.class);
51.
52. FileInputFormat.addInputPath(job, new Path(args[0]));
53. FileOutputFormat.setOutputPath(job, new Path(args[1]));
54.
55. job.setMapperClass(NewMaxTemperatureMapper.class);
56. job.setReducerClass(NewMaxTemperatureReducer.class);
57.
58. job.setOutputKeyClass(Text.class);
59. job.setOutputValueClass(IntWritable.class);
60.
61. System.exit(job.waitForCompletion(true) ? 0 : 1);
62.
63. }
64.}