MapReduce框架结构及代码示例

一个完整的 mapreduce 程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、MapTask:负责 map 阶段的整个数据处理流程

3、ReduceTask:负责 reduce 阶段的整个数据处理流程

设计构思

    MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop 集群上。

    既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce 操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。

    对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce 就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。

  Hadoop MapReduce 构思体现在如下的三个方面:

  • 如何对付大数据处理:分而治之

    对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!

  • 构建抽象模型:Map 和 Reduce

    MapReduce 借鉴了函数式语言中的思想,用 Map 和 Reduce 两个函数提供了高层的并行编程抽象模型。

    Map: 对一组数据元素进行某种重复式的处理;

    Reduce: 对 Map 的中间结果进行某种进一步的结果整理。

    MapReduce 中定义了如下的 Map 和 Reduce 两个抽象的编程接口,由用户去编程实现:

      map: (k1; v1) → [(k2; v2)]

      reduce: (k2; [v2]) → [(k3; v3)]

    Map 和 Reduce 为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出 MapReduce 处理的数据类型是<key,value>键值对。

  • 统一构架,隐藏系统层细节

    如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce 设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。

    MapReduce 最大的亮点在于通过抽象模型和计算框架把需要做什么(whatneed to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。

代码示例

  • pom依赖
<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>hadoop.mr.wc.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • Mapper类
/**
 * mr程序执行的时候mapper阶段运行的类,也就是maptask
 */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    //该方法为map阶段具体的业务逻辑的实现
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取传入的一行内容
        String line = value.toString();
        //按照分隔符切割数据返回数组
        String[] words = line.split(" ");
        //遍历数组
        for (String word : words) {
            //每出现一个单词都标记1
            context.write(new Text(word),new IntWritable(1));
        }
    }
}
  • Reducer类
/**
 * mr程序执行的时候reducer阶段运行的类,也就是reducertask
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    //该方法为reducer阶段具体的业务逻辑的实现
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定义一个变量统计
        int count = 0;
        //遍历所有value所在的迭代器  累加构成该单词最终的总个数
        for (IntWritable value : values) {
            count += value.get();
        }
        //相同key的一组调用reduce完毕,直接输出
        context.write(key,new IntWritable(count));
    }
}
  • 入口函数

    /**
     * mr程序运行时的主类,除了入口函数之外,还要对mr程序做具体描述
     */
    public class WordCountDriver {
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            //指定mr程序使用本地模式模拟一套环境执行mr程序,一般用于本地代码测试
    //        conf.set("mapreduce.framework.name","local");
    
            //通过job方法获得mr程序运行的实例
            Job job = Job.getInstance(conf);
    
            //指定本次mr程序的运行主类
            job.setJarByClass(WordCountDriver.class);
            //指定本次mr程序使用的mapper reduce
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //指定本次mr程序map输出的数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //指定本次mr程序reduce输出的数据类型,也就是说最终的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //指定本次mr程序待处理数据目录   输出结果存放目录
            FileInputFormat.addInputPath(job,new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));
    //        FileInputFormat.addInputPath(job,new Path("D:\\wordcount\\input"));
    //        FileOutputFormat.setOutputPath(job,new Path("D:\\wordcount\\output"));
    
            //提交本次mr程序
            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);//程序执行成功,退出状态码为0,退出程序,否则为1
        }
    }
  • 测试

    入口类中被注释的部分为本地测试方法,也就是在windows指定路径中准备测试数据,直接run执行,

    而另一种方法是将代码打成jar包上传到集群中,在hdfs上指定路径准备数据,使用hadoop命令启动

hadoop jar wordcount.jar

原文地址:https://www.cnblogs.com/jifengblog/p/9277253.html

时间: 2024-08-29 10:58:09

MapReduce框架结构及代码示例的相关文章

MapReduce序列化及分区的java代码示例

概述 序列化(Serialization)是指把结构化对象转化为字节流. 反序列化(Deserialization)是序列化的逆过程.把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化. Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系-),不便于在网络中高效传输:所以,hadoop 自己开发

