Hadoop学习之路(十九)MapReduce框架排序

流量统计项目案例

样本示例

需求

1、 统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量

2、 得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序

3、 将流量汇总统计结果按照手机归属地不同省份输出到不同文件中

第一题

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *    第一题:统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
 */

public class FlowSumMR {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FlowSumMR");
        job.setJarByClass(FlowSumMR.class);

        job.setMapperClass(FlowSumMRMapper.class);
        job.setReducerClass(FlowSumMRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("E:/bigdata/flow/input/"));
        FileOutputFormat.setOutputPath(job, new Path("E:/bigdata/flow/output_sum"));

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class FlowSumMRMapper extends Mapper<LongWritable, Text, Text, Text>{

        /**
         * value  =  1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99
         * iface.qiyi.com    视频网站    15    12    1527    2106    200
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");

            String outkey = split[1];

            String outValue = split[8] + "\t" + split[9];

            context.write(new Text(outkey), new Text(outValue));

        }
    }

    public static class FlowSumMRReducer extends Reducer<Text, Text, Text, Text>{

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            int upFlow = 0;
            int downFlow = 0;
            int sumFlow = 0;

            for(Text t : values){
                String[] split = t.toString().split("\t");

                int upTempFlow = Integer.parseInt(split[0]);
                int downTempFlow = Integer.parseInt(split[1]);

                upFlow+=upTempFlow;
                downFlow +=  downTempFlow;
            }

            sumFlow = upFlow + downFlow;

            context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + sumFlow));
        }
    }
}

第二题

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import comg.ghgj.mr.pojo.FlowBean;

/**
 * 需求: 第二个题目,就是对第一个题目的结果数据,进行按照总流量倒叙排序
 *
 *
 */
public class FlowSortMR {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FlowSumMR");
        job.setJarByClass(FlowSortMR.class);

        job.setMapperClass(FlowSortMRMapper.class);
        job.setReducerClass(FlowSortMRReducer.class);

        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("E:/bigdata/flow/output_sum"));
        FileOutputFormat.setOutputPath(job, new Path("E:/bigdata/flow/output_sort_777"));

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);

    }

    public static class FlowSortMRMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

        /**
         * value  = 13602846565    26860680    40332600    67193280
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");

            FlowBean fb = new FlowBean(split[0], Long.parseLong(split[1]), Long.parseLong(split[2]));

            context.write(fb, NullWritable.get());
        }

    }

    public static class FlowSortMRReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable>{

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for(NullWritable nvl : values){
                context.write(key, nvl);
            }

        }

    }
}

FlowBean.java

  1 import java.io.DataInput;
  2 import java.io.DataOutput;
  3 import java.io.IOException;
  4
  5 import org.apache.hadoop.io.WritableComparable;
  6
  7 /**
  8  * 第一,定义好属性
  9  * 第二,定义好属性的getter 和 setter方法
 10  * 第三,定义好构造方法(有参,无参)
 11  * 第四:定义好toString();
 12  *
 13  *
 14  * 详细解释:
 15  *
 16  * 如果一个自定义对象要作为key 必须要实现 WritableComparable 接口, 而不能实现 Writable, Comparable
 17  *
 18  * 如果一个自定义对象要作为value,那么只需要实现Writable接口即可
 19  */
 20 public class FlowBean implements WritableComparable<FlowBean>{
 21 //public class FlowBean implements Comparable<FlowBean>{
 22
 23     private String phone;
 24     private long upFlow;
 25     private long downFlow;
 26     private long sumFlow;
 27     public String getPhone() {
 28         return phone;
 29     }
 30     public void setPhone(String phone) {
 31         this.phone = phone;
 32     }
 33     public long getUpFlow() {
 34         return upFlow;
 35     }
 36     public void setUpFlow(long upFlow) {
 37         this.upFlow = upFlow;
 38     }
 39     public long getDownFlow() {
 40         return downFlow;
 41     }
 42     public void setDownFlow(long downFlow) {
 43         this.downFlow = downFlow;
 44     }
 45     public long getSumFlow() {
 46         return sumFlow;
 47     }
 48     public void setSumFlow(long sumFlow) {
 49         this.sumFlow = sumFlow;
 50     }
 51     public FlowBean(String phone, long upFlow, long downFlow, long sumFlow) {
 52         super();
 53         this.phone = phone;
 54         this.upFlow = upFlow;
 55         this.downFlow = downFlow;
 56         this.sumFlow = sumFlow;
 57     }
 58     public FlowBean(String phone, long upFlow, long downFlow) {
 59         super();
 60         this.phone = phone;
 61         this.upFlow = upFlow;
 62         this.downFlow = downFlow;
 63         this.sumFlow = upFlow + downFlow;
 64     }
 65     public FlowBean() {
 66         super();
 67         // TODO Auto-generated constructor stub
 68     }
 69     @Override
 70     public String toString() {
 71         return  phone + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
 72     }
 73
 74
 75
 76
 77     /**
 78      * 把当前这个对象 --- 谁掉用这个write方法,谁就是当前对象
 79      *
 80      * FlowBean bean = new FlowBean();
 81      *
 82      * bean.write(out)    把bean这个对象的四个属性序列化出去
 83      *
 84      *  this = bean
 85      */
 86     @Override
 87     public void write(DataOutput out) throws IOException {
 88         // TODO Auto-generated method stub
 89
 90         out.writeUTF(phone);
 91         out.writeLong(upFlow);
 92         out.writeLong(downFlow);
 93         out.writeLong(sumFlow);
 94
 95     }
 96
 97
 98     //   序列化方法中的写出的字段顺序, 一定一定一定要和 反序列化中的 接收顺序一致。 类型也一定要一致
 99
100
101     /**
102      * bean.readField();
103      *
104      *             upFlow =
105      */
106     @Override
107     public void readFields(DataInput in) throws IOException {
108         // TODO Auto-generated method stub
109
110         phone = in.readUTF();
111         upFlow = in.readLong();
112         downFlow = in.readLong();
113         sumFlow = in.readLong();
114
115     }
116
117
118
119     /**
120      * Hadoop的序列化机制为什么不用   java自带的实现 Serializable这种方式?
121      *
122      * 本身Hadoop就是用来解决大数据问题的。
123      *
124      * 那么实现Serializable接口这种方式,在进行序列化的时候。除了会序列化属性值之外,还会携带很多跟当前这个对象的类相关的各种信息
125      *
126      * Hadoop采取了一种全新的序列化机制;只需要序列化 每个对象的属性值即可。
127      */
128
129
130
131     /*@Override
132       public void readFields(DataInput in) throws IOException {
133         value = in.readLong();
134       }
135
136       @Override
137       public void write(DataOutput out) throws IOException {
138         out.writeLong(value);
139       }*/
140
141
142     /**
143      * 用来指定排序规则
144      */
145     @Override
146     public int compareTo(FlowBean fb) {
147
148         long diff = this.getSumFlow() - fb.getSumFlow();
149
150         if(diff == 0){
151             return 0;
152         }else{
153             return diff > 0 ? -1 : 1;
154         }
155
156     }
157 }

