Hadoop基础---MapReduce对数据进行排序

承接上文:Hadoop基础---流量求和MapReduce程序及自定义数据类型

一:实验数据

对上一篇文章中的数据进行排序处理:

13480253104    180    200    380
13502468823    102    7335    7437
13560439658    5892    400    6292
13600217502    186852    200    187052
13602846565    12    1938    1950
13660577991    9    6960    6969
13719199419    0    200    200
13726230503    2481    24681    27162
13760778710    120    200    320
13823070001    180    200    380
13826544101    0    200    200
13922314466    3008    3720    6728
13925057413    63    11058    11121
13926251106    0    200    200
13926435656    1512    200    1712
15013685858    27    3659    3686
15920133257    20    3156    3176
15989002119    3    1938    1941
18211575961    12    1527    1539
18320173382    18    9531    9549
84138413    4116    1432    5548

二:MapReduce程序编写

(一)自定义数据结构FlowBean编写

package cn.hadoop.mr.wc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNB;
    private long up_flow;
    private long down_flow;
    private long sum_flow;

    public FlowBean() {}    //无参构造函数,用于反序列化时使用

    public FlowBean(String phoneNB, long up_flow, long down_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.down_flow = down_flow;
        this.sum_flow = up_flow + down_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getDown_flow() {
        return down_flow;
    }

    public void setDown_flow(long down_flow) {
        this.down_flow = down_flow;
    }

    public long getSum_flow() {
        return up_flow + down_flow;
    }

    //用于序列化
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(down_flow);
        out.writeLong(up_flow+down_flow);
    }

    //用于反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        down_flow = in.readLong();
        sum_flow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {  //用于排序操作
        return sum_flow > o.sum_flow ? -1 : 1;    //返回值为-1,则排在前面
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + down_flow + "\t"+ sum_flow;
    }

}

(二)Map程序编写

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.hadoop.mr.wc.FlowBean;

public class ResSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        //获取一行数据
        String line = value.toString();
        //进行文本分割
        String[] fields = StringUtils.split(line, ‘\t‘);
        //数据获取
        String phoneNB = fields[0];
        long up_flow = Long.parseLong(fields[1]);
        long down_flow = Long.parseLong(fields[2]);

        context.write(new FlowBean(phoneNB, up_flow, down_flow), NullWritable.get());
    }
}

(三)Reduce程序编写

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.hadoop.mr.wc.FlowBean;

//会在reduce接收数据时,对key进行排序
public class ResSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{
    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values,
            Reducer<FlowBean, NullWritable, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        String phoneNB = key.getPhoneNB();
        context.write(new Text(phoneNB), key);
    }
}

注意:排序比较会在Reduce接收到key时进行排序,所以我们需要对输入的key进行处理

(四)主函数进行调用

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.hadoop.mr.wc.FlowBean;

