MapReduce 初试

一、境遇

接触Hadoop已经有半年了,从Hadoop集群搭建到Hive、HBase、Sqoop相关组件的安装,甚至Spark on Hive、Phoenix、Kylin这些边缘的项目都有涉及。如果说部署,我自认为可以没有任何问题,但是如果说我对于这个系统已经掌握了,我却不敢这么讲,因为至少MapReduce我还没有熟悉,其工作机制也只是一知半解。关于MapReduce的运算,我差不多理解了,但是实际实现现在却只能靠找到的代码,真的是惭愧的很。

于是再也忍不住,一定要有点自己的东西,最起码,写的时候不用去找别人的博客,嗯,找自己的就行。

二、实验

1、实验过程

最开始实验的是最简单的去重MapReduce,在本地文件实验时没有任何问题,但把文件放到HDFS上就怎么也找不到了,究其原因,HDFS上的需要用Hadoop执行jar文件才可以

1)javac输出类到指定目录 dir

javac *.java -d dir

2)jar打包class文件

1,打包指定class文件到target.jar

jar cvf target.jar x1.class x2.class ... xn.class

2,打包指定路径dir下的所有class文件到target.jar

jar cvf target.jar -C dir .

3,打包class文件成可执行jar,程序入口Main函数

jar cvfe tarrget.jar Main -C dir .

Hadoop只需要普通jar即可,不用打包成可执行jar

3)执行jar,主类MapDuplicate

Hadoop jar target.jar MapDuplicate (params)

2、代码分析

1)import类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

Configuration类:用来设定Hadoop的参数,如:IP、端口等

Path:用来设定输入输出路径

IntWritable:MapReduce用到的int类型

Text:MapReduce用到的string类型

Job:生成MapReduce任务的主类,任务参数也在此类中设定

Mapper:被继承的Map类

Reducer:被继承的Reduce类

FileInputFormat:输入文件格式

FileOutputFormat:输出文件格式(可改为其它IO类,如数据库)

GenericOptionsParser:解析命令行参数的类

2)代码结构

public class MapDuplicate {
    public static class Map extends Mapper<...> { ... }
    public static class Reduce extends Reducer<...> { ... }
    public static void main(String[] args) throws Ex { ... }
}

2)Map类

        public static class Map extends Mapper<Object,Text,Text,Text> {
                private static Text line = new Text();

                public void map(Object key,Text value,Context context)
                throws IOException,InterruptedException {
                        line = value;
                        context.write(line,new Text(""));
                }
        }

Map类的主要作用是将数据进行统一处理,按照规则给出键值对,为Combine和Reduce等Reduce操作提供标准化数据。从代码上来讲,均继承Mapper类,并实现map函数

Mapper类继承的四个参数,前两个分别是输入数据键和值的类型,一般写Object,Text即可;后两个是输出数据键和值的类型,这两个类型必须和Reduce的输入数据键值类型一致。

所有的Java值类型在送到MapReduce任务前都要转化成对应的值类型:如:String->Text,int->IntWritable,long->LongWritable

Context是Java类与MapReduce任务交互的类,它把Map的键值对传给Combiner或者Reducer,也把Reducer的结果写到HDFS上

3)Reduce类

        public static class Reduce extends Reducer<Text,Text,Text,Text> {
                public void reduce(Text key,Iterable<Text> values,Context context)
                throws IOException,InterruptedException {
                        context.write(key,new Text(""));
                }
        }

Reduce有两种操作,Combine和Reduce,都继承Reducer类。前者用于对数据进行预处理,将处理好的数据交给Reduce,可以看成是本地的Reduce,当不需要任何处理时,Combine可以直接用Reduce代替;后者用于对数据进行正式处理,将相同键值的数据合并,每一个Reduce函数过程只处理同一个键(key)的数据。

Reducer类继承的四个参数,前两个是输入数据键和值的类型,必须与Mapper类的输出类型一致(Combine也必须一致,而且Combine输出需要跟Reduce的输入一致,所以Combine输入输出类型必须是相同的);后两个是输出数据键和值的类型,即我们最终得到的结果

