使用mapreduce计算环比的实例

  最近做了一个小的mapreduce程序,主要目的是计算环比值最高的前5名,本来打算使用spark计算,可是本人目前spark还只是简单看了下,因此就先改用mapreduce计算了,今天和大家分享下这个例子,也算是对自己写的程序的总结了。

  首先解释下环比,例如我们要算本周的环比,那么计算方式就是本周的数据和上周数字的差值除以上周数值就是环比了,如果是月的环比就是本月和上月数据的差值除以上月数字就是本月环比了。不过本mapreduce实例不会直接算出比值,只是简单求出不同时间段数值的差值,最终环比结果由业务系统进行运算了。

  下面看看本人构造的测试数据了,测试数据分成两个文件,文件一的内容如下所示:

guanggu,1;90
hongshan,1;80
xinzhou,1;70
wuchang,1;95
hankou,1;85
hanyang,1;75

  第二个文件的测试数据如下:

guanggu,2;66
hongshan,2;68
xinzhou,2;88
wuchang,2;59
hankou,2;56
hanyang,2;38

  这里每行第一列的字段就是key了,key和value使用逗号分割,1;90是value值,value值包含两个内容,1为时间段标记,90就是数值,大家可以看到同一个key会有两个不同的时间段(使用1和2来标记)。

  Mapreduce的运算逻辑如下:首先第一步我们要求出环比数值,第二步就是排序了,做这个算法我曾考虑许久就是想把求环比值和排序两个过程合并,但是最后发现很难做到,只好将整个运算过程拆分成两个不同mapreduce,第一个mapreduce计算环比,第二个进行排序,二者是迭代关系。这里解释下分成两个mapreduce原因吧,主要原因就是最原始数据很难把两个不同时间段的数据按照key合并在一起变成一行数据,因此mapreduce计算时候必须有一个过程就是执行相同key合并操作,因此不得不分成两个步骤完成计算。

  接下来就是具体代码了,首先是第一个mapreduce,用来计算环比值的mapreduce了,它的map实现代码如下:

import java.io.IOException;

import org.apache.hadoop.io.Text;
// 使用输入为object,text,输出为Text,Text的数据结构,Object其实是行号,在本计算里意义不大,Text就是每行的内容
public class MrByAreaHBMap extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, Text>{

    private static String firstSeparator = ",";//每行的key和value值使用逗号分割

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        /* 本map的逻辑非常简单,就是从行里拆分key和value,对于有些初学者可能疑惑,我们到底如何让相同的key合并在一起了?这个就要看reduce计算了*/
        Text areaKey = new Text();// reduce输入是Text类型
        Text areaVal = new Text();// reduce输入是Text类型
        String line = value.toString();
        if (line != null && !line.equals("")){
            String[] arr = line.split(firstSeparator);

            areaKey.set(arr[0]);
            areaVal.set(arr[1]);

            context.write(areaKey, areaVal);
        }

    }

}

  下面是reduce代码了,具体如下:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MrByAreaHBReduce extends Reducer<Text, Text, Text, Text>{

    private static String firstSeparator = ";";
    private static String preFlag = "1";
    private static String nextFlag = "2";

    /*reduce的输入也是key,value的形式,不过这个输入是会将map里相同的key的值进行合并,合并形式就是一个数组形式,不过reduce方法里是通过迭代器进行数值处理*/
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int num1 = 0,num2 = 0,hbNum = 0;
        for(Text value : values){
            String inVal = value.toString();
            String[] arr = inVal.split(firstSeparator);
            // 下面的逻辑是通过不同时间段标记获取不同时间段数值
            if (arr[0].equals(preFlag)){
                num1 = Integer.valueOf(arr[1]);
            }
            if (arr[0].equals(nextFlag)){
                num2 = Integer.valueOf(arr[1]);
            }
        }
        hbNum = num1 - num2;// 这里计算环比
        Text valueText = new Text();
        valueText.set(hbNum + "");
        Text retKey = new Text();
        /* 对reduce的key进行了修改,将原来key和各个时间段的数值合并在一起,这样读取计算结果时候就可以读取到原始计算数据了,这是key,value计算模式可以简陋的无奈之举*/
        retKey.set(key.toString() + firstSeparator + num1 + firstSeparator + num2);
        context.write(valueText,retKey);
    }

}

  求环比的mapredue代码介绍结束了,下面是排序的算法,排序算法更加简单,在计算环比的mapreduce输出里我将环比值和原始key进行了互换,然后输出到结果文件里,这个结果文件就是第二个mapreduce的输入了,下面我们就要对这个新key进行排序,mapredcue计算模型里从map到reduce有默认的排序机制,如果map输出的key是字符类型那么排序规则就是按照字典进行排序,如果key是数字,那么就会按照数字由小到大进行排序,下面就是排序mapreduce的具体算法,map代码如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MrByAreaSortMap extends
        Mapper<LongWritable, Text, IntWritable, Text> {
    /* 我们需要的排序是按照key的数值排序,不过这个排序是map的输出才做的,因此代码里输出的key使用了IntWritable类型
       其实排序的map逻辑非常简单就是保证map的输出key是数字类型即可
    */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        /*reduce的输出结果文件格式是按照空格分隔的,不过也搞不清有几个空格,或者是tab分割了,这里使用正则表达式s+就不怕多少空格和tab了*/
        String[] arr = line.split("\\s+");
        IntWritable outputKey = new IntWritable(Integer.valueOf(arr[0]));
        Text outputValue = new Text();
        outputValue.set(arr[1]);
        context.write(outputKey, outputValue);
    }
}

  reduce代码如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* reduce代码很让人吃惊吧,就是把map结果原样输出即可 */
