mapreduce运行流程总结

先上图,下图描绘了一个mapreduce程序的的一般运行过程和需要经过的几个阶段

  大体上我们可以将mapreduce程序划分为inputformat ,map ,shuffle,reduce,outputformat五个阶段,下面我们会详细介绍各个阶段的具体的运行细节

  以最简单的wordcount程序为例,本例使用基于hadoop2.6的环境,一般的api都使用mapreudce下的,注意不要使用mapred下的api可能会引起未知错误

 

   惯例hello word程序

driver类,负责构建mapredue任务,设置job的名称,指定任务的输入文件并设置相应的读取类,map处理类,reduce处理类,输出文件路径,mapreduce程序从driver类开始,程序运行时会根据Configuration读取到配置和yarn通信,申请运行任务的资源,申请资源之后就开始将jar包发送到yarn的各个节点执行map和reudce任务

 1 package mapreduce.wordcount;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
12
13 import java.io.IOException;
14
15 /**
16  * Created by teaegg on 2016/11/21.
17  */
18 public class WordcountDriver {
19     public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
20         runjob(args);
21     }
22
23
24     public static void runjob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
25         Configuration conf = new Configuration();//创建任务的配置对象,会自动加载hadoop的的配置文件
26         //Job job = new Job(conf);是废弃的写法,虽然依然可以使用
27         Job job  = Job.getInstance(conf);
28         job.setJobName("wordcount");//设置任务的名称
29
30         //hadoop中对java一般的常用的数据类型做了封装形成了mapreduce专用的数据类型
31         job.setOutputKeyClass(Text.class);//Text.class对应了String类型
32         job.setOutputValueClass(IntWritable.class);//IntWritable.class对应了int类型
33
34         job.setMapperClass(WordcountMap.class);//设置任务的map类
35         job.setReducerClass(WordcountReduce.class);//设置任务的reduce类
36
37         job.setMapOutputKeyClass(Text.class);//设置map端输出数据类型
38         job.setMapOutputValueClass(IntWritable.class);//设置mapd端reduce的数据类型
39         //注意以上都要和map的write方法写出类型一致
40
41
42         //1. 这里不设置TextInputFormat.class也没有关系,hadoop默认调用TextInputFormat类处理,
43         //2. 注意TextInputFormat类不能用来读取Sequncefile类型文件
44         job.setInputFormatClass(TextInputFormat.class);//设置输入文件读取的类,可以hdfs上处理一般的文本文件
45         job.setOutputFormatClass(TextOutputFormat.class);//设置输出的文件类型
46
47         FileInputFormat.addInputPath(job, new Path(args[0]));//设置hdfs上输入文件路径,这里可以传入多个文件路径
48         // 可以再添加一个输入文件路径
49         //如 FileInputFormat.addInputPath(job, new Path(" "));
50         FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置hdfs上的输出文件路径,
51
52         job.waitForCompletion(true);
53     }
54
55 }

map类,其实再map类执行之前还有一个过程inputformat,这里没有详细介绍,inputformat过程负责将输入路径传入的文件做分片,每一个分片会生成一个map任务,并且会读取输入的文件分片,并返回一行记录recordreader对象交给map方法执行,就这样不断的生成recordreader对象交给map去执行,recordreader对应的就是map方法中解析的kv键值对

 1 package mapreduce.wordcount;
 2
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7
 8 import java.io.IOException;
 9
10 /**
11  * Created by teaegg on 2016/11/21.
12  */
13 public class WordcountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
14
15     private Text outkey;
16
17     private IntWritable outvalue = new IntWritable(1);
18
19     public void map(LongWritable key, Text value, Context context)
20             throws IOException, InterruptedException {
21
22         //LongWritable key是这一行记录相对文件的偏移量,一般情况用不上,可以忽视
23         //inputformat过程默认采用Textinputformat类来读取hdfs上一般的文本文件,并将每一行记录转化成Text对象
24         //这是inputformat过程的一个主要职责,读取数据并交给map来处理,
25
26         String line = value.toString();//toString方法可以将Text类型的数据转换成String
27         String[] item = line.split(" ");//采用空格分隔每行数据
28         for (String str: item) {
29             outkey = new Text(str);    //将每一个word再转化成Text类型
30
31             //1. 这一步是关键一步,无论是inputformat还是map或者reduce,其处理的数据都是键值对类型
32             //2. 这里调用context.write方法,将map处理好的结果写出到磁盘上,然后数据会根据key做排序,shuffle并最终
33             //到达reduce端交给reduce方法继续处理
34             //3. map端可以多次调用write方法,每次调用都是一个写出的键值对结果,
35             context.write(outkey, outvalue);
36         }
37     }
38
39     }

reduce类,reduce方法每次调用的时候会处理一个map输出的key和这个key下所有的输出的value值,

 1 package mapreduce.wordcount;
 2
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Reducer;
 6
 7 import java.io.IOException;
 8
 9 /**
10  * Created by teaegg on 2016/11/21.
11  * <p>
12  * 1. map端传入的是“一行”数据,而在reduce中,输入的key  即Text key是map端输出的context.write(outkey, outvalue)中设置的key
13  * 2. 而reduce方法中输入参数Iterable<IntWritable> values  是map端输出键值对相同的key下所有的value的集合
14  */
15 public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
16
17     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
18         int sum = 0;
19         for (IntWritable val : values) {
20             sum += val.get();
21         }
22         context.write(key, new IntWritable(sum));
23     }
24 }
时间: 2024-11-10 13:53:36

