MaxTemperature.java
package cn.kissoft.hadoop.week05; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); job.setJobName("Max temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MaxTemperatureMapper.java
package cn.kissoft.hadoop.week05; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(0, 4); int airTemperature = Integer.parseInt(line.substring(13, 19).trim()); if (airTemperature != MISSING) { context.write(new Text(year), new IntWritable(airTemperature)); } } }
MaxTemperatureReducer.java
package cn.kissoft.hadoop.week05; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } }
运行结果
[[email protected] guide]$ hadoop jar pc.jar cn.kissoft.hadoop.week05.MaxTemperature ./ch02/1959.txt ./ch02/out/ Warning: $HADOOP_HOME is deprecated. 14/08/15 11:27:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 14/08/15 11:27:15 INFO input.FileInputFormat: Total input paths to process : 1 14/08/15 11:27:15 INFO util.NativeCodeLoader: Loaded the native-hadoop library 14/08/15 11:27:15 WARN snappy.LoadSnappy: Snappy native library not loaded 14/08/15 11:27:17 INFO mapred.JobClient: Running job: job_201408150916_0003 14/08/15 11:27:18 INFO mapred.JobClient: map 0% reduce 0% 14/08/15 11:27:31 INFO mapred.JobClient: map 100% reduce 0% 14/08/15 11:27:43 INFO mapred.JobClient: map 100% reduce 100% 14/08/15 11:27:46 INFO mapred.JobClient: Job complete: job_201408150916_0003 14/08/15 11:27:46 INFO mapred.JobClient: Counters: 29 14/08/15 11:27:46 INFO mapred.JobClient: Job Counters 14/08/15 11:27:46 INFO mapred.JobClient: Launched reduce tasks=1 14/08/15 11:27:46 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=14494 14/08/15 11:27:46 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 14/08/15 11:27:46 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 14/08/15 11:27:46 INFO mapred.JobClient: Launched map tasks=1 14/08/15 11:27:46 INFO mapred.JobClient: Data-local map tasks=1 14/08/15 11:27:46 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=12558 14/08/15 11:27:46 INFO mapred.JobClient: File Output Format Counters 14/08/15 11:27:46 INFO mapred.JobClient: Bytes Written=9 14/08/15 11:27:46 INFO mapred.JobClient: FileSystemCounters 14/08/15 11:27:46 INFO mapred.JobClient: FILE_BYTES_READ=9773826 14/08/15 11:27:46 INFO mapred.JobClient: HDFS_BYTES_READ=27544475 14/08/15 11:27:46 INFO mapred.JobClient: FILE_BYTES_WRITTEN=14776914 14/08/15 11:27:46 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=9 14/08/15 11:27:46 INFO mapred.JobClient: File Input Format Counters 14/08/15 11:27:46 INFO mapred.JobClient: Bytes Read=27544368 14/08/15 11:27:46 INFO mapred.JobClient: Map-Reduce Framework 14/08/15 11:27:46 INFO mapred.JobClient: Map output materialized bytes=4886910 14/08/15 11:27:46 INFO mapred.JobClient: Map input records=444264 14/08/15 11:27:46 INFO mapred.JobClient: Reduce shuffle bytes=4886910 14/08/15 11:27:46 INFO mapred.JobClient: Spilled Records=1332792 14/08/15 11:27:46 INFO mapred.JobClient: Map output bytes=3998376 14/08/15 11:27:46 INFO mapred.JobClient: Total committed heap usage (bytes)=219496448 14/08/15 11:27:46 INFO mapred.JobClient: CPU time spent (ms)=6770 14/08/15 11:27:46 INFO mapred.JobClient: Combine input records=0 14/08/15 11:27:46 INFO mapred.JobClient: SPLIT_RAW_BYTES=107 14/08/15 11:27:46 INFO mapred.JobClient: Reduce input records=444264 14/08/15 11:27:46 INFO mapred.JobClient: Reduce input groups=1 14/08/15 11:27:46 INFO mapred.JobClient: Combine output records=0 14/08/15 11:27:46 INFO mapred.JobClient: Physical memory (bytes) snapshot=310345728 14/08/15 11:27:46 INFO mapred.JobClient: Reduce output records=1 14/08/15 11:27:46 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1455665152 14/08/15 11:27:46 INFO mapred.JobClient: Map output records=444264 [[email protected] guide]$ hadoop fs -ls ./ch02/out/ Warning: $HADOOP_HOME is deprecated. Found 3 items -rw-r--r-- 1 wukong supergroup 0 2014-08-15 11:27 /user/wukong/ch02/out/_SUCCESS drwxr-xr-x - wukong supergroup 0 2014-08-15 11:27 /user/wukong/ch02/out/_logs -rw-r--r-- 1 wukong supergroup 9 2014-08-15 11:27 /user/wukong/ch02/out/part-r-00000 [[email protected] guide]$ hadoop fs -cat ./ch02/out/part-r-00000 Warning: $HADOOP_HOME is deprecated. 1959 418
截图
MapReduce示例-气象站
时间: 2024-11-05 15:58:10