一个简单的MapReduce示例(多个MapReduce任务处理)

一、需求

  有一个列表,只有两列:id、pro,记录了id与pro的对应关系,但是在同一个id下,pro有可能是重复的。

  现在需要写一个程序,统计一下每个id下有多少个不重复的pro。

  为了写一个完整的示例,我使用了多job!

二、文件目录

|- OutCount    //单Job的,本次试验没有使用到,这里写出来供参考
|- OutCount2
|- OutCountMapper
|- OutCountMapper2
|- OutCountReduce
|- OutCountReduce2

三、样本数据(部分)

2,10000088379
9,10000088379
6,10000088379
1,10000088379
8,10000088379
0,10000088379
1,10000088379
4,10000091621
3,10000091621
2,10000091621
0,10000091621
6,10000091621
2,10000091621
0,10000091621
0,10000091621
9,10000091621
2,10000091621

四、Java代码

1、OutCountMapper.java

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * created by wangjunfu on 2017-05-25.
 * 4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型
 * map 和 reduce 的数据输入输出都是以 key-value对的形式封装的
 * 默认情况下,Map框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量(选用LongWritable),value是这一行的内容(VALUEIN选用Text)
 * 在wordcount中,经过mapper处理数据后,得到的是<单词,1>这样的结果,所以KEYOUT选用Text,VAULEOUT选用IntWritable
 */
public class OutCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    // MapReduce框架每读一行数据就调用一次map方法
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 数据格式:uid skuid
        String oneline = value.toString().replace(‘,‘, ‘_‘).trim();

        // 去重思路:Map的key具有数据去重的功能,以整个数据作为key发送出去, value为null
        context.write(new Text(oneline), new Text(""));

        /*
        // 这里需要说明一下,我们现在的样本是标准的,一行一个样本。
        // 有的情况下一行多个,那就需要进行分割。
        // 对这一行的文本按特定分隔符切分
        String[] words = oneline.split("\t");
        for (String word : words) {
            // 遍历这个单词数组,输出为key-value形式 key:单词 value : 1
            context.write(new Text(word), new IntWritable(1));
        }
        */
    }
}

2、OutCountReduce.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * created by wangjunfu on 2017-05-25.
 * 经过mapper处理后的数据会被reducer拉取过来,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致
 * 经过reducer处理后的数据格式为<单词,频数>,所以KEYOUT为Text,VALUEOUT为IntWritable
 */
public class OutCountReduce extends Reducer<Text, Text, Text, Text> {
    // 当mapper框架将相同的key的数据处理完成后,reducer框架会将mapper框架输出的数据<key,value>变成<key,values{}>。
    // 例如,在wordcount中会将mapper框架输出的所有<hello,1>变为<hello,{1,1,1...}>,即这里的<k2,v2s>,然后将<k2,v2s>作为reduce函数的输入
    // 这个将在下面reduce2 中得到体现
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(key, new Text(""));
    }
}

3、OutCountMapper2.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * created by wangjunfu on 2017-05-27.
 * 将原始数据作为map输出的key设置为int类型。map会自动的根据key进行排序
 */
public class OutCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 数据格式:uid_skuid
        String oneline = value.toString();

        // 将这条数据中的uid 发出去, value为计算one
        context.write(new Text(oneline.split("_")[0]), one);
    }
}

4、OutCountReduce2.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * created by wangjunfu on 2017-05-27.
 * 按统计数排序:将values作为次序key,将map排序好的key作为value输出
 */
public class OutCountReduce2 extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // 迭代器,访问容器中的元素,为容器而生
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext()) {
            sum += itr.next().get();
        }

        /*
        // 这种遍历也可以
        // 遍历v2的list,进行累加求和
        for (IntWritable v2 : itr) {
            sum = v2.get();
        }
        */

        // 按统计数排序:将values作为次序key,将map排序好的key作为value输出
        //context.write(new IntWritable(sum), key);     //需要再起一个 map-reduce
        context.write(key, new IntWritable(sum));
    }
}

5、OutCount2.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 需求:给定一个列表uid skuid,求出uid下不重复的skuid数据;然后再按统计大小排序。
 * 涉及到多job 处理。
 * created by wangjunfu on 2017-05-27.
 */
public class OutCount2 {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(OutCount.class);

        //第一个job的配置
        Job job1 = new Job(conf, "Join1");
        job1.setJarByClass(OutCount.class);

        job1.setMapperClass(OutCountMapper.class);
        job1.setReducerClass(OutCountReduce.class);

