Hadoop编程实例之MapReduce

MapReduce原理图:

MapReduce具体执行过程图:

首先是客户端要编写好mapreduce程序,配置好mapreduce的作业也就是job,接下来就是提交job了,提交job是提交到JobTracker上的,这个时候JobTracker就会构建这个job,具体就是分配一个新的job任务的ID值,接下来它会做检查操作,这个检查就是确定输出目录是否存在,如果存在那么job就不能正常运行下去,JobTracker会抛出错误给客户端,接下来还要检查输入目录是否存在,如果不存在同样抛出错误,如果存在JobTracker会根据输入计算输入分片(Input Split),如果分片计算不出来也会抛出错误,至于输入分片我后面会做讲解的,这些都做好了JobTracker就会配置Job需要的资源了。分配好资源后,JobTracker就会初始化作业,初始化主要做的是将Job放入一个内部的队列,让配置好的作业调度器能调度到这个作业,作业调度器会初始化这个job,初始化就是创建一个正在运行的job对象(封装任务和记录信息),以便JobTracker跟踪job的状态和进程。初始化完毕后,作业调度器会获取输入分片信息(input split),每个分片创建一个map任务。接下来就是任务分配了,这个时候tasktracker会运行一个简单的循环机制定期发送心跳给jobtracker,心跳间隔是5秒,程序员可以配置这个时间,心跳就是jobtracker和tasktracker沟通的桥梁,通过心跳,jobtracker可以监控tasktracker是否存活,也可以获取tasktracker处理的状态和问题,同时tasktracker也可以通过心跳里的返回值获取jobtracker给它的操作指令。任务分配好后就是执行任务了。在任务执行时候jobtracker可以通过心跳机制监控tasktracker的状态和进度,同时也能计算出整个job的状态和进度,而tasktracker也可以本地监控自己的状态和进度。当jobtracker获得了最后一个完成指定任务的tasktracker操作成功的通知时候,jobtracker会把整个job状态置为成功,然后当客户端查询job运行状态时候(注意:这个是异步操作),客户端会查到job完成的通知的。如果job中途失败,mapreduce也会有相应机制处理,一般而言如果不是程序员程序本身有bug,mapreduce错误处理机制都能保证提交的job能正常完成。

下面,我们来实现一个mapreduce的demo,wordcount

public class WordCountMapReduce extends Configured implements Tool {

    // Mapper Class
    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        /*
         * key是偏移量,value是一行一行的值 首先分割单词,组成key/value对进行输出
         */
        private Text mapOutputKey = new Text();
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // todo
            String line = value.toString().trim();

            //segment
            StringTokenizer strToken = new StringTokenizer(line);

            while(strToken.hasMoreTokens()){
                String word = strToken.nextToken();
                mapOutputKey.set(word);
                context.write(mapOutputKey, mapOutputValue);
            }

        }
    }

    // Reducer
    public static class MyReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable reduceOutputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // todo

            int sum = 0;
            //reduce
            for(IntWritable value : values){
                sum+=value.get();
            }
            reduceOutputValue.set(sum);
            context.write(key, reduceOutputValue);
        }

    }

    public int run(String[] args) throws Exception {
        // set Conf env
        Configuration conf = new Configuration();
        // conf.set("mapreduce.map.output.compress", true);

        // get job by conf
        Job job = Job.getInstance(super.getConf(),
                WordCountMapReduce.class.getSimpleName());

        job.setJarByClass(WordCountMapReduce.class);

        // set job
        // step 1 : map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // step 2 :reduce phase
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit
        // job.submit();
        boolean isSucceed = job.waitForCompletion(true);

        return isSucceed ? 1 : 0;

    }

    // Driver
    public static void main(String[] args) throws Exception {

        args = new String[] { "hdfs://192.168.1.109:8020/home/test/test.txt",
                "hdfs://192.168.1.109:8020/home/test/ouput2" };

        int status = ToolRunner.run(new WordCountMapReduce(), args);

        System.out.println(status);

    }

}

运行结果如下:

运行成功!

时间: 2024-10-08 14:02:58

Hadoop编程实例之MapReduce的相关文章

MapReduce编程实例5

前提准备: 1.hadoop安装运行正常.Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装 2.集成开发环境正常.集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境 MapReduce编程实例: MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析 MapReduce编程实例(二),计算学生平均成绩 MapReduce编程实例(三),数据去重 MapReduce编程实例(四),排序 M

Hadoop初学指南(6)--MapReduce的简单实例及分析

本文在上一节的基础上通过一个简单的MR示例对MapReduce的运行流程进行分析. 假设有两行数据,分别是hello you,hello me,我们要统计其中出现的单词以及每个单词出现的次数. 所得的结果为 hello   2 you     1 me      1 (1)大致运行流畅 1.解析成2个<k,v>,分别是<0, hello you><10, hello me>.调用2次map函数. 2.执行map任务 3.map输出后的数据是:<hello,1>

hadoop编程小技巧(4)---全局key排序类TotalOrderPartitioner

Hadoop代码测试版本:Hadoop2.4 原理:在进行MR程序之前对输入数据进行随机提取样本,把样本排序,然后在MR的中间过程Partition的时候使用这个样本排序的值进行分组数据,这样就可以达到全局排序的目的了. 难点:如果使用Hadoop提供的方法来实现全局排序,那么要求Mapper的输入.输出的key不变才可以,因为在源码InputSampler中提供的随机抽取的数据是输入数据最原始的key,如下代码(line:225): for (int i = 0; i < splitsToSa

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

Hadoop学习笔记:MapReduce框架详解

原文出处: 夏天的森林 开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候

hadoop编程技巧(6)---处理大量的小型数据文件CombineFileInputFormat申请书

代码测试环境:Hadoop2.4 应用场景:当需要处理非常多的小数据文件,这种技术的目的,可以被应用到实现高效的数据处理. 原理:申请书CombineFileInputFormat,能够进行切片合并的时候把多个小的数据文件.因为每个切片将有一个Mapper,当一个Mapper处理的数据比較小的时候,其效率较低.而一般使用Hadoop处理数据时.即默认方式,会把一个输入数据文件当做一个分片.这样当输入文件较小时就会出现效率低下的情况. 实例: 參考前篇blog:hadoop编程小技巧(5)---自

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe

Hadoop实战实例

Hadoop实战实例 Hadoop 是Google MapReduce的一个Java实现.MapReduce是一种简化的分布式编程模式,让程序自动分布到一个由普通机器组成的超大集群上并发执行.就如同java程序员可以不考虑内存泄露一样, MapReduce的run-time系统会解决输入数据的分布细节,跨越机器集群的程序执行调度,处理机器的失效,并且管理机器之间的通讯请求.这样的模式允许程序员可以不需要有什么并发处理或者分布式系统的经验,就可以处理超大的分布式系统得资源. 一.概论 作为Hado