Hadoop RCFile存储格式详解(源码分析、代码示例)

RCFile RCFile全称Record Columnar File,列式记录文件,是一种类似于SequenceFile的键值对(Key/Value Pairs)数据文件. 关键词:Record.Columnar.Key.Value. RCFile的优势在哪里?适用于什么场景?为了让大家有一个感性的认识,我们来看一个例子. 假设我们有这样一张9行3列的Hive数据表table,以普通的TextFile进行存储, 现在我们需要统计这张数据表的第二列(col2)值为“row5_col2”的出现次数

计算DXFReader中多边形的面积代码示例

在DXFReader中, 一般的多边形的面积计算绝对值 其中K表是顶点的数目,它们的坐标,用于在求和和, 所以用下面的代码就可以计算出一个封闭的多段线的区域: view source print? 01 Dim Vertex As Object 02 Dim Entity As Object 03 Dim k As Long 04 Dim i As Long 05 Dim Area As Single 06 07 With DXFReader1 08 09  For Each Entity In

代码示例:一些简单技巧优化JavaScript编译器工作详解,让你写出高性能运行的更快JavaScript代码

告诉你一些简单的技巧来优化JavaScript编译器工作,从而让你的JavaScript代码运行的更快.尤其是在你游戏中发现帧率下降或是当垃圾回收器有大量的工作要完成的时候. 单一同态: 当你定义了一个两个参数的函数,编译器会接受你的定义,如果函数参数的类型.个数或者返回值的类型改变编译器的工作会变得艰难.通常情况下,单一同态的数据结构和个数相同的参数会让你的程序会更好的工作. function example(a, b) { // 期望a,b都为数值类型 console.log(++a * +

jquery操作单选钮代码示例

jquery操作单选钮代码示例:radio单选按钮是最重要的表单元素之一,下面介绍一下常用的几个jquery对radio单选按钮操作.一.取消选中: $(".theclass").each(function(){ if($(this).attr('checked')) { $(this).attr('checked',false); } }); 以上代码可以将class属性值为theclass的被选中单选按钮取消选中.二.获取被选中的单选按钮的值: var val=$('.thecla

Python实现各种排序算法的代码示例总结

Python实现各种排序算法的代码示例总结 作者:Donald Knuth 字体:[增加 减小] 类型:转载 时间:2015-12-11我要评论 这篇文章主要介绍了Python实现各种排序算法的代码示例总结,其实Python是非常好的算法入门学习时的配套高级语言,需要的朋友可以参考下 在Python实践中,我们往往遇到排序问题,比如在对搜索结果打分的排序(没有排序就没有Google等搜索引擎的存在),当然,这样的例子数不胜数.<数据结构>也会花大量篇幅讲解排序.之前一段时间,由于需要,我复习了

领域驱动开发推荐代码示例 — Microsoft NLayerApp

简介: Microsoft NLayerApp是由微软西班牙团队出品的基于.NET 4.0的“面向领域N层分布式架构”代码示例,在codeplex上的地址是:http://microsoftnlayerapp.codeplex.com/. 架构图: 点击查看大图 代码下载:http://microsoftnlayerapp.codeplex.com/releases/view/56660 所用到的软件: - Microsoft Visual Studio 2010  - Microsoft Ex

Aspectj快速上手代码示例之Before,After,Around

本文不打算解释AOP的相关专业名词和概念,仅通过几个代码示例来展示Aspectj(对AOP实现的)的基本使用,并且使用的Aspectj是目前最新版本. 1.搭建环境 本文使用Maven来构建工程,通过aspectj-maven-plugin插件来编译*.aj文件至.class. Maven的具体配置: <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>aspectj-maven-plugin&

jxl创建Excel文件java代码示例

记得要下载 并 导入 jxl.jar 包,免积分下载地址:http://download.csdn.net/detail/u010011052/7561041 package Test; import java.io.*; import jxl.*; import jxl.format.Colour; import jxl.write.*; public class JXLTest { private static WritableWorkbook book; private static Wr