MapReduce实战(二)

需求:

基于上一道题,我想将结果按照总流量的大小由大到小输出。

思考:

默认mapreduce是对key字符串按照字母进行排序的,而我们想任意排序,只需要把key设成一个类,再对该类写一个compareTo(大于要比较对象返回1,等于返回0,小于返回-1)方法就可以了。

注:这里如果是实现java.lang.Comparable接口,最终报错,还是直接实现WritableComparable吧。

FlowBean.java更改如下:

package cn.darrenchan.hadoop.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNum;// 手机号
    private long upFlow;// 上行流量
    private long downFlow;// 下行流量
    private long sumFlow;// 总流量

    public FlowBean() {
        super();
    }

    public FlowBean(String phoneNum, long upFlow, long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // 从数据流中反序列出对象的数据
    // 从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // 将对象数据序列化到流中
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public int compareTo(FlowBean flowBean) {
        return sumFlow > flowBean.getSumFlow() ? -1 : 1;
    }

}

建立文件SortMR.java:

package cn.darrenchan.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.commons.io.output.NullWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

//执行命令:hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort
public class SortMR {
    public static class SortMapper extends
            Mapper<LongWritable, Text, FlowBean, NullWritable> {
        // 拿到一行数据,切分出各字段,封装为一个flowbean,作为key输出
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = StringUtils.split(line, "\t");

            String phoneNum = words[0];
            long upFlow = Long.parseLong(words[1]);
            long downFlow = Long.parseLong(words[2]);

            context.write(new FlowBean(phoneNum, upFlow, downFlow),
                    NullWritable.get());
        }
    }

    public static class SortReducer extends
            Reducer<FlowBean, NullWritable, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            String phoneNum = key.getPhoneNum();
            context.write(new Text(phoneNum), key);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

我们现在处理的结果是上一次实验的输出结果,打成jar包flowsort.jar,执行命令:

hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort

得到的处理信息如下:

17/02/26 05:22:36 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 05:22:36 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 05:22:36 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 05:22:36 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 05:22:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0003
17/02/26 05:22:37 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0003
17/02/26 05:22:37 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0003/
17/02/26 05:22:37 INFO mapreduce.Job: Running job: job_1488112052214_0003
17/02/26 05:24:16 INFO mapreduce.Job: Job job_1488112052214_0003 running in uber mode : false
17/02/26 05:24:16 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 05:24:22 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 05:24:28 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 05:24:28 INFO mapreduce.Job: Job job_1488112052214_0003 completed successfully
17/02/26 05:24:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=933
FILE: Number of bytes written=187799
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=735
HDFS: Number of bytes written=623
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters 
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3077
Total time spent by all reduces in occupied slots (ms)=2350
Total time spent by all map tasks (ms)=3077
Total time spent by all reduce tasks (ms)=2350
Total vcore-seconds taken by all map tasks=3077
Total vcore-seconds taken by all reduce tasks=2350
Total megabyte-seconds taken by all map tasks=3150848
Total megabyte-seconds taken by all reduce tasks=2406400
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=883
Map output materialized bytes=933
Input split bytes=112
Combine input records=0
Combine output records=0
Reduce input groups=22
Reduce shuffle bytes=933
Reduce input records=22
Reduce output records=22
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=142
CPU time spent (ms)=1280
Physical memory (bytes) snapshot=218406912
Virtual memory (bytes) snapshot=726446080
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=623
File Output Format Counters 
Bytes Written=623

最终结果如下,可以看到是排序好的。

1363157985069 186852 200 187052
1363157985066 2481 24681 27162
1363157990043 63 11058 11121
1363157986072 18 9531 9549
1363157982040 102 7335 7437
1363157984041 9 6960 6969
1363157995093 3008 3720 6728
1363157995074 4116 1432 5548
1363157992093 4938 200 5138
1363157973098 27 3659 3686
1363157995033 20 3156 3176
1363157984040 12 1938 1950
1363157986029 3 1938 1941
1363157991076 1512 200 1712
1363157993044 12 1527 1539
1363157993055 954 200 1154
1363157985079 180 200 380
1363157986041 180 200 380
1363157988072 120 200 320
1363154400022 0 200 200
1363157983019 0 200 200
1363157995052 0 200 200

时间: 2025-01-02 16:09:52

MapReduce实战(二)的相关文章

Python爬虫实战二之爬取百度贴吧帖子

