先上图,下图描绘了一个mapreduce程序的的一般运行过程和需要经过的几个阶段
大体上我们可以将mapreduce程序划分为inputformat ,map ,shuffle,reduce,outputformat五个阶段,下面我们会详细介绍各个阶段的具体的运行细节
以最简单的wordcount程序为例,本例使用基于hadoop2.6的环境,一般的api都使用mapreudce下的,注意不要使用mapred下的api可能会引起未知错误
惯例hello word程序
driver类,负责构建mapredue任务,设置job的名称,指定任务的输入文件并设置相应的读取类,map处理类,reduce处理类,输出文件路径,mapreduce程序从driver类开始,程序运行时会根据Configuration读取到配置和yarn通信,申请运行任务的资源,申请资源之后就开始将jar包发送到yarn的各个节点执行map和reudce任务
1 package mapreduce.wordcount; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 12 13 import java.io.IOException; 14 15 /** 16 * Created by teaegg on 2016/11/21. 17 */ 18 public class WordcountDriver { 19 public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { 20 runjob(args); 21 } 22 23 24 public static void runjob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 25 Configuration conf = new Configuration();//创建任务的配置对象,会自动加载hadoop的的配置文件 26 //Job job = new Job(conf);是废弃的写法,虽然依然可以使用 27 Job job = Job.getInstance(conf); 28 job.setJobName("wordcount");//设置任务的名称 29 30 //hadoop中对java一般的常用的数据类型做了封装形成了mapreduce专用的数据类型 31 job.setOutputKeyClass(Text.class);//Text.class对应了String类型 32 job.setOutputValueClass(IntWritable.class);//IntWritable.class对应了int类型 33 34 job.setMapperClass(WordcountMap.class);//设置任务的map类 35 job.setReducerClass(WordcountReduce.class);//设置任务的reduce类 36 37 job.setMapOutputKeyClass(Text.class);//设置map端输出数据类型 38 job.setMapOutputValueClass(IntWritable.class);//设置mapd端reduce的数据类型 39 //注意以上都要和map的write方法写出类型一致 40 41 42 //1. 这里不设置TextInputFormat.class也没有关系,hadoop默认调用TextInputFormat类处理, 43 //2. 注意TextInputFormat类不能用来读取Sequncefile类型文件 44 job.setInputFormatClass(TextInputFormat.class);//设置输入文件读取的类,可以hdfs上处理一般的文本文件 45 job.setOutputFormatClass(TextOutputFormat.class);//设置输出的文件类型 46 47 FileInputFormat.addInputPath(job, new Path(args[0]));//设置hdfs上输入文件路径,这里可以传入多个文件路径 48 // 可以再添加一个输入文件路径 49 //如 FileInputFormat.addInputPath(job, new Path(" ")); 50 FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置hdfs上的输出文件路径, 51 52 job.waitForCompletion(true); 53 } 54 55 }
map类,其实再map类执行之前还有一个过程inputformat,这里没有详细介绍,inputformat过程负责将输入路径传入的文件做分片,每一个分片会生成一个map任务,并且会读取输入的文件分片,并返回一行记录recordreader对象交给map方法执行,就这样不断的生成recordreader对象交给map去执行,recordreader对应的就是map方法中解析的kv键值对
1 package mapreduce.wordcount; 2 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.LongWritable; 5 import org.apache.hadoop.io.Text; 6 import org.apache.hadoop.mapreduce.Mapper; 7 8 import java.io.IOException; 9 10 /** 11 * Created by teaegg on 2016/11/21. 12 */ 13 public class WordcountMap extends Mapper<LongWritable, Text, Text, IntWritable> { 14 15 private Text outkey; 16 17 private IntWritable outvalue = new IntWritable(1); 18 19 public void map(LongWritable key, Text value, Context context) 20 throws IOException, InterruptedException { 21 22 //LongWritable key是这一行记录相对文件的偏移量,一般情况用不上,可以忽视 23 //inputformat过程默认采用Textinputformat类来读取hdfs上一般的文本文件,并将每一行记录转化成Text对象 24 //这是inputformat过程的一个主要职责,读取数据并交给map来处理, 25 26 String line = value.toString();//toString方法可以将Text类型的数据转换成String 27 String[] item = line.split(" ");//采用空格分隔每行数据 28 for (String str: item) { 29 outkey = new Text(str); //将每一个word再转化成Text类型 30 31 //1. 这一步是关键一步,无论是inputformat还是map或者reduce,其处理的数据都是键值对类型 32 //2. 这里调用context.write方法,将map处理好的结果写出到磁盘上,然后数据会根据key做排序,shuffle并最终 33 //到达reduce端交给reduce方法继续处理 34 //3. map端可以多次调用write方法,每次调用都是一个写出的键值对结果, 35 context.write(outkey, outvalue); 36 } 37 } 38 39 }
reduce类,reduce方法每次调用的时候会处理一个map输出的key和这个key下所有的输出的value值,
1 package mapreduce.wordcount; 2 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Reducer; 6 7 import java.io.IOException; 8 9 /** 10 * Created by teaegg on 2016/11/21. 11 * <p> 12 * 1. map端传入的是“一行”数据,而在reduce中,输入的key 即Text key是map端输出的context.write(outkey, outvalue)中设置的key 13 * 2. 而reduce方法中输入参数Iterable<IntWritable> values 是map端输出键值对相同的key下所有的value的集合 14 */ 15 public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { 16 17 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 18 int sum = 0; 19 for (IntWritable val : values) { 20 sum += val.get(); 21 } 22 context.write(key, new IntWritable(sum)); 23 } 24 }
时间: 2024-11-10 13:53:36