        job1.setMapOutputKeyClass(Text.class);          //map阶段的输出的key
        job1.setMapOutputValueClass(Text.class); //map阶段的输出的value

        job1.setOutputKeyClass(Text.class);             //reduce阶段的输出的key
        job1.setOutputValueClass(Text.class);    //reduce阶段的输出的value

        //job-1 加入控制容器
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);

        //job-1 的输入输出文件路径
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        //第二个job的配置
        Job job2 = new Job(conf, "Join2");
        job2.setJarByClass(OutCount.class);             // 设置job所在的类在哪个jar包

        job2.setMapperClass(OutCountMapper2.class);     // 指定job所用的mappe类
        job2.setReducerClass(OutCountReduce2.class);    // 指定job所用的reducer类

        // 指定mapper输出类型和reducer输出类型
        // 由于在wordcount中mapper和reducer的输出类型一致,
        // 所以使用setOutputKeyClass和setOutputValueClass方法可以同时设定mapper和reducer的输出类型
        // 如果mapper和reducer的输出类型不一致时,可以使用setMapOutputKeyClass和setMapOutputValueClass单独设置mapper的输出类型
        job2.setMapOutputKeyClass(Text.class);          //map阶段的输出的key
        job2.setMapOutputValueClass(IntWritable.class); //map阶段的输出的value

        job2.setOutputKeyClass(Text.class);             //reduce阶段的输出的key
        job2.setOutputValueClass(IntWritable.class);    //reduce阶段的输出的value

        //job-2 加入控制容器
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        //设置多个作业直接的依赖关系
        //job-2 的启动,依赖于job-1作业的完成
        ctrljob2.addDependingJob(ctrljob1);

        //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
        FileInputFormat.addInputPath(job2, new Path(args[1]));

        //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
        //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        //主的控制容器,控制上面的总的两个子作业
        JobControl jobCtrl = new JobControl("myOutCount");

        //添加到总的JobControl里,进行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        //在线程启动,记住一定要有这个
        Thread t = new Thread(jobCtrl);
        t.start();

        while (true) {
            if (jobCtrl.allFinished()) {
                //如果作业成功完成,就打印成功作业的信息
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }
    }
}

6、OutCount.java

单Job的,本次试验没有使用到,这里写出来供参考

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * 需求:给定一个列表uid skuid,求出uid下不重复的skuid数据;然后再按统计大小排序。
 * 涉及到多job 处理。
 * created by wangjunfu on 2017-05-25.
 */
public class OutCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();       //指定作业执行规范
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage:wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count");  //指定job名称,及运行对象
        job.setJarByClass(OutCount.class);
        job.setMapperClass(OutCountMapper.class);       //指定map函数
        job.setCombinerClass(OutCountReduce.class);     //是否需要conbiner整合
        job.setReducerClass(OutCountReduce.class);      //指定reduce函数
        job.setOutputKeyClass(Text.class);              //输出key格式
        job.setOutputValueClass(IntWritable.class);     //输出value格式
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));       //处理文件路径
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    //结果输出路径
        // 将job提交给集群运行
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

五、结果

11    0
11    1
7    2
10    3
10    4
9    5
10    6
7    7
13    8
9    9
时间: 2024-10-06 16:40:21

一个简单的MapReduce示例(多个MapReduce任务处理)的相关文章

[MySQL5.6] 一个简单的optimizer_trace示例

[MySQL5.6] 一个简单的optimizer_trace示例 前面已经介绍了如何使用和配置MySQL5.6中optimizer_trace(点击博客),本篇我们以一个相对简单的例子来跟踪optimizer_trace的产生过程. 本文的目的不是深究查询优化器的实现,只是跟踪optimizer trace在优化器的那一部分输出,因此很多部分只是一带而过,对于需要深究的部分,暂时标注为红色,后续再扩展阅读;之前一直没看过这部分代码,理解起来还是比较困难的… 我们以一个简单的表为例过一下opti

一个简单的CRUD示例:使用PHP+MySQL

一个简单的CRUD示例:使用PHP+MySQL 前情 总是听说CRUD,但一直不清楚是做什么的,就去查了一下,大概的意思是一组常见的数据库操作:增(create).查(read).改(update)删(delete),大概是,也有其他的翻译,这里大概了解一下就好.截止到现在,网上好像没有什么很小的示例来阐述CRUD这个概念的,然后就去查了一番资料,写了一个真的很小白的.很简单.未使用任何框架的案例. 前端准备 由于笔者对前端知识并不熟悉,这里只贴容器(传输/返回数据的容器)代码,在服务器根目录下

一个简单网络爬虫示例(转载)

