2018-07-30期 MapReduce分区(Partitioner)编程案例

1、EmpSalaryBean 对象

package cn.sjq.mr.part;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**

* 定义一个员工薪水的JavaBean,并实现MapReduce的Writable序列化接口

* @author songjq

*

*/

public class EmpSalaryBean implements Writable {

/*

定义成员属性

c_oid

c_employee_name

c_second_company_name

c_third_company_name

c_fourth_company_name

c_company_name

c_dept_name

c_sub_total

c_com_fund_payamt

*/

private int seq;

private String c_oid;

private String c_employee_name;

private String c_second_company_name;

private String c_third_company_name;

private String c_fourth_company_name;

private String c_company_name;

private String c_dept_name;

private float c_sub_total;

private float c_com_fund_payamt;

public int getSeq() {

return seq;

}

public void setSeq(int seq) {

this.seq = seq;

}

public String getC_oid() {

return c_oid;

}

public void setC_oid(String c_oid) {

this.c_oid = c_oid;

}

public String getC_employee_name() {

return c_employee_name;

}

public void setC_employee_name(String c_employee_name) {

this.c_employee_name = c_employee_name;

}

public String getC_second_company_name() {

return c_second_company_name;

}

public void setC_second_company_name(String c_second_company_name) {

this.c_second_company_name = c_second_company_name;

}

public String getC_third_company_name() {

return c_third_company_name;

}

public void setC_third_company_name(String c_third_company_name) {

this.c_third_company_name = c_third_company_name;

}

public String getC_fourth_company_name() {

return c_fourth_company_name;

}

public void setC_fourth_company_name(String c_fourth_company_name) {

this.c_fourth_company_name = c_fourth_company_name;

}

public String getC_company_name() {

return c_company_name;

}

public void setC_company_name(String c_company_name) {

this.c_company_name = c_company_name;

}

public String getC_dept_name() {

return c_dept_name;

}

public void setC_dept_name(String c_dept_name) {

this.c_dept_name = c_dept_name;

}

public float getC_sub_total() {

return c_sub_total;

}

public void setC_sub_total(float c_sub_total) {

this.c_sub_total = c_sub_total;

}

public float getC_com_fund_payamt() {

return c_com_fund_payamt;

}

public void setC_com_fund_payamt(float c_com_fund_payamt) {

this.c_com_fund_payamt = c_com_fund_payamt;

}

//反序列化方法

@Override

public void readFields(DataInput in) throws IOException {

this.seq = in.readInt();

this.c_oid = in.readUTF();

this.c_employee_name = in.readUTF();

this.c_second_company_name = in.readUTF();

this.c_third_company_name = in.readUTF();

this.c_fourth_company_name = in.readUTF();

this.c_company_name = in.readUTF();

this.c_dept_name = in.readUTF();

this.c_sub_total = in.readFloat();

this.c_com_fund_payamt = in.readFloat();

}

//序列化方法

@Override

public void write(DataOutput out) throws IOException {

out.writeInt(this.seq);

out.writeUTF(this.c_oid);

out.writeUTF(this.c_employee_name);

out.writeUTF(this.c_second_company_name);

out.writeUTF(this.c_third_company_name);

out.writeUTF(this.c_fourth_company_name);

out.writeUTF(this.c_company_name);

out.writeUTF(this.c_dept_name);

out.writeFloat(this.c_sub_total);

out.writeFloat(this.c_com_fund_payamt);

}

@Override

public String toString() {

return this.seq+"\t"+this.c_oid+"\t"+

this.c_employee_name+"\t"+this.c_second_company_name+"\t"+

this.c_third_company_name+"\t"+this.c_fourth_company_name+"\t"+

this.c_company_name+"\t"+this.c_dept_name+"\t"+

this.c_sub_total+"\t"+this.c_com_fund_payamt+"\t";

}

}

2、Mapper、Reducer、Job、Partitioner实现