第三题

package comg.ghgj.mr.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.ProvincePartitioner;

public class FlowPartitionerMR {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "FlowSumMR");
        job.setJarByClass(FlowPartitionerMR.class);

        job.setMapperClass(FlowPartitionerMRMapper.class);
        job.setReducerClass(FlowPartitionerMRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        /**
         * 非常重要的两句代码
         */
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(10);

        FileInputFormat.setInputPaths(job, new Path("E:\\bigdata\\flow\\input"));
        Path outputPath = new Path("E:\\bigdata\\flow\\output_ptn2");
        if(fs.exists(outputPath)){
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Text, Text>{

        /**
         * value  =  13502468823    101663100    1529437140    1631100240
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");

            String outkey = split[1];
            String outValue = split[8] + "\t" + split[9];

            context.write(new Text(outkey), new Text(outValue));

        }
    }

    public static class FlowPartitionerMRReducer extends Reducer<Text, Text, Text, Text>{

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            int upFlow = 0;
            int downFlow = 0;
            int sumFlow = 0;

            for(Text t : values){
                String[] split = t.toString().split("\t");

                int upTempFlow = Integer.parseInt(split[0]);
                int downTempFlow = Integer.parseInt(split[1]);

                upFlow+=upTempFlow;
                downFlow +=  downTempFlow;
            }

            sumFlow = upFlow + downFlow;

            context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + sumFlow));
        }
    }
}

原文地址:https://www.cnblogs.com/qingyunzong/p/8584775.html

时间: 2024-08-29 12:56:54

Hadoop学习之路(十九)MapReduce框架排序的相关文章

Hadoop学习之路(九)HDFS深入理解

HDFS的优点和缺点 HDFS的优点 1.可构建在廉价机器上 通过多副本提高可靠性,提供了容错和恢复机制 服务器节点的宕机是常态   必须理性对象 2.高容错性 数据自动保存多个副本,副本丢失后,自动恢复 HDFS的核心设计思想:  分散均匀存储 + 备份冗余存储 3.适合批处理 移动计算而非数据,数据位置暴露给计算框架 海量数据的计算 任务 最终是一定要被切分成很多的小任务进行 4.适合大数据处理 GB.TB.甚至 PB 级数据,百万规模以上的文件数量,10K+节点规模 5.流式文件访问 一次

Linux嵌入式驱动学习之路(十九)触摸屏驱动

触摸屏使用流程: 1. 按下产生中断. 2.在中断处理程序中启动AD转换XY坐标. 3.AD转换结束并产生AD中断. 4. 在AD的中断处理函数中上报信息,启动定时器. 5. 定时器时间到后进入中断,处理长按滑动.跳转到第二步 6. 松开. sd

阿里封神谈hadoop学习之路

阿里封神谈hadoop学习之路 封神 2016-04-14 16:03:51 浏览3283 评论3 发表于: 阿里云E-MapReduce >> 开源大数据周刊 hadoop 学生 spark 摘要: 在大数据时代,要想个性化实现业务的需求,还是得操纵各类的大数据软件,如:hadoop.hive.spark等.笔者(阿里封神)混迹Hadoop圈子多年,经历了云梯1.ODPS等项目,目前base在E-Mapreduce.在这,笔者尽可能梳理下hadoop的学习之路. 引言 当前,越来越多的同学进

Android学习笔记二十九之SwipeRefreshLayout、RecyclerView和CardView

Android学习笔记二十九之SwipeRefreshLayout.RecyclerView和CardView 前面我们介绍了AlertDialog和几个常用的Dialog,ProgressDialog进度条提示框.DatePickerDialog日期选择对话框和TimePickerDialog时间选择对话框.这一节我们介绍几个新的API控件SwipeRefreshLayout.RecyclerView和CardView,这几个API控件都是google在Android5.0推出的.下面我们来学

我的编程之路(十九) 开发中一些细节与启发

1.js的命名空间           如果写后台代码,分层是潜意识中的基本常识,但是一到了前台,却没了这种意识,归根结底还是js用的不多,也一直没有在意js的地位,直到现在富客户端的趋势与要求,使得很多代码都要在前台用js或其框架完成,所以对于js代码的管理就要像后台java代码一样有其规范了,而命名空间就是package,也是为了管理不同层次的代码. 2.闭包          闭包就是能够读取其他函数内部变量的函数.它的最大用处有两个,一个是可以读取函数内部的变量,另一个就是让这些变量的值

《Javascript权威指南》学习笔记之十九--HTML5 DOM新标准---处理文档元信息和管理交互能力

一.了解DOM 1.DOM是Document Object Model的缩写,即文档对象类型,是文档在内存中的表示形式,是一个应用程序接口,定义了文档的逻辑结构以及一套访问和处理文档的方法. 2.HTML DOM与Core DOM的区别:前者提供了大量的方法和属性,与现有的程序模型一致,更便于脚本的编写者控制. 二.document对象 使用window.document属性返回一个document对象,代表当前window内加载的文档.window可以省略.winName.document返回

【Unity 3D】学习笔记二十九:游戏实例——简单小地图制作

任何的学习,光看不练是学不好的.所以这次就总结回顾下怎么制作MMROPG类游戏中的小地图.在MMROPG类游戏里,主角在游戏世界里走动时,一般在屏幕右上角都会有一个区域来显示当前游戏场景的小地图.主角在游戏世界里走动,小地图里代表着主角的小标记也会随之移动.那怎么实现咧? 首先需要确定两个贴图,第一个是右上角的小地图背景贴图,应该是从Y轴俯视向下截取主角所在的位置大地图.第二个就是主角的位置大贴图.在本例中,因为没有学习unity地图制作,所以地图用一个面对象代替,主角用立方体代替,使用GUI来

Android学习路线(十九)支持不同设备——支持不同(Android)平台版本

当最新的Android版本为你的应用提供着很棒的APIs时,你却要在更多的设备更新之前继续支持老的系统版本.这篇课程如何在继续支持低版本的系统的情况下使用新版本的高级API. Platform Versions 仪表板展示了最新的活跃设备上运行的Android系统版本的分布,基于设备访问Google Play商店的次数.通常情况下,支持90%的活跃设备同时使用最新版本作为target是一个好习惯. 贴士: 为了在不同的Android版本上提供最好的特性和功能,你应该在你的应用中使用Android

unity3d学习笔记(十九)--ngui制作3d人物头顶的头像和血条

原地址:http://blog.csdn.net/lzhq1982/article/details/18793479 本系列文章由Aimar_Johnny编写,欢迎转载,转载请标明出处,谢谢. http://blog.csdn.net/lzhq1982/article/details/18793479 先上张图,自己做的一个demo. 这里的人物头像和血条是在3d世界生成的,所以有真正的纵深感和遮挡关系,废话不多说,看我是怎么实现的. 第一步,先在UI Root里制作头像和血条. 这个制作步骤基

java痛苦学习之路[十二]JSON+ajax+Servlet JSON数据转换和传递

1.首先客户端需要引入 jquery-1.11.1.js 2.其次javaweb工程里面需要引入jar包  [commons-beanutils-1.8.0.jar.commons-collections-3.1.jar.commons-lang-2.4.jar.commons-logging-1.1.3.jar.ezmorph-1.0.6.jar.json-lib-2.3-jdk15.jar] 3.客户端js端代码 4.servlet 服务器,映射的路径CardColl 以上就是整个过程,如果