MapReduce splits the work into two jobs, map and reduce: map processes the initial data, and the new records it produces are then gathered up and handed to reduce for further processing.
Overriding the Mapper and Reducer classes follows one rule: each is declared as ClassName<*, *, *, *> with four type parameters (the part inside the angle brackets is what Java calls generics). For the Mapper, the first two parameters are the key and value types read in from the input file, and the last two are the key and value types handed on to the Reducer. The Reducer's first two parameters must therefore match the Mapper's last two: if the Mapper's angle brackets read <*, *, Text, IntWritable>, the Reducer must be declared as <Text, IntWritable, *, *>. Below is a MapReduce program I wrote against a file of student names and scores that outputs the highest score; a sketch of a sample run follows the listing.
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class statics {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            int max = 0;
            int min = 65535;
            String line = value.toString();
            String[] s = line.split("\\s+");
            for (int i = 0; i < s.length; i++) {
                int score;
                try {
                    // Each line is assumed to hold a student name followed by that
                    // student's scores; non-numeric fields (the name) are skipped.
                    score = Integer.parseInt(s[i]);
                } catch (NumberFormatException e) {
                    continue;
                }
                if (score > max)
                    max = score;
                if (score < min)
                    min = score;
            }
            // Emit the per-line maximum under a single shared key so that every
            // value ends up in the same reduce group.
            Text text = new Text("1");
            // Text text1 = new Text("2");
            context.write(text, new IntWritable(max));
            // context.write(text1, new IntWritable(min));
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int max = 0;
            // Walk through the per-line maxima and keep the overall highest score.
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                int now = iterator.next().get();
                if (now > max)
                    max = now;
            }
            context.write(new Text("max is "), new IntWritable(max));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "127.0.0.1:9000");
        FileSystem fs = FileSystem.get(conf);
        // Take the input and output directories from the command-line arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) { // expect exactly two path arguments: input and output
            System.err.println("Usage: statics <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "max score");
        job.setJarByClass(statics.class);
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        Path outpath = new Path(otherArgs[1]);
        // MapReduce refuses to start if the output directory already exists, so remove it first.
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
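As a rough usage sketch (the jar name and HDFS paths below are placeholders I am assuming, not part of the original setup), the compiled class can be packaged and launched with the standard hadoop jar command:

hadoop jar statics.jar org.apache.hadoop.examples.statics /user/hadoop/scores /user/hadoop/scores_out

Given an input file along the lines of

Tom 78 85 90
Jerry 66 92 71

each map call emits the highest score on its own line (90 and 92) under the shared key "1", and the single reduce call then picks the overall maximum, writing one line of the form "max is " followed by a tab and 92 to part-r-00000 under the output directory.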