大数据笔记(八)——Mapreduce的高级特性(A)

一.序列化

类似于Java的序列化:将对象——>文件

如果一个类实现了Serializable接口,这个类的对象就可以输出为文件

同理,如果一个类实现了的Hadoop的序列化机制(接口:Writable),这个类的对象就可以作为输入和输出的值

例子:使用序列化  求每个部门的工资总额

数据:在map阶段输出k2部门号 v2是Employee对象

reduce阶段:k4部门号 v3.getSal()得到薪水求和——>v4

Employee.java:封装的员工属性

package saltotal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//定义员工的属性: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable{

    private int empno;//员工号
    private String ename;//员工姓名
    private String job;//?职位
    private int mgr;//经理的员工号
    private String hiredate;//入职日期
    private int sal;//月薪
    private int comm;//奖金
    private int deptno;// 部门号

    @Override
    public String toString() {
        return "["+this.empno+"\t"+this.ename+"\t"+this.sal+"\t"+this.deptno+"]";
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 代表序列化过程:输出
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // 代表反序列化:输入
        //注意:序列化和反序列化的顺序要一致
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    public int getEmpno() {
        return empno;
    }
    public void setEmpno(int empno) {
        this.empno = empno;
    }
    public String getEname() {
        return ename;
    }
    public void setEname(String ename) {
        this.ename = ename;
    }
    public String getJob() {
        return job;
    }
    public void setJob(String job) {
        this.job = job;
    }
    public int getMgr() {
        return mgr;
    }
    public void setMgr(int mgr) {
        this.mgr = mgr;
    }
    public String getHiredate() {
        return hiredate;
    }
    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }
    public int getSal() {
        return sal;
    }
    public void setSal(int sal) {
        this.sal = sal;
    }
    public int getComm() {
        return comm;
    }
    public void setComm(int comm) {
        this.comm = comm;
    }
    public int getDeptno() {
        return deptno;
    }
    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }

}

EmployeeMapper.java

package saltotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import saltotal.Employee;
//k2 部门号 v2 员工对象
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee>{

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // 数据:MARTIN,SALEsMAN,7698,1981/9/28,1250,1400,30
        String data = v1.toString();

        //分词
        String[] words = data.split(",");

        //创建员工的对象
        Employee e = new Employee();

        //设置员工号
        e.setEmpno(Integer.parseInt(words[0]));
        //姓名
        e.setEname(words[1]);

        //职位
        e.setJob(words[2]);

        //经理号:有些没有
        try{
            e.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex){
            //空值设0
            e.setMgr(0);
        }

        //入职日期
        e.setHiredate(words[4]);

        //月薪
        e.setSal(Integer.parseInt(words[5]));

        //奖金:有的没有
        try{
            e.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex){
            e.setComm(0);
        }

        //部门
        e.setDeptno(Integer.parseInt(words[7]));

        //输出 部门号 员工对象
        context.write(new IntWritable(e.getDeptno()), e);
    }

}

SalaryTotalReducer.java

package saltotal;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import saltotal.Employee;
//                                              k3 部门号  v3员工对象    k4部门号 v4 工资总额
public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable>{

    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
            throws IOException, InterruptedException {
        //对v3求和
        int total = 0;
        for (Employee e : v3) {
            total = total + e.getSal();
        }

        //输出
        context.write(k3, new IntWritable(total));
    }

}

SalaryTotalMain.java

package saltotal;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {
    public static void main(String[] args) throws Exception {
        //创建一个job = map + reduce
        Job job = Job.getInstance(new Configuration());
        //?指定任务的入口
        job.setJarByClass(SalaryTotalMain.class);

        //?指定任务的Mapper和输出的数据类型k2 v2
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Employee.class);

        //?指定任务的Reducer和输出的数据类型k4 v4
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        //?指定输入输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行任务
        job.waitForCompletion(true);
    }
}

输出jar文件,传到Linux上temp文件夹下,然后执行任务:

hadoop jar temp/s3.jar /scott/emp.csv /output/day0301/s3

二.排序

1.数字的排序

  默认:按照key2进行升序排序

现在HDFS上有一个文件,里面的数据如下:

开发MapReduce程序进行排序:

NumberMapper.java

package mr.number;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NumberMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        //数字:10
        String data = value1.toString().trim();

        //输出:把数字作为k2
        context.write(new LongWritable(Long.parseLong(data)), NullWritable.get());
    }
}

NumberMain.java

