自己总结一下mapreduce

mapreduce分为map和reduce两个工作,map负责处理初始数据,处理后产生的新数据再汇聚给reduce处理。

map和reduce类的重写都是一个规则。都是类名<*,*,*,*>(尖括号里面,Java叫做泛型)四个参数,map的前两个参数是从文件处传输过来待处理的key和value值,然后后两个参数是要提交给reduce处理的key和value值。例如map的尖括号里面是<*,*,Text,Intwritable>,那么reduce就必须是<Text,Intwritable,*,*>。这是根据一个带有学生名字和分数的文件,自己写的输出最高分的mapreduce

package org.apache.hadoop.examples;
import java.io.IOException;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount.IntSumReducer;
import org.apache.hadoop.fs.FileSystem;
//import org.apache.hadoop.examples.Wordcount.IntSumReducer;
//import org.apache.hadoop.examples.Wordcount.TokenizerMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.util.Iterator;

public class statics {

public static class Map extends

Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

int max=0;
int min=65535;
String line = value.toString();

String s[]=line.split(" ");
for (int i=0;i<s.length;i++)
{
if (Integer.parseInt(s[i])>max)
max=Integer.parseInt(s[i]);
if (Integer.parseInt(s[i])>min)
min=Integer.parseInt(s[i]);
}

Text text = new Text("1");
// Text text1 = new Text("2");
context.write(text, new IntWritable(max));
// context.write(text1, new IntWritable(min));

}

}

public static class Reduce extends

Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {
int max=0;
int min=10000;

System.out.println(key.toString()+"....................");

Iterator<IntWritable> iterator = values.iterator(); //循环遍历成绩

while (iterator.hasNext()) {
int now = iterator.next().get();
if (now>max)
max =now;

}

context.write(new Text("max is "), new IntWritable(max));

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
conf.set("mapred.job.tracker", "127.0.0.1:9000");
// 指定带运行参数的目录为输入输出目录
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) { // 判断路径参数是否为3个

System.err.println("Usage: Data Deduplication <in> <out><out>");

System.exit(2);

}

Job job = Job.getInstance(conf,"average");
job.setJarByClass(statics.class);

job.setMapperClass(Map.class);
job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(Reduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
Path outpath = new Path(otherArgs[1]);
if (fs.exists(outpath))
{
fs.delete(outpath,true);
}
FileOutputFormat.setOutputPath(job, outpath);

if (job.waitForCompletion(true))
{

System.exit(0);
}

}

}

时间: 2024-10-10 00:28:56

自己总结一下mapreduce的相关文章

MapReduce实现手机上网流量分析

一.问题背景 现在的移动刚一通话就可以在网站上看自己的通话记录,以前是本月只能看上一个月.不过流量仍然是只能看上一月的. 目的就是找到用户在一段时间内的上网流量. 本文并没有对时间分组. 二.数据集分析 可以看出实际数据集并不是每个字段都有值,但是还好,完整地以tab隔开了,数据格式还是不错的,我们需要的上行下行数据都有,没有缺失值.其实这个需要在程序中处理,如果不在的话 该怎么办. 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196

mapreduce和spark的原理及区别

Mapreduce和spark是数据处理层两大核心,了解和学习大数据必须要重点掌握的环节,根据自己的经验和大家做一下知识的分享. 首先了解一下Mapreduce,它最本质的两个过程就是Map和Reduce,Map的应用在于我们需要数据一对一的元素的映射转换,比如说进行截取,进行过滤,或者任何的转换操作,这些一对一的元素转换就称作是Map:Reduce主要就是元素的聚合,就是多个元素对一个元素的聚合,比如求Sum等,这就是Reduce. Mapreduce是Hadoop1.0的核心,Spark出现

基于 Eclipse 的 MapReduce 开发环境搭建

文 / vincentzh 原文连接:http://www.cnblogs.com/vincentzh/p/6055850.html 上周末本来要写这篇的,结果没想到上周末自己环境都没有搭起来,运行起来有问题的呢,拖到周一才将问题解决掉.刚好这周也将之前看的内容复习了下,边复习边码代码理解,印象倒是很深刻,对看过的东西理解也更深入了. 目录 1.概述 2.环境准备 3.插件配置 4.配置文件系统连接 5.测试连接 6.代码编写与执行 7.问题梳理 7.1 console 无日志输出问题 7.2

mongodb aggregate and mapReduce

Aggregate MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果.有点类似sql语句中的 count(*) 语法如下: db.collection.aggregate() db.collection.aggregate(pipeline,options) db.runCommand({ aggregate: "<collection>", pipeline: [ <stage>, <...&g

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且: 1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑): 2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次. 而TaskImpl中存在一个成员变

初步掌握MapReduce的架构及原理

目录 1.MapReduce定义 2.MapReduce来源 3.MapReduce特点 4.MapReduce实例 5.MapReduce编程模型 6.MapReduce 内部逻辑 7.MapReduce架构 8.MapReduce框架的容错性 9.MapReduce资源组织方式 1.MapReduce 定义 Hadoop 中的 MapReduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集 2.MapR

MapReduce/Hbase进阶提升(原理剖析、实战演练)

什么是MapReduce? MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性.他极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上. 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一

关于mongodb的mapReduce

由于nodejs本身的限制,在程序中使用js进行大批量计算效率不高.而V8引擎自身对内存大小的限制(64位系统下1.4G),同样限制了数据规模. 因此,相对于从mongodb中抽出数据进行计算,在mongodb中利用聚合函数或者其他方法完成计算,避开nodejs自身限制的方案在可靠性和扩展性上都相对较为令人满意. mongodb支持类似SQL中的聚合函数,虽然语法不通,不过基本原理类似. mongodb自带的接口中,aggregate被用来实现聚合查询: rec = db.LIBRARY.agg

mapreduce工作原理

转自:http://www.cnblogs.com/z1987/p/5055565.html MapReduce模型主要包含Mapper类和Reducer类两个抽象类.Mapper类主要负责对数据的分析处理,最终转化为key-value数据对:Reducer类主要获取key-value数据对,然后处理统计,得到结果.MapReduce实现了存储的均衡,但没有实现计算的均衡. 一. MapReduce框架组成 MapReduce主要包括JobClient.JobTracker.TaskTracke

Hadoop hdfs&mapreduce核心概念

1.HDFS(分布式文件系统体系) 1.1.NameNode:(名称节点) Hdfs的守护程序 记录文件是如何分割成数据块的,以及这些数据块被存储到了哪些节点上 对内存和I/O进行集中管理 是个单点,发生故障将使集群崩溃 1.2.SecondaryNamenode(辅助名称节点):发生故障进行人工的设置才能实现集群崩溃的问题 监控HDFS状态的辅助后台程序 每个集群都有一个 与NameNode进行通讯,定期保存HDFS元数据快照 与NameNode故障可以作为备用NameNode使用 1.3.D