public class ResSortRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ResSortRunner.class);

        job.setMapperClass(ResSortMapper.class);
        job.setReducerClass(ResSortReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

(五)结果测试

hadoop jar rs.jar cn.hadoop.rs.ResSortRunner /fs/output1 /fs/output6

三:实现将两个job在main中一次执行

(一)修改main方法,实现连续调用两个job

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.hadoop.fs.FlowSumMapper;
import cn.hadoop.fs.FlowSumReducer;
import cn.hadoop.fs.FlowSumRunner;
import cn.hadoop.mr.wc.FlowBean;

public class ResSortRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1);

        job1.setJarByClass(FlowSumRunner.class);

        job1.setMapperClass(FlowSumMapper.class);
        job1.setReducerClass(FlowSumReducer.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(FlowBean.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        if(!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2);

        job2.setJarByClass(ResSortRunner.class);

        job2.setMapperClass(ResSortMapper.class);
        job2.setReducerClass(ResSortReducer.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(FlowBean.class);

        job2.setMapOutputKeyClass(FlowBean.class);
        job2.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        System.exit(job2.waitForCompletion(true)?0:1);
    }
}

(二)实验测试,结果查看

 hadoop jar rs.jar  cn.hadoop.rs.ResSortRunner /fs/input /fs/outdata1 /fs/outdata2

(三)补充:使用时,不推荐这种方法。中间结果单独输出,使用shell将各个程序串联,灵活性更大,更容易调试

原文地址:https://www.cnblogs.com/ssyfj/p/12350628.html

时间: 2024-10-31 00:09:39

Hadoop基础---MapReduce对数据进行排序的相关文章

hadoop 原生MapReduce 实现数据连接

业务逻辑 其实很简单,输入两个文件,一个作为基础数据(学生信息文件),一个是分数信息文件. 学生信息文件:存放学生数据:包括学号,学生名称 分数信息数据:存放学生的分数信息:包括学号,学科,分数. 我们将通过M/R实现根据学号,进行数据关联,最终结果为:学生名称,学科,分数. 模拟数据 学生数据 [[email protected] student_data]$ cat students.txt1       Randy2       Tom3       kitty4       Lucy5 

Hadoop 使用 MapReduce 排序 思路、全局排序

本文主要讲对key的排序,主要利用hadoop的机制进行排序. 1.Partition partition作用是将map的结果分发到多个Reduce上.当然多个reduce才能体现分布式的优势. 2.思路 由于每个partition内部是有序的,所以只要保证各partition间有序,即可保证全部有序. 3.问题 有了思路,如何定义partition的边界,这是个问题. 解决办法:hadoop提供了一个采样器帮我们预估整个边界,以使数据的分配尽量平均 引用:http://stblog.baidu

Hadoop第7周练习—MapReduce进行数据查询和实现推简单荐系统(转)

1  运行环境说明 1.1 硬软件环境 1.2 机器网络环境 2  书面作业1:计算员工相关 2.1 书面作业1内容 2.2  实现过程 2.2.1   准备测试数据 2.2.2   问题1:求各个部门的总工资 2.2.3   问题2:求各个部门的人数和平均工资 2.2.4   问题3:求每个部门最早进入公司的员工姓名 2.2.5   问题4:求各个城市的员工的总工资 2.2.6   问题5:列出工资比上司高的员工姓名及其工资 2.2.7   问题6:列出工资比公司平均工资要高的员工姓名及其工资

Hadoop基础之初识大数据与Hadoop

前言 从今天起,我将一步一步的分享大数据相关的知识,其实很多程序员感觉大数据很难学,其实并不是你想象的这样,只要自己想学,还有什么难得呢? 学习Hadoop有一个8020原则,80%都是在不断的配置配置搭建集群,只有20%写程序! 一.引言(大数据时代) 1.1.从数据中得到信息 我们看一张图片: 我们知道这个图片上的人叫张小妹,年龄20岁,职业模特.但是如果只有数据没有图片的话,就没有意义的数据了.所以数据一定是在特定的环境下才有意义的. 我们再来看一张图片: 从这张图片分析出: 从纵向分析,

hadoop基础之初识Hadoop MapReduce架构

Hadoop的mapreduce是一个快速.高效.简单用于编写的并运行处理大数据程序并应用在大数据集群上的编程框架.它将复杂的.运行于大规模集群上的并行计算过程高度的抽象到两个函数:map.reduce.适用于MP来处理的数据集(或者任务),需要满足一个基本的要求:待处理的数据集可以分解成许多小的数据集额,而且每一个小数据集都可以完全并行的进行处理. 图1.2-1MP框架数据流 MP框架包括一个主节点(ResourceManager).多个子节点(运行NodeManager)和MRAppMast

Hadoop之MapReduce基础

一.MapReduce概念 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架: Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上. 1.1 为什么要MapReduce 1)海量数据在单机上处理因为硬件资源限制,无法胜任 2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度 3)引入mapreduce框架后,开发人员可以将绝大部分工作集

hadoop学习;block数据块;mapreduce实现例子;UnsupportedClassVersionError异常;关联项目源码

Football on Table 题意:一些杆上有人,人有一个宽度,然后现在有一个球射过去,要求出球不会碰到任何人的概率 思路:计算出每根杆的概率,之后累乘,计算杆的概率的时候,可以先把每块人的区间长度再移动过程中会覆盖多少长度累加出来,然后(1?总和/可移动距离)就是不会碰到的概率 代码: #include <stdio.h> #include <string.h> #include <math.h> const double eps = 1e-8; int t,

从Hadoop骨架MapReduce在海量数据处理模式(包括淘宝技术架构)

从hadoop框架与MapReduce模式中谈海量数据处理 前言 几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,认为它们非常是神奇.而神奇的东西常能勾起我的兴趣.在看过介绍它们的文章或论文之后,认为Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,近期凡是空暇时,便在看"Hadoop"."MapReduce""海量数据处理"这方面的论文.但在看论

【Hadoop基础教程】9、Hadoop之倒排索引

开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Java 1.7.0_45.hadoop-1.2.1 1.倒排索引 倒排索引是文档检索系统中最常用的数据结构,被广泛用于全文搜索引擎.它主要是用来存储某个单词(或词组)在一个文档或一组文档的存储位置的映射,即提供了一种根据内容来查找文档的方式.由于不是根据文档来确定文档所包含的内容,而是进行了相反的操作(根据关键字来查找文档),因而称为倒排索引(Inverted Index).通常情况