package cn.sjq.mr.part;

import java.io.IOException;

import java.util.Iterator;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.FloatWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* 自定义分区

* 按照员工薪水范围进行分区

* 按照[0-2000] [2000-4000] [4000-6000] [6000-8000] >8000这几个范围进行分区

* 所有的Mapper、Reducer、Partitioner、Job均采用匿名内部类实现

* @author songjq

*

*/

public class EmployeePart {

/**

* 分区主要在<k2,v2>上进行,因此这里k2:员工薪水 v2:员工对象

* @author songjq

*

*/

static class EmployeePartMapper extends Mapper<LongWritable, Text, FloatWritable, EmpSalaryBean> {

private FloatWritable tkey = new FloatWritable();

private EmpSalaryBean tvalue = new EmpSalaryBean();

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

//获取一行

String line = v1.toString();

//分词

String[] fds = StringUtils.split(line, ",");

//将分词数据封装到EmpSalaryBean对象

tvalue.setSeq(new Integer(fds[0]).intValue());

tvalue.setC_oid(fds[1]);

tvalue.setC_employee_name(fds[2]);

tvalue.setC_second_company_name(fds[3]);

tvalue.setC_third_company_name(fds[4]);

tvalue.setC_fourth_company_name(fds[5]);

tvalue.setC_company_name(fds[6]);

tvalue.setC_dept_name(fds[7]);

tvalue.setC_sub_total(new Float(fds[8]).floatValue());

tvalue.setC_com_fund_payamt(new Float(fds[9]).floatValue());

tkey.set(tvalue.getC_sub_total());

//序列化输出到Reducer

context.write(tkey,tvalue);

}

}

/**

* 将分区后的数据写入HDFS

* @author songjq

*

*/

static class EmployeePartReducer extends Reducer<FloatWritable, EmpSalaryBean, NullWritable, EmpSalaryBean> {

@Override

protected void reduce(FloatWritable k3, Iterable<EmpSalaryBean> v3, Context ctx)

throws IOException, InterruptedException {

Iterator<EmpSalaryBean> iterator = v3.iterator();

while(iterator.hasNext()) {

EmpSalaryBean v4 = iterator.next();

ctx.write(NullWritable.get(), v4);

}

}

}

/**

* 自定义EmployeePartJob分区

* Partitioner<FloatWritable, EmpSalaryBean>对应Mapper<k2,v2>

* @author songjq

*

*/

static class EmployeeMyPartioner extends Partitioner<FloatWritable, EmpSalaryBean>{

/*

* 这里分5个区

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)

*/

@Override

public int getPartition(FloatWritable k2, EmpSalaryBean v2, int reduceNum) {

if(k2.get()<2000) {

//[0-2000)

return 0;

}else if(k2.get()<4000) {

//[2000-4000)

return 1;

}else if(k2.get()<6000) {

//[4000-6000)

return 2;

}else if(k2.get()<8000) {

//[6000-8000)

return 3;

}else {

//>8000

return 4;

}

}

}

/**

* 提交MapReduce任务

* @throws Exception

*/

@Test

public void EmployeePartJobSubmiter() throws Exception{

Job job = Job.getInstance(new Configuration());

job.setJarByClass(EmployeePart.class);

job.setMapperClass(EmployeePartMapper.class);

job.setReducerClass(EmployeePartReducer.class);

job.setMapOutputKeyClass(FloatWritable.class);

job.setMapOutputValueClass(EmpSalaryBean.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(EmpSalaryBean.class);

//指定自定义分区

job.setPartitionerClass(EmployeeMyPartioner.class);

//设置运行的ReduceTask数量,建议等于分区数量,必须>=partNums

job.setNumReduceTasks(5);

FileInputFormat.setInputPaths(job, new Path("D:\\test\\tmp\\part\\empsalary.csv"));

FileOutputFormat.setOutputPath(job, new Path("D:\\test\\tmp\\part\\output1"));

job.waitForCompletion(true);

}

}