大家好,上次我们实验了爬取了糗事百科的段子,那么这次我们来尝试一下爬取百度贴吧的帖子.与上一篇不同的是,这次我们需要用到文件的相关操作. 前言 亲爱的们,教程比较旧了,百度贴吧页面可能改版,可能代码不好使,八成是正则表达式那儿匹配不到了,请更改一下正则,当然最主要的还是帮助大家理解思路. 2016/12/2 本篇目标 1.对百度贴吧的任意帖子进行抓取 2.指定是否只抓取楼主发帖内容 3.将抓取到的内容分析并保存到文件 1.URL格式的确定 首先,我们先观察一下百度贴吧的任意一个帖子. 比如:ht

FastDFS安装使用实战二(配置篇)

FastDFS安装使用实战二(配置篇) Keywords:FastDFS.分布式文件系统.Ubuntu Author:soartju 转载请注明出处:http://soartju.iteye.com/blog/803524 FastDFS的配置文件在%FastDFS%/conf目录下,其中包括 Client.conf    客户端上传配置文件 Storage.conf    文件存储服务器配置文件 Tracker.conf    负责均衡调度服务器配置文件 http.conf        ht

Python机器学习实战&lt;二&gt;:机器学习概述

1.机器学习的真实含义是利用数据来彰显数据背后的真实含义. 2.机器学习的一般用例:人脸识别.手写数字识别.垃圾邮件过滤.产品推荐等等. 3.机器学习的主要任务是分类,即将实例数据划分到合适的分类中.另一项任务是回归,主要用于预测数值型数据.分类和回归属于监督学习,之所以称为监督学习,是因为这类算法必须知道预测什么,即目标的分类信息.另一种机器学习方式是无监督学习,此时数据没有类别信息,也没有给定的目标.在无监督学习中,将数据集合分成由类似对象组成的多个类成为聚类,将寻找数据统计值的过程称为密度

MapReduce教程(二)MapReduce框架Partitioner分区&lt;转&gt;

1 Partitioner分区 1.1 Partitioner分区描述 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中:按照省份划分的话,需要把同一省份的数据放到一个文件中:按照性别划分的话,需要把同一性别的数据放到一个文件中.我们知道最终的输出数据是来自于Reducer任务.那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行.Reducer任务的数据来自于Mapper任务,也就说Ma

转 Python爬虫实战二之爬取百度贴吧帖子

静觅 » Python爬虫实战二之爬取百度贴吧帖子 大家好,上次我们实验了爬取了糗事百科的段子,那么这次我们来尝试一下爬取百度贴吧的帖子.与上一篇不同的是,这次我们需要用到文件的相关操作. 本篇目标 1.对百度贴吧的任意帖子进行抓取 2.指定是否只抓取楼主发帖内容 3.将抓取到的内容分析并保存到文件

二 Flask Project 实战 二

5 templates {{ and }} is an expression{% and %} denotes a control flow statement like if and for blocks defined here that will be overridden in the other templates{% extends 'base.html' %} tells Jinja that this template should replace the blocks from

Docker最全教程之Python爬网实战(二十一)

原文:Docker最全教程之Python爬网实战(二十一) Python目前是流行度增长最快的主流编程语言,也是第二大最受开发者喜爱的语言(参考Stack Overflow 2019开发者调查报告发布).笔者建议.NET.Java开发人员可以将Python发展为第二语言,一方面Python在某些领域确实非常犀利(爬虫.算法.人工智能等等),另一方面,相信我,Python上手完全没有门槛,你甚至无需购买任何书籍! 由于近期在筹备4.21的长沙开发者大会,耽误了不少时间.不过这次邀请到了腾讯资深技术

数据-第16课-栈的应用实战二

第16课-栈的应用实战二 1. 问题的提出 计算机的本质工作就是数学运算,那计算机可以读入字符串”9 + (3 - 1) *5 +8/2”并且计算值吗? 2. 后缀表达式 波兰科学家在20世纪50年代提出了一种将运算符放在数字后面的后缀表达式. 对应的,我们平时用的数学表达式叫做中缀表达式. 实例 5 + 3 => 5 3 + 1 + 2 * 3 => 1 2 3 * + 9 + ( 3–1 ) * 5 => 9 3 1–5 * + 中缀表达式符合人类的阅读和思维习惯:后缀表达式符合计算

数据--第20课-递归的应用实战二

第20课-递归的应用实战二 1. 递归与回溯 (1)递归在程序设计中也常用于需要回溯算法的场合. (2)回溯算法的基本思想. ① 从问题的某一种状态出发,搜索可以到达的所有状态. ② 当某个状态到达后,可向前回退,并继续搜索其它可达状态 ,并继续搜索其它可达状态. ③ 当所有状态都到达后,回溯算法结束. (3)程序设计中可利用函数的活动对象保存回溯算法的状态数据,因此可以利用递归完成回溯算法 2. 八皇后问题 在一个8×8国际象棋盘上,有8个皇后,每个皇后占一格:要求皇后间不会出现相互“攻击”的