MapReduce之浅析Map接口和Reduce接口


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

http://www.cnblogs.com/xuqiang/archive/2011/06/05/2071935.html

关键语句:

Job job = new Job(conf, "word
count");//构造一个job作业

job.setMapperClass(TokenizerMapper.class);//设置job作业的map类

job.setReducerClass(IntSumReducer.class);//设置job作业的reduce类

FileInputFormat.addInputPath(job, new
Path(otherArgs[0]));//设置输入路径

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//设置输出路径

System.exit(job.waitForCompletion(true) ? 0 : 1);//等待Job完成

图:数据流程图

InputDataFormat类将行记录变成<行号,行内容>对;

Mapper类将记录行<行号,行内容>变成<键值,键对应内容>;

MapReduceFramwok框架将相同键值组合成<键值,对应内容列表>;

Reduce类中就是把<键值,对应内容列表>对变成<键值,键对应内容>;

我们所关注的是Mapper类和Reduce类

前言:数据在整体框架上能够流动是因为key和value是可以序列化和反序列化的;

value值类型通过接口Writable来定义实现;key和value值类型可以通过WritableComparalbe<T>接口实现;这些通过类实现,那么这个类就是该key和value的数据类型。

系统已经预定义实现了如下类:

同理:对于Mapper类和Reduce类

一个map类必须实现Mapper接口,一个reduce类必须实现Reduce接口;

如何实现:

重点是实现Mapper接口下的函数map;Reduce接口的reduce函数。具体原型及其代码见wordcount代码。

其中Mapper接口继承于MapReduceBase类;Reduce接口继承于MapReduceBase类。

MapReduce之浅析Map接口和Reduce接口

时间: 2024-10-12 09:13:58

MapReduce之浅析Map接口和Reduce接口的相关文章

MapReduce作业的map task和reduce task调度参数

MapReduce作业可以细分为map task和reduce task,而MRAppMaster又将map task和reduce task分为四种状态: 1.pending:刚启动但尚未向resourcemanager发送资源请求: 2.scheduled:已经向resourceManager发送资源请求,但尚未分配到资源: 3.assigned:已经分配到了资源且正在运行: 4.completed:已经运行完成. map task的生命周期为:scheduled -> assigned -

List接口、Set接口和Map接口

1.List和Set接口自Collection接口,而Map不是继承的Collection接口 Collection表示一组对象,这些对象也称为collection的元素;一些 collection允许有重复的元素,而另一些则不允许;一些collection是有序的,而另一些则是无序的;JDK中不提供此接口的任何直接实 现,它提供更具体的子接口(如 Set 和 List)实现;Map没有继承Collection接口,Map提供key到value的映射;一个Map中不能包含相同key,每个key只

List接口和Set接口和Map接口的of方法

只适用于List接口和Set接口和Map接口,不能改变,不允许有重复元素: 原文地址:https://www.cnblogs.com/wmqiang/p/10663856.html

Hadoop那些事儿(三)---MapReduce编程浅析

1.map和reduce 1.1 mapReduce处理逻辑 在本系列文章的第一篇中,曾对MapReduce原理做过简单的描述,在这里再重述一遍. 首先我们有两个文件word1.txt和word2.txt 其中word1.txt的内容如下: aaaa bbbb cccc dddd aaaa word2.txt的内容如下: aaaa cccc dddd eeee aaaa 这里的两个文件很小,我们先假设这两个文件很大,分别为64M和96M的大小,然后我们需要统计文件中每个字符串的数量,那么MapR

记Hadoop2.5.0线上mapreduce任务执行map任务划分的一次问题解决

前言 近日在线上发现有些mapreduce作业的执行时间很长,我们需要解决这个问题.输入文件的大小是5G,采用了lzo压缩,整个集群的默认block大小是128M.本文将详细描述这次线上问题的排查过程. 现象 线上有一个脚本,为了便于展示,我将这个脚本重新copy了一份并重命名为zzz.这个脚本实际是使用Hadoop streaming运行一个mapreduce任务,在线上执行它的部分输出内容如下: 可以看到map任务划分为1个.这个执行过程十分漫长,我将中间的一些信息省略,map与reduce

MapReduce实例浅析

在文章<MapReduce原理与设计思想>中,详细剖析了MapReduce的原理,这篇文章则通过实例重点剖析MapReduce 本文地址:http://www.cnblogs.com/archimedes/p/mapreduce-example-analysis.html,转载请注明源地址. 1.MapReduce概述 Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集.

Java基础之集合框架(Collection接口和List接口)

首先我们说说集合有什么作用. 一.集合的作用 1.在类的内部,对数据进行组织: 2.简单而快速的搜索大数量的条目: 3.有的集合接口,提供一系列排列有序的元素,并且可以在序列中间快速的插入或者删除有关元素: 例如:做广播操的时候,可以将学生插入到某排某列,反之也可以叫某排某列中的学生出列. 4.有的集合接口,提供了映射关系,可以通过关键字(key)去快速查找到对应的唯一对象,而这个关键字可以是任意类型. 例如:在吃饭的时候,众多铝饭盒中如何区别是自己的呢?在饭盒上刻独有的标志或贴纸条,这个标志和

集合类:collection接口及其子接口(基础知识普及)

collection接口是集合接口树的根,定义了集合操作的通用API.作用是为了方便程序员处理一组常规元素. 注:Object[] toArray(Object[] a):返回一个内含集合所有元素的数组.实现了集合类和数组类之间的相互转换,一般如果指定数组长度不够的话,系统会自动生成一个新的数组用来存放这些元素,如果长度过大的话,多余的地方会被赋值为NULL.所以,如果不是操作者的话,不应该对这个函数返回的数组进行  .lenght运算,因为那样会得到不准确的数据. 另,这个函数返回的数组的运行

Java—集合框架 Collections.sort()、Comparable接口和Comparator接口

Collentions工具类--java.util.Collections Collentions是Java集合框架中,用来操作集合对象的工具类,也是Java集合框架的成员,与List.Map和Set是并列的. Collections.sort() 排序方法,实现对List对象中的元素进行排序. package com.test.collection; import java.util.ArrayList; import java.util.Collections; import java.ut