public class MrByAreaSortReduce extends
        Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text text : values){
            context.write(key, text);
        }
    }

}

  代码里的注释对代码逻辑进行了详细的解释,这里就不累述了。

  下面就是调用两个mapreduce的main函数了,也就是我们该如何执行mapreduce的方式,这个main函数还是非常有特点的,特点一就是两个mapreduce有迭代关系,具体就是第一个mapredcue执行完毕后第二个mapredcue才能执行,或者说第一个mapredcue的输出就是第二个mapredcue的输入,特点二就是排序计算里我们使用了map到reduce过程及shuffle过程里的默认排序机制,那么该机制运用可不是像mapreduce代码那么简单了,其实背后需要我们更加深入理解mapreduce的原理,这里我们直接看代码了,代码如下:

mport java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MrByAreaJob {
    public static void main(String[] args) throws IOException {
        // 一个mapreduce就是一个job 一个job需要一个单独的Configuration,我开始让两个job公用Configuration,最后mr报错
        Configuration conf01 = new Configuration();
        ControlledJob conJobHB = new ControlledJob(conf01);

        // 下面代码很多文章里都会提到这里就不多说了
        Job jobHB = new Job(conf01,"hb");
        jobHB.setJarByClass(MrByAreaJob.class);
        jobHB.setMapperClass(MrByAreaHBMap.class);
        jobHB.setReducerClass(MrByAreaHBReduce.class);
        jobHB.setMapOutputKeyClass(Text.class);
        jobHB.setMapOutputValueClass(Text.class);
        jobHB.setOutputKeyClass(Text.class);
        jobHB.setOutputValueClass(Text.class);

        conJobHB.setJob(jobHB);

        FileInputFormat.addInputPath(jobHB, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobHB, new Path(args[1]));

        Configuration conf02 = new Configuration();
        Job jobSort = new Job(conf02,"sort");
        jobSort.setJarByClass(MrByAreaJob.class);
        jobSort.setMapperClass(MrByAreaSortMap.class);
        jobSort.setReducerClass(MrByAreaSortReduce.class);
        // Partitioner是shuffle的一个步骤,一个Partitioner对应一个reduce
        // 假如这个mapredue有多个reduce,我们如何保证排序的全局一致性,因此这里需要进行处理
        jobSort.setPartitionerClass(PartitionByArea.class);
        // map对数值排序默认是由小到大,但是需求是由大到小,因此需要我们改变这种排序
        jobSort.setSortComparatorClass(IntKeyComparator.class);
        jobSort.setMapOutputKeyClass(IntWritable.class);
        jobSort.setMapOutputValueClass(Text.class);

        jobSort.setOutputKeyClass(IntWritable.class);
        jobSort.setOutputValueClass(Text.class);

        ControlledJob conJobSort = new ControlledJob(conf02);
        conJobSort.setJob(jobSort);

        // 这里添加job的依赖关系
        conJobSort.addDependingJob(conJobHB);

        // 可以看到第一个mapreduce的输出就是第二个的输入
        FileInputFormat.addInputPath(jobSort, new Path(args[1]));
        FileOutputFormat.setOutputPath(jobSort, new Path(args[2]));

        // 主控制job
        JobControl mainJobControl = new JobControl("mainHBSort");

        mainJobControl.addJob(conJobHB);
        mainJobControl.addJob(conJobSort);

        Thread t = new Thread(mainJobControl);
        t.start();

        while(true){
            if (mainJobControl.allFinished()){
                System.out.println(mainJobControl.getSuccessfulJobList());
                mainJobControl.stop();
                break;
            }
        }
    }
}

  这里有两个类还没有介绍,一个是IntKeyComparator,这是为了保证排序的mapreduce结果是按数字由大到小排序,代码如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class IntKeyComparator extends WritableComparator {

    protected IntKeyComparator() {
        super(IntWritable.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }

}

  另一个类就是PartitionByArea,这个是保证排序不会因为reduce设置的个数而不能保证排序的全局一致性,代码具体如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionByArea<IntWritable, Text> extends Partitioner<IntWritable, Text> {

    @Override
    public int getPartition(IntWritable key, Text value, int numReduceTasks) {
        int maxValue = 50;
        int keySection = 0;

        // numReduceTasks就是默认的reduce任务个数
        if (numReduceTasks > 1 && key.hashCode() < maxValue){
            int sectionValue = maxValue / (numReduceTasks - 1);
            int count = 0;
            while((key.hashCode() - sectionValue * count) > sectionValue){
                count++;
            }
            keySection = numReduceTasks - 1 - count;
        }

        return keySection;
    }

}

  这里特别要讲解的是PartitionByArea,这个原理我花了好一段时间才理解过来,partition是map输出为reduce对应做的分区,一般一个partition对应一个reduce,如果我们将reduce任务槽设置为一个那么就不用更改Partition类,但是实际生产情况下reduce往往会配置多个,这个时候保证数据的整体排序就十分重要了,那么我们如何保证其数据的整体有序了,这个时候我们要找到输入数据的最大值,然后让最大值除以partition的数量的商值作为分割数据的边界,这样等分就可以保证数据的整体排序的有效性了。

  现在所有的代码都介绍完毕了,下面就是我们该如何让这个代码运行了,我在写本代码时候使用的是ide是eclipse,不过我没有使用mapreduce插件,而是直接放在服务器上运行,下面我来描述下运行该mr的方式,具体如下:

  首先我在装有hadoop服务的服务器上使用root用户创建一个属于我自己的文件夹,这里文件夹的名字叫做xiajun,我通过ftp将源文件传递到xiajun目录下的javafile文件夹,执行如下命令:

mkdir /xiajun/javafile
javac –classpath /home/hadoop/hadoop/hadoop-core-0.20.2-cdh3u4.jar –d /xiajun/javaclass /xiajun/ javafile/*.java

  以上命令是编译源文件,将javafile文件夹的java代码编译到javaclass目录下。

  

Jar –cvf /xiajun/mymr.jar –C /xiajun/javaclass/ .

  这里将javaclass目录下class文件打成jar包,放在xiajun目录下。

  接下来我们使用hadoop用户登录:

su – hadoop

  之所以使用root用户编译,打jar包原因是我的hadoop用户没有权限上传文件不得已而为之了。

  我们首先将测试数据上传到HDFS上,接下来执行如下命令:

cd /hadoop/bin

  切换目录到bin目录下,然后执行:

hadoop jar mymr.jar cn.com.TestMain  输入目录  输出目录

  这里输入可以是具体文件也可以是目录,输出目录在HDFS上要不存在,如果存在hadoop会无法确认任务是否已经执行完毕,就会强制终止任务。

  两个mapreduce迭代执行日志非常让人失望,因此如果我们发现任务无法正常执行,我现在都是一个个mapredcue执行查看错误日志。

  最后我们看看应用服务应该如何调用这个mapreduce程序,这里我使用远程调用shell 的方式,代码如下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;

public class TestMain {

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) {
        String hostname = "192.168.1.200";
        String username = "hadoop";
        String pwd = "hadoop";

        Connection conn = new Connection(hostname);
        Session sess = null;
        long begin = System.currentTimeMillis();
        try {
            conn.connect();
            boolean isAuthenticated = conn.authenticateWithPassword(username, pwd);
            sess = conn.openSession();
            sess.execCommand("cd hadoop/bin && hadoop jar /xiajun/mymr.jar com.test.mr.MrByAreaJob /xiajun/areaHBinput /xiajun/areaHBoutput58 /xiajun/areaHBoutput68");

            InputStream stdout = sess.getStdout();
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
            StringBuilder sb = new StringBuilder();

            while(true){
                String line = br.readLine();
                if (line == null) break;
                sb.append(line);
            }

            System.out.println(sb.toString());

            long end = System.currentTimeMillis();
            System.out.println("耗时:" + (begin - end)/1000 + "秒");
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            sess.close();
            conn.close();
        }
    }

}

  好了,本文就此结束了。

时间: 2024-09-30 06:43:49

使用mapreduce计算环比的实例的相关文章

MapReduce计算模型二

之前写过关于Hadoop方面的MapReduce框架的文章MapReduce框架Hadoop应用(一) 介绍了MapReduce的模型和Hadoop下的MapReduce框架,此文章将进一步介绍mapreduce计算模型能用于解决什么问题及有什么巧妙优化. MapReduce到底解决什么问题? MapReduce准确的说,它不是一个产品,而是一种解决问题的思路,能够用分治策略来解决问题.例如:网页抓取.日志处理.索引倒排.查询请求汇总等等问题.通过分治法,将一个大规模的问题,分解成多个小规模的问

MapReduce计算框架

MapReduce计算框架 一.MapReduce实现原理 图展示了MapReduce实现中的全部流程,处理步骤如下: 1.用户程序中的MapReduce函数库首先把输入文件分成M块(每块大小默认64M),在集群上执行处理程序,见序号1 2.主控程序master分配Map任务和Reduce任务给工作执行机器worker.见序号2 3.一个分配了Map任务的worker读取并处理输入数据块.从数据片段中解析出key/value键值对,然后把其传递给Map函数,由Map函数生成并输出中间key/va

hadoop之魂--mapreduce计算框架,让收集的数据产生价值 (第4篇)

  通过前面的学习,大家已经了解了HDFS文件系统.有了数据,下一步就要分析计算这些数据,产生价值.接下来我们介绍Mapreduce计算框架,学习数据是怎样被利用的. Mapreduce计算框架 如果将Hadoop比做一头大象,那么MapReduce就是那头大象的电脑.MapReduce是Hadoop核心编程模型.在Hadoop中,数据处理核心就是MapReduce程序设计模型. 本章内容: 1) MapReduce编程模型 2) MapReduce执行流程 3) MapReduce数据本地化

从 WordCount 到 MapReduce 计算模型

概述 虽然现在都在说大内存时代,不过内存的发展怎么也跟不上数据的步伐吧.所以,我们就要想办法减小数据量.这里说的减小可不是真的减小数据量,而是让数据分散开来.分开存储.分开计算.这就是 MapReduce 分布式的核心. 版权说明 著作权归作者所有. 商业转载请联系作者获得授权,非商业转载请注明出处. 本文作者:Coding-Naga 发表日期: 2016年5月10日 本文链接:http://blog.csdn.net/lemon_tree12138/article/details/513677

Hadoop MapReduce计算框架

1.MapReduce理论 1.1.MapReduce是什么? MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据):MapReduce是谷歌公司在研究如何处理海量数据所提出的一种面向大规模数据处理的并行计算模型和方法. 1.2.MapReduce概述 MapReduce是一个计算框架,用于对大数据进行处理,它的主要思想就是"分而治之":整个MapReduce计算过程可以分为Map(映射)阶段

MapReduce计算框架高级特性程序运行并发度

2019/2/19 星期二 MapReduce计算框架高级特性程序运行并发度 所谓的并发度,就是在MapReduce执行程序的过程中有多少个map task进程和reduce task进程,来一起完成程序的处理. MapReduce就是把业务处理逻辑变成分布式来处理. reduce task 数量的决定机制 //全局的聚合操作 由业务场景决定1.业务逻辑需要2.数据量大小设置方法:job.setNumReduceTasks(5) //reduce task的数量不能够任意的指定,比如:我们在一大

如何通过Java程序提交yarn的mapreduce计算任务

由于项目需求,需要通过Java程序提交Yarn的MapReduce的计算任务.与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码. 以下为MapReduce主程序,有几点需要提一下: 1.在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分. 2.为了控制reduce的处理过程,map的输出键的格式为组合键格式.与常规的<key,value>不同,这里变为了<TextPair,Valu

Spark入门实战系列--9.Spark图计算GraphX介绍及实例

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.GraphX介绍 1.1 GraphX应用背景 Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求. 众所周知·,社交网络中人与人之间有很多关系链,例如Twitter.Facebook.微博和微信等,这些都是大数据产生的地方都需要图计算,现在的图处理基本都是分布式的图处理,而并非单机处理.Spark

MapReduce 计算模型

前言 本文讲解Hadoop中的编程及计算模型MapReduce,并将给出在MapReduce模型下编程的基本套路. 模型架构 在Hadoop中,用于执行计算任务(MapReduce任务)的机器有两个角色:一个是JobTracker,一个是TaskTracker,前者用于管理和调度工作,后者用于执行工作. 一般来说,一个Hadoop集群由一个JobTracker和N个TaskTracker构成. 执行流程 每次计算任务都可以分为两个阶段,Map阶段和Reduce阶段. 其中,Map阶段接收一组键值