在学生时期,可能听到网络爬虫这个词会觉得很高大上,但是它的简单实现可能学生都不难懂. 网络爬虫应用,就是把整个互联网真的就当做一张网,像蜘蛛网那样,应用就像一个虫子,在网上面按照一定的规则爬动. 现在互联网应用最广的就是http(s)协议了,本文例子就是基于使用http(s)协议的,只作为示例,不涉及复杂的算法(实际上是最重要的). 设计思路: 程序入口从一个或多个url开始,通过http(s)获取url的内容,对获取到内容处理,获取内容中需要爬取的信息,获取到内容中的url链接,再重复以上步骤

IDDD 实现领域驱动设计-一个简单的 CQRS 示例

上一篇:<IDDD 实现领域驱动设计-CQRS(命令查询职责分离)和 EDA(事件驱动架构)> 学习架构知识,需要有一些功底和经验,要不然你会和我一样吃力,CQRS.EDA.ES.Saga 等等,这些是实践 DDD 所必不可少的架构,所以,如果你不懂这些,是很难看懂上篇所提到的 CQRS Journey 和 ENode 项目,那怎么办呢?我们可以从简单的 Demo 一点一滴开始. 代码地址:https://github.com/yuezhongxin/CQRS.Sample 说明:一张很丑陋的

tensorflow框架学习(三)—— 一个简单的神经网络示例

一.神经网络结构 定义一个简单的回归神经网络结构: 数据集为(xi,yi),数据的特征数为1,所以x的维度为1. 输入层1个神经元. 隐藏层数为1,4个神经元. 输出层1个神经元. 隐藏层的激活函数为f(x)=x,输出层的激活函数为ReLU 结构图如下: 二.代码示例 相关函数说明: tf.random_normal :用于生成正太分布随机数矩阵的tensor,tensorFlow有很多随机数函数,可以查找官方文档获得. tf.zeros :用于生成0矩阵的tensor,tf.ones可以用来获

一个简单的ServletContextListener示例

ServletContext可以初始化String类型的参数.但是,如果你希望应用初始化参数是一个数据库DataSource呢?上下文参数只能是String.毕竟,你不能把一个Dog对象塞到XML部署描述文件中(事实上,可以用XML表示一个串行化对象,但是在当前的Servlet规范中还没有相关的支持……没准将来会提供).如果你真的想让Web应用的所有部分都能访问一个共享的数据连接,该怎么办?当然可以把这个DataSource查找名放在一个上下文初始化参数里,这也是当前上下文参数最常见的一种用法.

从一个简单的 JPA 示例开始

本文主要讲述 Spring Data JPA,但是为了不至于给 JPA 和 Spring 的初学者造成较大的学习曲线,我们首先从 JPA 开始,简单介绍一个 JPA 示例:接着重构该示例,并引入 Spring 框架,这两部分不会涉及过多的篇幅,如果希望能够深入学习 Spring 和 JPA,可以根据本文最后提供的参考资料进一步学习. 自 JPA 伴随 Java EE 5 发布以来,受到了各大厂商及开源社区的追捧,各种商用的和开源的 JPA 框架如雨后春笋般出现,为开发者提供了丰富的选择.它一改之

一个简单的多线程示例

项目中要用 到多线程,需求: 论坛发帖,如果是我们的老师用户发帖,会向所有用户发一条信息,提醒用户去看贴.代码这么实现,老师保存文章后,新写一个线程,获取所有的用户,循环插入信息 具体代码: 通过构造把 动态参数传进来,插入到数据库中,就是这么简单. 如何调用:

宏&amp;一个简单的宏病毒示例

基于VisualBasicForApplications 其一:录制宏 在word,视图,宏,录制宏选项. 操作比较简单,不再赘述. (注意根据需求选择normal还是当前文档) 例如:录制宏:快捷键设为空格,将某些字段设为隐藏/空白.可以隐藏信息.(虽然很简陋) 其二:编辑宏 视图 宏 查看宏 创建 (注意根据需求选择normal还是当前文档) CDO组件发送邮件. 用了ActiveDocument.FullName获取当前文档的path. 不知道为什么Dim mail As New CDO.

Java并发编程 - 一个简单的死锁示例和死锁的检查

Java线程死锁是一个经典的多线程问题.因为不同的线程都在等待根本不可能被释放的锁,从而导致所有的任务都无法继续完成. 1.死锁程序示例 创建类 DeadLockThread: public class DeadLockThread implements Runnable { private Object lock1 = new Object(); private Object lock2 = new Object(); private String s; public void setS(St