4)Main函数

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                conf.set("mapred.job.tracker","XHadoop1:9010");
                String[] ioArgs = new String[] {"duplicate_in","duplicate_out"};
                String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
                if (otherArgs.length != 2) {
                        System.err.println("Usage: MapDuplicate <in> <out>");
                        System.exit(2);
                }
                Job job = new Job(conf,"MapDuplicate");
                job.setJarByClass(MapDuplicate.class);
                job.setMapperClass(Map.class);
                job.setCombinerClass(Reduce.class);
                job.setReducerClass(Reduce.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

首先,必须有Configuration类,通过这个类指定工作的机器

然后,接收参数的语句,这个不解释了

然后,需要有Job类,指定MapReduce处理用到的类,需要指定的有:Mapper类、Combiner类、Reducer类、输出数据键和值类型的类

然后,指定输入数据的路径

然后,等待任务结束并退出

三、总结

这个实验可以说是最简单的MapReduce,但是麻雀虽小五脏俱全。

从原理来讲,MapReduce有以下步骤:

HDFS(Block)->Split->Mapper->Partion->Spill->Sort->Combiner->Merge->Reducer->HDFS

1、HDFS输入数据被分成Split,被Mapper类读取,

2、Mapper读取数据后,将任务进行Partion(分配)

3、如果Map操作内存溢出,需要Spill(溢写)到磁盘上

4、Mapper进行Sort(排序)操作

5、排序之后进行Combine(合并key)操作,可以理解为本地模式Reduce

6、Combine的同时会进行溢出文件的Merge(合并)

7、所有任务完成后将数据交给Reducer进行处理,处理完成写入HDFS

8、从Map任务开始到Reduce任务开始的数据传输操作叫做Shuffle

从编程来讲,MapReduce有以下步骤:

1、编写Mapper类

2、编写Combiner类(可选)

3、编写Reducer类

4、调用过程:参数配置Configuration

指定任务类

指定输入输出格式

指定数据位置

开始任务

以上仅仅是浅层认识,仅供学习参考及备查。

时间: 2024-10-28 10:48:55

MapReduce 初试的相关文章

Hadoop-2、Mapred初试

import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable;

PageRank算法简介及Map-Reduce实现

PageRank对网页排名的算法,曾是Google发家致富的法宝.以前虽然有实验过,但理解还是不透彻,这几天又看了一下,这里总结一下PageRank算法的基本原理. 一.什么是pagerank PageRank的Page可是认为是网页,表示网页排名,也可以认为是Larry Page(google 产品经理),因为他是这个算法的发明者之一,还是google CEO(^_^).PageRank算法计算每一个网页的PageRank值,然后根据这个值的大小对网页的重要性进行排序.它的思想是模拟一个悠闲的

MapReduce实现手机上网流量分析

一.问题背景 现在的移动刚一通话就可以在网站上看自己的通话记录,以前是本月只能看上一个月.不过流量仍然是只能看上一月的. 目的就是找到用户在一段时间内的上网流量. 本文并没有对时间分组. 二.数据集分析 可以看出实际数据集并不是每个字段都有值,但是还好,完整地以tab隔开了,数据格式还是不错的,我们需要的上行下行数据都有,没有缺失值.其实这个需要在程序中处理,如果不在的话 该怎么办. 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196

mapreduce和spark的原理及区别

Mapreduce和spark是数据处理层两大核心,了解和学习大数据必须要重点掌握的环节,根据自己的经验和大家做一下知识的分享. 首先了解一下Mapreduce,它最本质的两个过程就是Map和Reduce,Map的应用在于我们需要数据一对一的元素的映射转换,比如说进行截取,进行过滤,或者任何的转换操作,这些一对一的元素转换就称作是Map:Reduce主要就是元素的聚合,就是多个元素对一个元素的聚合,比如求Sum等,这就是Reduce. Mapreduce是Hadoop1.0的核心,Spark出现

基于 Eclipse 的 MapReduce 开发环境搭建

文 / vincentzh 原文连接:http://www.cnblogs.com/vincentzh/p/6055850.html 上周末本来要写这篇的,结果没想到上周末自己环境都没有搭起来,运行起来有问题的呢,拖到周一才将问题解决掉.刚好这周也将之前看的内容复习了下,边复习边码代码理解,印象倒是很深刻,对看过的东西理解也更深入了. 目录 1.概述 2.环境准备 3.插件配置 4.配置文件系统连接 5.测试连接 6.代码编写与执行 7.问题梳理 7.1 console 无日志输出问题 7.2

mongodb aggregate and mapReduce

Aggregate MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果.有点类似sql语句中的 count(*) 语法如下: db.collection.aggregate() db.collection.aggregate(pipeline,options) db.runCommand({ aggregate: "<collection>", pipeline: [ <stage>, <...&g

Ubuntu Nginx uwsgi django 初试

/************************************************************************************** * Ubuntu Nginx uwsgi django 初试 * 说明: * 最近打算通过Python搭建一个数据收集的网站,先做一个搭建测试. * * 2016-8-5 深圳 南山平山村 曾剑锋 ***************************************************************

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且: 1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑): 2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次. 而TaskImpl中存在一个成员变

初步掌握MapReduce的架构及原理

目录 1.MapReduce定义 2.MapReduce来源 3.MapReduce特点 4.MapReduce实例 5.MapReduce编程模型 6.MapReduce 内部逻辑 7.MapReduce架构 8.MapReduce框架的容错性 9.MapReduce资源组织方式 1.MapReduce 定义 Hadoop 中的 MapReduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集 2.MapR