原文地址:http://blog.51cto.com/2951890/2152436

时间: 2024-11-07 12:18:12

2018-07-30期 MapReduce分区(Partitioner)编程案例的相关文章

MapReduce教程(二)MapReduce框架Partitioner分区&lt;转&gt;

1 Partitioner分区 1.1 Partitioner分区描述 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中:按照省份划分的话,需要把同一省份的数据放到一个文件中:按照性别划分的话,需要把同一性别的数据放到一个文件中.我们知道最终的输出数据是来自于Reducer任务.那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行.Reducer任务的数据来自于Mapper任务,也就说Ma

Hadoop2.6.0学习笔记(七)MapReduce分区

鲁春利的工作笔记,谁说程序员不能有文艺范? MapReduce中map task任务的数量是由spli分片决定,那么reduce task的数量由什么来确定的呢?就是这里要讨论的MapReduce分区.默认情况下,MapReduce中使用的是HashPartitioner. /** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitione

饥饿疗法是目前唯一确信能够延缓衰老的办法:4星|《三联生活周刊》2018年3期

三联生活周刊·人类到底能活多久:抗衰老科学指南(2018年3期) 本期主题是抗衰老,科学记者袁越走访了全球抗衰老研究的顶级机构,把这个领域最前沿的进展深入浅出地展现出来,非常有价值.这一类报道也是国内比较稀缺的. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码,[]中是我根据上下文补充的信息: 1:2016年世界人均一次性能源消费量为1.87吨油当量,中国为2.25吨,相比十几年前还不足1吨有了飞跃,但只相当于经合组织(OECD)4.5吨的一半.OECD目前有34

2018-08-09期 MapReduce实现对单个用户支付金额最大的前N个商品排名

package cn.sjq.mr.sort; import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.Random; import java.util.TreeSet; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache

项目经理应该把30%的时间用在编程上(转)

本文的作者Eliot Horowitz是MongoDB的创始人和技术总监. 在一个科技公司里,软件技术经理用在编程上的时间应该不低于总工作时间的30%.无论是管理一个团队,还是一个分部,还是整个公司,当技术经理用在编程上的时间低于30%时,他执行职责的能力就会发生严重退化. 我的这个断言可能跟那些我看到的想成为团队首领的软件程序员们期望的情况完全相反.每次晋升,程序员们都期待花在编码上的时间会大幅度减少,当从"leader"爬到"经理"职位时,就应该彻底脱离编码活动

沈阳当年对学校承认了他和高岩的性关系:3星|《三联生活周刊》2018年16期

三联生活周刊·教授的权力:高校内的不平等关系(2018年16期) 本期主题是高校教师性侵学生的调查与思考. 总体评价3星,有参考价值. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:自第二次世界大战以来,以色列制造的暗杀事件比任何西方国家都多.该国领导人甚至认为通过杀戮指定的目标保护其国家安全,危害无辜平民的生命是合情合理的.#52 2:旅游业难以聚集大量的财富,给其从业者带来的回报也有限,这就是为什么海南成了高消费的代名词,可当地人收入却普遍不高的原因.这也可以说是资源

2018 07 14 题解

2018 07 14 T1 Description给出一个长度为 n 的序列 A,求这个序列的最长上升子序列的长度. HintO(NlogN)模板题,不赘述了 Code #include <set> #include <cmath> #include <queue> #include <cstdio> #include <string> #include <cstring> #include <iostream> #inc

MapReduce的核心编程思想

1.MapReduce的核心编程思想 2.yarn集群工作机制 3.maptask并行度与决定机制 4.maptask工作机制 原文地址:https://www.cnblogs.com/areyouready/p/9853445.html

AI Summit(2018.07.19)

AI Summit 时间:2018.07.19地点:北京丽都皇冠假日酒店 原文地址:https://www.cnblogs.com/xuefeng1982/p/10331578.html