A MapReduce Custom Partitioner Example
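This example builds a complete MapReduce job that routes employee records to different reducers with a custom Partitioner, keyed by department number. With three reduce tasks, each department group lands in its own output file. The pieces below are the Mapper, the Reducer, the Writable Employee bean, the Partitioner itself, and the driver that wires them together.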

Mapper:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Sample input line: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String str = value.toString();
        // Split the line into fields
        String[] words = str.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));
        e.setEname(words[1]);
        e.setJob(words[2]);
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception e2) {
            // Some records have no manager; default to 0
            e.setMgr(0);
        }
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5]));
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception e2) {
            // Commission is empty for most employees; default to 0
            e.setComm(0);
        }
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit the employee record keyed by department number
        context.write(new LongWritable(e.getDeptno()), e);
    }
}
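For reference, the input is the classic EMP table in CSV form, one employee per line. Only the ALLEN row appears in the original comments; the other rows below are illustrative, following the same layout. Note the empty comm field in the CLARK row: parseInt("") throws, which is exactly what the try/catch around the commission field absorbs.

7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30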


Reducer:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {

    @Override
    protected void reduce(LongWritable deptno, Iterable<Employee> values, Context context)
            throws IOException, InterruptedException {
        // Write out every employee in this department; each reduce task
        // receives exactly the departments routed to it by the partitioner
        for (Employee e : values) {
            context.write(deptno, e);
        }
    }
}
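One Hadoop-specific subtlety: the framework reuses a single Employee instance while iterating over values, refilling it via readFields() on each step. Writing each record straight to the context, as above, is safe because the write serializes immediately; stashing the references for later use is not. A minimal sketch of the pitfall, as a fragment inside reduce() (the copy constructor Employee(Employee other) is hypothetical and would have to be added to the bean):

        List<Employee> kept = new ArrayList<>();
        for (Employee e : values) {
            // BROKEN: kept.add(e); every entry would point at the one reused object,
            // so the list would finally hold N references to the last record.
            kept.add(new Employee(e)); // SAFE: defensive copy (hypothetical copy constructor)
        }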


Employee:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class Employee implements Writable {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    public Employee() {
    }

    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", job=" + job
                + ", mgr=" + mgr + ", hiredate=" + hiredate + ", sal=" + sal
                + ", comm=" + comm + ", deptno=" + deptno + "]";
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.sal = in.readInt();
        this.comm = in.readInt();
        this.deptno = in.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Field layout mirrors the input line: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        // readFields() must consume the fields in exactly this order
        output.writeInt(empno);
        output.writeUTF(ename);
        output.writeUTF(job);
        output.writeInt(mgr);
        output.writeUTF(hiredate);
        output.writeInt(sal);
        output.writeInt(comm);
        output.writeInt(deptno);
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
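Since write() and readFields() must stay in lockstep, a quick local round trip catches field-order mistakes before the job ever touches a cluster. A minimal sketch (the test class name is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class EmployeeRoundTrip {
    public static void main(String[] args) throws Exception {
        Employee in = new Employee();
        in.setEmpno(7499);
        in.setEname("ALLEN");
        in.setJob("SALESMAN");
        in.setMgr(7698);
        in.setHiredate("1981/2/20");
        in.setSal(1600);
        in.setComm(300);
        in.setDeptno(30);

        // Serialize with write(), then deserialize into a fresh object with readFields()
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));
        Employee out = new Employee();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // The two toString() lines should match exactly
        System.out.println(in);
        System.out.println(out);
    }
}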

Partitioner:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class EmployeePartition extends Partitioner<LongWritable, Employee> {

    @Override
    public int getPartition(LongWritable key2, Employee e, int numPartition) {
        // Partitioning rule: route each department to a fixed reducer.
        // With 3 reduce tasks: deptno 10 -> partition 1, deptno 20 -> partition 2,
        // everything else -> partition 0 (since 3 % 3 == 0).
        if (e.getDeptno() == 10) {
            return 1 % numPartition;
        } else if (e.getDeptno() == 20) {
            return 2 % numPartition;
        } else {
            return 3 % numPartition;
        }
    }
}
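For contrast, the default partitioner, Hadoop's HashPartitioner, spreads keys by hash code rather than by an explicit business rule; its implementation is essentially:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

The custom EmployeePartition exists precisely because we want a fixed, human-chosen department-to-partition mapping instead of a hash-based one.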


Driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartitionMain {

    public static void main(String[] args) throws Exception {
        // Partition employee records by department number
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Identify the job's main class
        job.setJarByClass(PartitionMain.class);

        // Configure the mapper and its output types
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Install the custom partitioner; the number of reduce tasks
        // must match the number of partitions we want
        job.setPartitionerClass(EmployeePartition.class);
        job.setNumReduceTasks(3);

        // Configure the reducer and its output types
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        // Input and output paths on HDFS
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, wait for completion, and report success via the exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
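Packaged into a jar, the job runs like any other MapReduce program (the jar name and HDFS paths below are placeholders):

hadoop jar partition-example.jar PartitionMain /input/emp.csv /output/emp-by-dept

With three reduce tasks and the rule above, the output directory should contain part-r-00000 (all other departments), part-r-00001 (deptno 10), and part-r-00002 (deptno 20).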


Original article: https://www.cnblogs.com/TiePiHeTao/p/126c97716f7c505105c1347f0f0e0989.html
