map reduce程序示例

package test2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 样例数据中包含了年份和温度,提出年份里温度最大的
 (0, 0067011990999991950051507+0000+),
 (33, 0043011990999991950051512+0022+),
 (66, 0043011990999991950051518-0011+),
 (99, 0043012650999991949032412+0111+),
 (132, 0043012650999991949032418+0078+),
 (165, 0067011990999991937051507+0001+),
 (198, 0043011990999991937051512-0002+),
 (231, 0043011990999991945051518+0001+),
 (264, 0043012650999991945032412+0002+),
 (297, 0043012650999991945032418+0078+),
 * */
public class mytest {

static String INPUT_PATH="input/t1_num.txt";   //待统计的文件路径
static String OUTPUT_PATH="output/t1_num";    //统计结果存放的路径

static class MyMapper extends Mapper <Object,Object,Text,IntWritable> {     //定义继承mapper类
    protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    //定义map方法

    String[] arr=value.toString().split("\\),");      //文件中的单词是以“),”分割的,并将每一行定义为一个数组
    for(int i=0;i<arr.length;i++){      //遍历循环每一行,统计单词出现的数量
        String line = arr[i].toString();
        String year = line.substring(line.length()-16, line.length()-12);
        String airTemperature = line.substring(line.length()-6, line.length()-1);
        context.write(new Text(year),new IntWritable(Integer.valueOf(airTemperature)));
    }
        /**
         map过程中,通过对字符串的解析,得到年-温度的key-value对作为输出
         (1950, 0)
         (1950, 22)
         (1950, -11)
         (1949, 111)
         (1949, 78)
         (1937, 1)
         (1937, -2)
         (1945, 1)
         (1945, 2)
         (1945, 78)
         */
 }
}

static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{     //定义继承reducer类
    protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      //定义reduce方法
         int max = 0;
         for(IntWritable c:values){     //统计同一个单词的数量
             if(c.get()>max){
                 max = c.get();//获取value值
             }
         }
        IntWritable outValue=new IntWritable(max);//挨个输出
        context.write(key,outValue);
     }
    /**
     在reduce过程,将map过程中的输出,按照相同的key(年份)将value放到同一个列表中作为reduce的输入
     (1950, [0, 22, –11])
     (1949, [111, 78])
     (1937, [1, -2])
     (1945, [1, 2, 78])

     在reduce过程中,在列表中选择出最大的温度,将年-max温度的key-value作为输出:
     (1950, 22)
     (1949, 111)
     (1937, 1)
     (1945, 78)
     */

}

 public static void main(String[] args) throws Exception{    //main函数
     System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6");//这一行一定要
     Path outputpath=new Path(OUTPUT_PATH);    //输出路径
     Configuration conf=new Configuration();
     Job job=Job.getInstance(conf);     //定义一个job,启动任务
     FileInputFormat.setInputPaths(job, INPUT_PATH);
     FileOutputFormat.setOutputPath(job,outputpath);

     job.setMapperClass(MyMapper.class);
     job.setReducerClass(MyReduce.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
     job.waitForCompletion(true);
    }

}

原文地址:https://www.cnblogs.com/xiaoliu66007/p/9912198.html

时间: 2024-10-07 17:13:32

map reduce程序示例的相关文章

如何给Map/Reduce程序传递参数?

前言 以前我们启动一个Map/Reduce,经常是利用hadoop jar ./xxx.jar yyy.KK input output的方式在SHELL脚本或者命令行直接提交作业.但是最近涉及到的一个项目,需要根据配置动态的启动MR作业,也就是涉及到向MAP,REDUCE处理类传递参数的问题. 传递参数的方式 最常见的方式: Configuration conf = new Configuration(); conf.set("key","value"); 然后在M

详述执行map reduce 程序的步骤(本地执行MR、服务器上执行MR)

MR程序的执行环境有两种:本地测试环境.服务器环境. 1.本地环境执行MR程序的步骤: (1)在windows下配置hadoop的环境变量 (2)拷贝debug工具(winutils)到HADOOP_HOME/bin (3)从源码中拷贝org.apache.hadoop.io.nativeio.NativeIO.java到我们的mr的src目录下,修改NativeIO.java.(大家可去http://download.csdn.net/detail/u013226462/9516657下载.)

Hadoop 使用Combiner提高Map/Reduce程序效率

众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出. 在上述过程中,我们看到至少两个性能瓶颈: 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可.这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率. 使用专利中的国家一项来阐述数据倾斜这个定义.这样的数据远

用python写map reduce程序

利用hadoop streaming框架,帮助我们在map和reduce之间传递数据,通过stdin和stdout. http://wenku.baidu.com/link?url=R1vj6NWV5nv_aVI8P0G5SNzxOyaDsffGeOJrRT6iA9iYHry3w60hJ9CVAtS1iRkh7IOYEuGozIqAZubfXybzf1URxTEY7a2gY9L3LTZQ0Wq

Hadoop实战:使用Combiner提高Map/Reduce程序效率

好不easy算法搞定了.小数据測试也得到了非常好的结果,但是扔到进群上.挂上大数据就挂了.无休止的reduce不会结束了. .. .. .... .. ... .. ==================================================================== 这才想起还有个combiner! .!!!.!! !.!!.!! !! ! !! ! 我们知道.MapReduce是分为Mapper任务和Reducer任务.Mapper任务的输出,通过网络传输到

Map/Reduce 工作机制分析 --- 作业的执行流程

前言 从运行我们的 Map/Reduce 程序,到结果的提交,Hadoop 平台其实做了很多事情. 那么 Hadoop 平台到底做了什么事情,让 Map/Reduce 程序可以如此 "轻易" 地实现分布式运行? Map/Reduce 任务执行总流程 经过之前的学习,我们已经知道一个 Map/Reduce 作业的总流程为: 代码编写  -->  作业配置  -->  作业提交  -->  Map任务的分配和执行  -->  处理中间结果(Shuffle)  --&

Hadoop学习:Map/Reduce初探与小Demo实现

一.    概念知识介绍 Hadoop MapReduce是一个用于处理海量数据的分布式计算框架.这个框架攻克了诸如数据分布式存储.作业调度.容错.机器间通信等复杂问题,能够使没有并行 处理或者分布式计算经验的project师,也能非常轻松地写出结构简单的.应用于成百上千台机器处理大规模数据的并行分布式程序. Hadoop MapReduce基于"分而治之"的思想,将计算任务抽象成map和reduce两个计算过程,能够简单理解为"分散运算-归并结果"的过程.一个 M

Map/Reduce个人实战--生成数据测试集

背景: 在大数据领域, 由于各方面的原因. 有时需要自己来生成测试数据集, 由于测试数据集较大, 因此采用Map/Reduce的方式去生成. 在这小编(mumuxinfei)结合自身的一些实战经历, 具体阐述下生成测试数据集的Map/Reduce程序该如何写? 场景构造: 假设某移动电信行业的某具体业务, 其记录了通话信息(包括拨打方/接听方/通话时间点/基站 等要素). 产商是不可能提供真实的用户数据用于测试的, 但提供了基本的数据规格. 具体针对该业务场景, 我们简单规划如下: num1 v

一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)

Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解Mapper和Reducer接口,应用通常须要提供map和reduce方法以实现他们. 接着我们须要对JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等进行讨