mapreduce运行流程总结的相关文章

MapReduce运行流程分析

研究MapReduce已经有一段时间了.起初是从分析WordCount程序开始,后来开始阅读Hadoop源码,自认为已经看清MapReduce的运行流程.现在把自己的理解贴出来,与大家分享,欢迎纠错. 还是以最经典的WordCount程序作为基础,来分析map阶段.reduce阶段和最复杂的shuffle阶段. 文本1:hello world                                      文本2:map reduce hello hadoop            

Hadoop Mapreduce运行流程

Mapreduce的运算过程为两个阶段: 第一个阶段的map task相互独立,完全并行: 第二个阶段的reduce task也是相互独立,但依赖于上一阶段所有map task并发实例的输出: 这些task任务分布在多台机器运行,它的运行管理是有一个master负责,这个master由yarn负责启动,那么yarn如何知道启动多少个map task进程去计算呢? 下面概述一下Mapreduce的执行流程: 1.客户端首先会访问hdfs的namenode获取待处理数据的信息(文件数及文件大小),形

【转】mapreduce运行机制

转自http://langyu.iteye.com/blog/992916 写的相当好! 谈 mapreduce运行机制,可以从很多不同的角度来描述,比如说从mapreduce运行流程来讲解,也可以从计算模型的逻辑流程来进行讲解,也许有些 深入理解了mapreduce运行机制还会从更好的角度来描述,但是将mapreduce运行机制有些东西是避免不了的,就是一个个参入的实例对象,一个 就是计算模型的逻辑定义阶段,我这里讲解不从什么流程出发,就从这些一个个牵涉的对象,不管是物理实体还是逻辑实体. 首

hadoop笔记之MapReduce的运行流程

MapReduce的运行流程 MapReduce的运行流程 基本概念: Job&Task:要完成一个作业(Job),就要分成很多个Task,Task又分为MapTask和ReduceTask JobTracker TaskTracker Hadoop MapReduce体系结构 JobTracker的角色 作业调度 分配任务.监控任务执行进度 监控TaskTracker的状态 TaskTracker的角色 执行任务 汇报任务状态 MapReduce作业执行过程 MapReduce的容错机制 重复

016_笼统概述MapReduce执行流程结合wordcount程序

一.map任务处理 1 .读取输入文件内容,解析成key.value对.对输入文件的每一行,解析成key.value对.每一个键值对调用一次map函数. 2 .写自己的逻辑,对输入的key.value处理,转换成新的key.value输出.3. 对输出的key.value进行分区.4 .对不同分区的数据,按照key进行排序.分组.相同key的value放到一个集合中.5 .(可选)分组后的数据进行归约. 二.reduce任务处理 1.对多个map任务的输出,按照不同的分区,通过网络copy到不同

【原创】MapReduce运行原理和过程

一.Map的原理和运行流程 Map的输入数据源是多种多样的,我们使用hdfs作为数据源.文件在hdfs上是以block(块,Hdfs上的存储单元)为单位进行存储的. 1.分片 我们将这一个个block划分成数据分片,即Split(分片,逻辑划分,不包含具体数据,只包含这些数据的位置信息),那么上图中的第一个Split则对应两个个文件块,第二个Split对应一个块.需要注意的是一个Split只会包含一个File的block,不会跨文件.  2. 数据读取和处理 当我们把数据块分好的时候,MapRe

大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)

第3章 MapReduce框架原理3.1 InputFormat数据输入3.1.1 切片与MapTask并行度决定机制3.1.2 Job提交流程源码和切片源码详解3.1.3 FileInputFormat切片机制3.1.4 CombineTextInputFormat切片机制3.1.5 CombineTextInputFormat案例实操3.1.6 FileInputFormat实现类3.1.7 KeyValueTextInputFormat使用案例3.1.8 NLineInputFormat使

spark记录(5)Spark运行流程及在不同集群中的运行过程

摘自:https://www.cnblogs.com/qingyunzong/p/8945933.html 一.Spark中的基本概念 (1)Application:表示你的应用程序 (2)Driver:表示main()函数,创建SparkContext.由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等.程序执行完毕后关闭SparkContext (3)Executor:某个Application运行在Worker节点上的一个进程,该进程负责运

Spark学习之路 (七)Spark 运行流程

讨论QQ:1586558083 目录 一.Spark中的基本概念 二.Spark的运行流程 2.1 Spark的基本运行流程 三.Spark在不同集群中的运行架构 3.1 Spark on Standalone运行过程 3.2 Spark on YARN运行过程 正文 回到顶部 一.Spark中的基本概念 (1)Application:表示你的应用程序 (2)Driver:表示main()函数,创建SparkContext.由SparkContext负责与ClusterManager通信,进行资