package mr.number;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NumberMain {

    public static void main(String[] args) throws Exception {
        // 创建一个job = map + reduce
        Job job = Job.getInstance(new Configuration());
        //?指定任务入口
        job.setJarByClass(NumberMain.class);

        //?指定mapper和输出的数据类型:k2 v2
        job.setMapperClass(NumberMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        //job.setSortComparatorClass(MyNumberComparator.class);

        //?指定输入和输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行任务
        job.waitForCompletion(true);
    }

}

执行任务后看到结果:

如果要改变默认的排序规则,需要创建一个自己的比较器

定义一个降序比较器类 MyNumberComparator.java

package mr.number;

import org.apache.hadoop.io.LongWritable;

//自己定义的比较器
public class MyNumberComparator extends LongWritable.Comparator{

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // 使用降序排序
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}

将NumberMain.java的这句话放开:

job.setSortComparatorClass(MyNumberComparator.class);

然后重新打包执行任务之后可看到如下结果:



原文地址:https://www.cnblogs.com/lingluo2017/p/8490825.html

时间: 2024-08-30 14:21:33

大数据笔记(八)——Mapreduce的高级特性(A)的相关文章

自学it18大数据笔记-第一阶段Java-day16-day17-day18-day19--day20-day21-day22——会持续更新

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮你们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第一阶段Java-day16-day17-day18-day19--day20-day21-day22--会持续更新-- 第一阶段Java

大数据运算模型 MapReduce 原理

大数据运算模型 MapReduce 原理 2016-01-24 杜亦舒 MapReduce 是一个大数据集合的并行运算模型,由google提出,现在流行的hadoop中也使用了MapReduce作为计算模型 MapReduce 通俗解释 图书馆要清点图书数量,有10个书架,管理员为了加快统计速度,找来了10个同学,每个同学负责统计一个书架的图书数量 张同学 统计 书架1王同学 统计 书架2刘同学 统计 书架3...... 过了一会儿,10个同学陆续到管理员这汇报自己的统计数字,管理员把各个数字加

自学it18大数据笔记-第三阶段Spark-day14;Spark-day15(开始试水找工作了)——会持续更新……

写在最前:转行大数据领域,没报班,自学试试,能坚持下来以后就好好做这行,不能就--!准备从现有这套it18掌的视屏残本开始--自学是痛苦的,发博客和大家分享下学习成果--也是监督自己,督促自己坚持学下去. (教学视屏是it18掌做活动送的,视屏不是很全,课堂笔记和源码等课堂相关资料也未放出,但徐培成老师课讲的真心很好,感兴趣的不妨听听,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激!现已广州转移至上海,开始试水找工作了,欢迎小伙伴们加qq或微博沟通交流(Q

自学it18大数据笔记-第二阶段HBase-day3——会持续更新……

写在最前:转行大数据领域,没报班,自学试试,能坚持下来以后就好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客和大家分享下学习成果--也是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,视屏不是很全,课堂笔记和源码等课堂相关资料也未放出,但徐培成老师课讲的真心很好,感兴趣的不妨听听,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激!欢迎小伙伴们沟通交流. 笔记分享:自学it18大数据笔记-第二阶段HBase-

自学it18大数据笔记-第二阶段Linux-day1——会持续更新……

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第二阶段Linux-day1--会持续更新-- 第一阶段Java-day16-day17-day18-day19--day20-day21

自学it18大数据笔记-第一阶段Java-day09-day10-day11-day12-day13-day14-day15

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮你们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第一阶段Java-day09-day10-day11-day12--day13-day14-day15--会持续更新-- 第一阶段Java

自学it18大数据笔记-第三阶段Spark-day12——会持续更新……

写在最前:转行大数据领域,没报班,自学试试,能坚持下来以后就好好做这行,不能就--!准备从现有这套it18掌的视屏残本开始--自学是痛苦的,发博客和大家分享下学习成果--也是监督自己,督促自己坚持学下去. (教学视屏是it18掌做活动送的,视屏不是很全,课堂笔记和源码等课堂相关资料也未放出,但徐培成老师课讲的真心很好,感兴趣的不妨听听,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激!现已广州转移至上海,欢迎小伙伴们加qq或微博沟通交流(QQ,微博和博客同名

自学it18大数据笔记-第三阶段Spark-day03——会持续更新……

写在最前:转行大数据领域,没报班,自学试试,能坚持下来以后就好好做这行,不能就--!准备从现有这套it18掌的视屏残本开始--自学是痛苦的,发博客和大家分享下学习成果--也是监督自己,督促自己坚持学下去. (教学视屏是it18掌做活动送的,视屏不是很全,课堂笔记和源码等课堂相关资料也未放出,但徐培成老师课讲的真心很好,感兴趣的不妨听听,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激!欢迎小伙伴们沟通交流. 笔记分享:自学it18大数据笔记-第三阶段Spar

自学it18大数据笔记-第二阶段ProtoBuf-day1——会持续更新……

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,视屏虽有不全也无老师课堂笔记和源码等相关资料,但徐培成老师讲的真心不错,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第二阶段avro-day1--会持续更新-- 第二阶段hadoop-day13:hadoop-day

自学it18大数据笔记-第三阶段Spark-day05——会持续更新……

写在最前:转行大数据领域,没报班,自学试试,能坚持下来以后就好好做这行,不能就--!准备从现有这套it18掌的视屏残本开始--自学是痛苦的,发博客和大家分享下学习成果--也是监督自己,督促自己坚持学下去. (教学视屏是it18掌做活动送的,视屏不是很全,课堂笔记和源码等课堂相关资料也未放出,但徐培成老师课讲的真心很好,感兴趣的不妨听听,特此感谢it18掌--帮他们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激!欢迎小伙伴们沟通交流. 笔记分享:自学it18大数据笔记-第三阶段Spar