Big Data Notes (9): Advanced Features of MapReduce (B)

II. Sorting

Sorting objects

Employee data: Employee.java ----> emitted as key2 of the map output

Requirement: sort by department number and salary, both in ascending order

Employee.java

package mr.object;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

// Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements WritableComparable<Employee>{

    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

//    @Override
//    public int compareTo(Employee o) {
//        // Single-column sort rule: sort by the employee's salary
//        if(this.sal >= o.getSal()){
//            return 1;
//        }else{
//            return -1;
//        }
//    }

    @Override
    public int compareTo(Employee o) {
        // Two-column sort rule: first by department number
        if(this.deptno > o.getDeptno()){
            return 1;
        }else if(this.deptno < o.getDeptno()){
            return -1;
        }

        // then by salary (within the same department)
        if(this.sal >= o.getSal()){
            return 1;
        }else{
            return -1;
        }

    }

    @Override
    public String toString() {
        return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    public int getEmpno() {
        return empno;
    }
    public void setEmpno(int empno) {
        this.empno = empno;
    }
    public String getEname() {
        return ename;
    }
    public void setEname(String ename) {
        this.ename = ename;
    }
    public String getJob() {
        return job;
    }
    public void setJob(String job) {
        this.job = job;
    }
    public int getMgr() {
        return mgr;
    }
    public void setMgr(int mgr) {
        this.mgr = mgr;
    }
    public String getHiredate() {
        return hiredate;
    }
    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }
    public int getSal() {
        return sal;
    }
    public void setSal(int sal) {
        this.sal = sal;
    }
    public int getComm() {
        return comm;
    }
    public void setComm(int comm) {
        this.comm = comm;
    }
    public int getDeptno() {
        return deptno;
    }
    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
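
Because Employee implements WritableComparable, the shuffle phase sorts the map output keys with the compareTo method above. The same rule can be checked locally with plain Java; the sketch below is only an illustration (the sample values are taken from the classic EMP demo dataset and are not part of the original post), and it assumes the class sits in the mr.object package next to Employee:

EmployeeSortCheck.java (sketch)

package mr.object;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmployeeSortCheck {

    public static void main(String[] args) {
        List<Employee> list = new ArrayList<Employee>();

        Employee martin = new Employee();
        martin.setEmpno(7654); martin.setEname("MARTIN"); martin.setSal(1250); martin.setDeptno(30);

        Employee clark = new Employee();
        clark.setEmpno(7782); clark.setEname("CLARK"); clark.setSal(2450); clark.setDeptno(10);

        Employee allen = new Employee();
        allen.setEmpno(7499); allen.setEname("ALLEN"); allen.setSal(1600); allen.setDeptno(30);

        list.add(martin); list.add(clark); list.add(allen);

        // Uses Employee.compareTo: department number first, then salary, both ascending
        Collections.sort(list);

        for (Employee e : list) {
            System.out.println(e); // relies on Employee.toString()
        }
        // Expected order: CLARK (dept 10), then MARTIN (dept 30, sal 1250), then ALLEN (dept 30, sal 1600)
    }
}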

EmployeeSortMapper.java

package mr.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The Employee object is emitted as key2; NullWritable is used as the value
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1,Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the record into fields
        String[] words = data.split(",");

        // Create an Employee object
        Employee e = new Employee();

        // Employee number
        e.setEmpno(Integer.parseInt(words[0]));
        // Employee name
        e.setEname(words[1]);

        // Job title
        e.setJob(words[2]);

        // Manager number: note that some employees have no manager
        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            // No manager: default to 0
            e.setMgr(0);
        }

        // Hire date
        e.setHiredate(words[4]);

        // Salary
        e.setSal(Integer.parseInt(words[5]));

        // Commission
        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            // No commission: default to 0
            e.setComm(0);
        }

        // Department number
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit key2 (the Employee object); the value is NullWritable
        context.write(e, NullWritable.get());
    }
}

EmployeeSortMain.java

package mr.object;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeSortMain {

    public static void main(String[] args) throws Exception {
        // job = map + reduce
        Job job = Job.getInstance(new Configuration());
        // Entry point of the job
        job.setJarByClass(EmployeeSortMain.class);

        job.setMapperClass(EmployeeSortMapper.class);
        job.setMapOutputKeyClass(Employee.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(EmployeeSortReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class); 

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }

}
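
The main class above also references EmployeeSortReducer, which is not listed in the original post. The sketch below is a minimal reducer matching the output types configured in the job (LongWritable key, Employee value); emitting the employee number as the output key is an assumption for illustration, not taken from the original.

EmployeeSortReducer.java (sketch)

package mr.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// The keys arrive already sorted by Employee.compareTo (department, then salary),
// so the reducer only needs to write them out.
public class EmployeeSortReducer extends Reducer<Employee, NullWritable, LongWritable, Employee> {

    @Override
    protected void reduce(Employee k3, Iterable<NullWritable> v3, Context context)
            throws IOException, InterruptedException {
        // Emit the employee number as k4 and the Employee object as v4 (assumed convention)
        context.write(new LongWritable(k3.getEmpno()), k3);
    }
}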

Result: the employees are output in ascending order of department number and, within the same department, of salary.

III. Partitioning: Partition

Partitions are built from the Map output (k2, v2).

By default, MapReduce has only one partition (and therefore only one output file).

Purpose: improve query efficiency.

Partitions are created according to different conditions.

Requirement: partition by the employee's department number, so that employees with the same department number end up in the same partition.
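
For comparison, when no custom partitioner is configured, Hadoop uses HashPartitioner, whose behavior is essentially the following (a sketch of the standard default, not code from the original post):

// Default behavior: hash the key and take the result modulo the number of reduce tasks.
public class HashStylePartitioner<K, V> extends org.apache.hadoop.mapreduce.Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the partition index is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}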

EmpPartionMapper.java

package demo.partion;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k2: department number   v2: Employee object
public class EmpPartionMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {

        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        String[] words = data.split(",");

        Employee e = new Employee();

        e.setEmpno(Integer.parseInt(words[0]));

        e.setEname(words[1]);

        e.setJob(words[2]);

        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            // No manager: default to 0
            e.setMgr(0);
        }

        e.setHiredate(words[4]);

        e.setSal(Integer.parseInt(words[5]));

        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            e.setComm(0);
        }

        e.setDeptno(Integer.parseInt(words[7]));

        // Emit: k2 is the department number, v2 is the Employee object
        context.write(new LongWritable(e.getDeptno()), e);
    }
}

EmpPartionReducer.java

package demo.partion;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Write employees of the same department to HDFS.  k4: department number, v4: Employee object
public class EmpPartionReducer extends Reducer<LongWritable, Employee, LongWritable, Employee>{

    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        for (Employee e : v3) {
            context.write(k3, e);
        }
    }

}

MyEmployeePartitioner.java

package demo.partion;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;
// Partitioning rule: partitions are built from the Map output (k2: department number, v2: Employee object)
public class MyEmployeePartitioner extends Partitioner<LongWritable, Employee>{

    /*
     * numParts: the number of partitions (i.e., the number of reduce tasks)
     */
    @Override
    public int getPartition(LongWritable k2, Employee v2, int numParts) {
        // Partitioning rule: decide by department number
        int deptno = v2.getDeptno();
        if (deptno == 10) {
            // Goes to partition 1 (1 % 3 = 1)
            return 1%numParts;
        }else if (deptno == 20) {
            // Goes to partition 2 (2 % 3 = 2)
            return 2%numParts;
        }else {
            // All other departments go to partition 0 (3 % 3 = 0)
            return 3%numParts;
        }
    }
}

EmpPartitionMain.java

package demo.partion;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpPartitionMain {

    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(EmpPartitionMain.class);

        job.setMapperClass(EmpPartionMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Specify the partitioning rule
        job.setPartitionerClass(MyEmployeePartitioner.class);
        // Specify the number of partitions (i.e., the number of reduce tasks)
        job.setNumReduceTasks(3);

        job.setReducerClass(EmpPartionReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

    }

}

Result: three partitions are created, i.e., three output files (part-r-00000, part-r-00001, part-r-00002).

Partition 1: employees of department 10

Partition 2: employees of department 20

Partition 0: employees of all other departments

IV. Combining: Combiner

1. A MapReduce job can run without a Combiner.
2. A Combiner is a special kind of Reducer that runs on the Mapper side, performing a first round of reduction there to shrink the Map output and thereby improve efficiency.
3. Notes:
(1) In some cases a Combiner cannot be used -----> for example, computing an average (see the sketch after this list).
(2) Whether or not a Combiner is added, the logic of the original program must not change. (MapReduce programming example: building an inverted index.)
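
To see why an average cannot simply be pre-computed by a Combiner, suppose the salaries 10, 20 and 30 are split across two mappers: one sees (10, 20) and the other sees (30). A Combiner that averages locally produces 15 and 30, and averaging those again gives 22.5, while the true average is 20. A small plain-Java illustration of this pitfall (only a sketch, not part of the original post):

public class AverageCombinerPitfall {

    public static void main(String[] args) {
        // Mapper 1 sees salaries 10 and 20; Mapper 2 sees salary 30
        double localAvg1 = (10 + 20) / 2.0;   // 15.0
        double localAvg2 = 30 / 1.0;          // 30.0

        double averageOfAverages = (localAvg1 + localAvg2) / 2.0;  // 22.5 -- wrong
        double trueAverage       = (10 + 20 + 30) / 3.0;           // 20.0 -- correct

        System.out.println("average of averages = " + averageOfAverages);
        System.out.println("true average        = " + trueAverage);
        // A correct approach is to let the Combiner emit (sum, count) pairs
        // and have the Reducer divide the total sum by the total count.
    }
}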

WordCountMapper.java

package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {

        // Get the line of text, e.g.: I love beijing
        String data = v1.toString();

        // Split into words
        String[] words = data.split(" ");

        // Emit k2: the word, v2: a count of 1
        for (String w : words) {
            context.write(new Text(w), new LongWritable(1));
        }

    }

}

WordCountReducer.java

package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    @Override
    protected void reduce(Text k3, Iterable<LongWritable> v3,
            Context context) throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable l : v3) {
            total = total + l.get();
        }

        // Emit k4 and v4
        context.write(k3, new LongWritable(total));
    }

}

WordCountMain.java: with the Combiner added

package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WordCountMain.class);

        //Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);// Specify k2
        job.setMapOutputValueClass(LongWritable.class);// Specify v2

        //Combiner
        job.setCombinerClass(WordCountReducer.class);

        //Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }
}

Original source: https://www.cnblogs.com/lingluo2017/p/8503501.html

2013年德勤公司发布了一份<2013年艺术品在线交易报告>,显示艺术品投资交易方式由线下慢慢朝着线上交易的方式转变,过去几年至少有80%的藏家尝试过在线购买艺术品,这一趋势还在不断上升.对于艺术品电商来说是个好势头. 但艺术品电商如何找到潜在市场.定位目标客户,提高用户体验和转化率从而增加收益,首先,我们看一下艺术品电商和传统电商的区别,购买艺术品的人群属于高净值的精英阶层,相比淘宝用户数量来说比例非常小,购买特点具有明显的喜好性,比如有人喜欢收藏字画,而有人喜欢瓷器,其次艺术品非普通商品具