2018-08-02 期 MapReduce实现多表查询自连接

1、员工对象EmployeeBean

package cn.sjq.bigdata.mr.self.join;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**

* 员工对象EmployeeBean

* 由于该对象需要做为Mapper的输出,因此需要实现Writable接口

* @author songjq

*/

/**
 * Employee record bean, parsed from one line of the emp table.
 *
 * <p>Implements {@link Writable} because instances are emitted as Mapper output
 * values and must be serialized/deserialized by the MapReduce framework.
 *
 * @author songjq
 */
public class EmployeeBean implements Writable {

    // Columns of the emp table.
    private int deptNo = 0;
    private int empno = 0;
    private String ename = "";
    private String job = "";
    private int mgr = 0;
    private String hiredate = "";
    private float salary = 0f;
    private float comm = 0f;
    // Role marker for the self-join: 0 = employee record, 1 = boss record.
    private int flag = 0;

    /** No-arg constructor required by the Writable deserialization machinery. */
    public EmployeeBean() {
    }

    public EmployeeBean(int deptNo, int empno, String ename, String job, int mgr,
            String hiredate, float salary, float comm, int flag) {
        this.deptNo = deptNo;
        this.empno = empno;
        this.ename = ename;
        this.job = job;
        this.mgr = mgr;
        this.hiredate = hiredate;
        this.salary = salary;
        this.comm = comm;
        this.flag = flag;
    }

    /**
     * Deserializes all fields. Read order MUST mirror the write order in
     * {@link #write(DataOutput)} exactly.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        deptNo = in.readInt();
        empno = in.readInt();
        ename = in.readUTF();
        job = in.readUTF();
        mgr = in.readInt();
        hiredate = in.readUTF();
        salary = in.readFloat();
        comm = in.readFloat();
        flag = in.readInt();
    }

    /**
     * Serializes all fields. Write order MUST mirror the read order in
     * {@link #readFields(DataInput)} exactly.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(deptNo);
        out.writeInt(empno);
        out.writeUTF(ename);
        out.writeUTF(job);
        out.writeInt(mgr);
        out.writeUTF(hiredate);
        out.writeFloat(salary);
        out.writeFloat(comm);
        out.writeInt(flag);
    }

    public int getDeptNo() {
        return deptNo;
    }

    public void setDeptNo(int deptNo) {
        this.deptNo = deptNo;
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public float getSalary() {
        return salary;
    }

    public void setSalary(float salary) {
        this.salary = salary;
    }

    public float getComm() {
        return comm;
    }

    public void setComm(float comm) {
        this.comm = comm;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

}

2、EmployeeBossBean最终输出对象

package cn.sjq.bigdata.mr.self.join;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**

* 员工老板综合JavaBean

* 最终将EmployeeBossBean通过Reduce输出到HDFS

* 输出格式为:

*  通过编写MapReduce程序实现emp表自连接,输出格式为:

BOSSNO BOSSNAME EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO

7698 BLAKE 7499 ALLEN SALESMAN 7698 1981/2/20 1600 300 30

7698 BLAKE 7521 WARD SALESMAN 7698 1981/2/22 1250 500 30

7698 BLAKE 7654 MARTIN SALESMAN 7698 1981/9/28 1250 1400 30

由于EmployeeBossBean最终要输出到HDFS,因此需要序列化EmployeeBossBean,实现WritableComparable接口

* @author songjq

*

*/

/**
 * Joined employee/boss output bean, written by the Reducer to HDFS.
 *
 * <p>Output row layout (one line per employee, prefixed by the boss):
 * <pre>
 * BOSSNO BOSSNAME DEPTNO EMPNO ENAME JOB HIREDATE SAL COMM
 * 7698   BLAKE    30     7499  ALLEN SALESMAN 1981/2/20 1600 300
 * </pre>
 *
 * <p>Implements {@link WritableComparable} because it is emitted as a Reducer
 * output value and must be serializable by the framework.
 *
 * @author songjq
 */
public class EmployeeBossBean implements WritableComparable<EmployeeBossBean> {

    // Joined columns; "-" marks an absent string value in the output.
    private int bossno = 0;
    private String bossname = "-";
    private int deptNo = 0;
    private int empno = 0;
    private String ename = "-";
    private String job = "";
    private String hiredate = "-";
    private float salary = 0f;
    private float comm = 0f;

    // When true, toString() renders the column-header line instead of a data row.
    private boolean isTableHeader = false;

    public int getBossno() {
        return bossno;
    }

    public void setBossno(int bossno) {
        this.bossno = bossno;
    }

    public String getBossname() {
        return bossname;
    }

    public void setBossname(String bossname) {
        this.bossname = bossname;
    }

    public int getDeptNo() {
        return deptNo;
    }

    public void setDeptNo(int deptNo) {
        this.deptNo = deptNo;
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public float getSalary() {
        return salary;
    }

    public void setSalary(float salary) {
        this.salary = salary;
    }

    public float getComm() {
        return comm;
    }

    public void setComm(float comm) {
        this.comm = comm;
    }

    public boolean isTableHeader() {
        return isTableHeader;
    }

    public void setTableHeader(boolean isTableHeader) {
        this.isTableHeader = isTableHeader;
    }

    public EmployeeBossBean(int bossno, String bossname, int deptNo, int empno, String ename, String job,
            String hiredate, float salary, float comm, boolean isTableHeader) {
        this.bossno = bossno;
        this.bossname = bossname;
        this.deptNo = deptNo;
        this.empno = empno;
        this.ename = ename;
        this.job = job;
        this.hiredate = hiredate;
        this.salary = salary;
        this.comm = comm;
        this.isTableHeader = isTableHeader;
    }

    /** No-arg constructor required by the Writable deserialization machinery. */
    public EmployeeBossBean() {}

    /**
     * Renders either the header line (when {@code isTableHeader}) or a data row.
     *
     * <p>Column order of both lines: BOSSNO, BOSSNAME, DEPTNO, EMPNO, ENAME,
     * JOB, HIREDATE, SAL, COMM. Rows with {@code bossno == 0} (employees with
     * no manager) are rendered as a single tab.
     */
    @Override
    public String toString() {
        if (isTableHeader) {
            // FIX: the header previously printed DEPTNO twice and never
            // labelled EMPNO, so it did not line up with the data rows below.
            return "BOSSNO\t" + formatStr("BOSSNAME", 12) + formatStr("DEPTNO", 8) + "EMPNO\t"
                    + formatStr("ENAME", 12) + formatStr("JOB", 12) + formatStr("HIREDATE", 12)
                    + formatStr("SAL", 6) + "\t" + formatStr("COMM", 6) + "\t";
        } else {
            return this.bossno == 0 ? "\t"
                    : this.bossno + "\t" + formatStr(this.bossname, 12)
                            + formatStr(this.deptNo == 0 ? "-" : String.valueOf(this.deptNo), 8)
                            + (this.empno == 0 ? "-" : this.empno) + "\t" + formatStr(this.ename, 12)
                            + formatStr(this.job, 12) + formatStr(this.hiredate, 12)
                            + formatStr(this.salary == 0f ? "-" : String.valueOf(this.salary), 6) + "\t"
                            + formatStr(this.comm == 0f ? "-" : String.valueOf(this.comm), 6) + "\t";
        }
    }

    /**
     * Pads {@code str} with trailing spaces to {@code length} characters, or
     * truncates it when longer. A {@code null} input is treated as "".
     *
     * @param str    value to format (may be null)
     * @param length target column width in characters
     * @return the padded or truncated string
     */
    public static String formatStr(String str, int length) {
        if (str == null) {
            str = "";
        }
        // FIX: measure in chars, not platform-charset bytes; the previous
        // getBytes().length disagreed with the char-based substring() below
        // for any non-ASCII text (and depended on the default charset).
        int strLen = str.length();
        if (strLen == length) {
            return str;
        } else if (strLen < length) {
            StringBuilder padded = new StringBuilder(length);
            padded.append(str);
            for (int i = strLen; i < length; i++) {
                padded.append(' ');
            }
            return padded.toString();
        } else {
            return str.substring(0, length);
        }
    }

    /**
     * Deserializes all fields; order mirrors {@link #write(DataOutput)}.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.bossno = in.readInt();
        this.bossname = in.readUTF();
        this.deptNo = in.readInt();
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.hiredate = in.readUTF();
        this.salary = in.readFloat();
        this.comm = in.readFloat();
        this.isTableHeader = in.readBoolean();
    }

    /**
     * Serializes all fields; order mirrors {@link #readFields(DataInput)}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.bossno);
        out.writeUTF(this.bossname);
        out.writeInt(this.deptNo);
        out.writeInt(this.empno);
        out.writeUTF(this.ename);
        out.writeUTF(this.job);
        out.writeUTF(this.hiredate);
        out.writeFloat(this.salary);
        out.writeFloat(this.comm);
        out.writeBoolean(this.isTableHeader);
    }

    /**
     * No ordering is needed for the output; all instances compare equal.
     * NOTE(review): this is inconsistent with Object.equals (which is not
     * overridden) — acceptable here because the bean is only used as a
     * Reducer output value, never as a sorted key.
     */
    @Override
    public int compareTo(EmployeeBossBean o) {
        return 0;
    }

}

3、Mapper、Reducer、job实现

package cn.sjq.bigdata.mr.self.join;

import java.io.IOException;

import java.util.ArrayList;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* 通过MapReduce实现emp表自连接

* 原始数据如下:

7369,SMITH,CLERK,7902,1980/12/17,800,,20

7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30

7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30

7566,JONES,MANAGER,7839,1981/4/2,2975,,20

7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30

7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30

7782,CLARK,MANAGER,7839,1981/6/9,2450,,10

7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20

7839,KING,PRESIDENT,,1981/11/17,5000,,10

7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30

7876,ADAMS,CLERK,7788,1987/5/23,1100,,20

7900,JAMES,CLERK,7698,1981/12/3,950,,30

7902,FORD,ANALYST,7566,1981/12/3,3000,,20

7934,MILLER,CLERK,7782,1982/1/23,1300,,10

最终输出结果如下:

通过编写MapReduce程序实现emp表自连接,输出格式为:

BOSSNO BOSSNAME EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO

7698 BLAKE 7499 ALLEN SALESMAN 7698 1981/2/20 1600 300 30

7698 BLAKE 7521 WARD SALESMAN 7698 1981/2/22 1250 500 30

7698 BLAKE 7654 MARTIN SALESMAN 7698 1981/9/28 1250 1400 30

实现逻辑:

(1)emp表中mgr为员工老板的编号

(2)老板mgr也是一位员工

(3)关系型数据库中实现逻辑为对emp表进行重命名作为多张表使用,假设这里emp e表示员工,emp b表示boss老板,因此核心where条件为e.empno = b.mgr这样就可以实现emp表的自连接

(4)MapReduce也是利用这样连接条件来实现emp表的自连接

假设 EMPNO ENAME MGR

7521 WARD 7698

7654 MARTIN 7698

作为emp普通员工表

BOSSNO(MGR) (BOSSNAME)ENAME

7698 BLAKE

作为boss老板表,这样连接条件就为emp.empno = boss.bossno

这样,如果我们将mapper中输入的数据作为emp普通员工表,那输出的

<k2 v2>

BOSSNO(MGR) EmployeeBean对象

如果我们将mapper中输入的数据作为老板表,那输出的

<k2 v2>

EMPNO EmployeeBean对象

最后在结合MapReduce原理,相同的Key会采用同一Reduce处理,就可以实现我们emp表的自连接。

* @author songjq

*

*/

public class EmpSelfJoin {

    /**
     * Mapper.
     *
     * <p>Emits every emp row twice so the reducer can join employees to their
     * manager (reduce-side self-join):
     * <ul>
     *   <li>key = mgr, flag 0 — the row viewed as an employee;</li>
     *   <li>key = empno, flag 1 — the row viewed as a potential boss.</li>
     * </ul>
     * k1: input offset (LongWritable), v1: one CSV line (Text),
     * k2: join key (IntWritable), v2: EmployeeBean.
     *
     * @author songjq
     */
    static class EmpSelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, EmployeeBean> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Input row format: 7369,SMITH,CLERK,7902,1980/12/17,800,,20
            String line = v1.toString();
            String[] fields = line.split(",");
            // FIX: skip blank or malformed rows; previously a blank line made
            // fields[7] throw ArrayIndexOutOfBoundsException and killed the task.
            if (fields.length < 8) {
                return;
            }
            EmployeeBean emp = new EmployeeBean();
            emp.setDeptNo(Integer.parseInt(fields[7]));
            emp.setEmpno(Integer.parseInt(fields[0]));
            emp.setEname(fields[1]);
            emp.setJob(fields[2]);
            // mgr may be empty (e.g. KING, the president, has no manager): default to 0.
            try {
                emp.setMgr(Integer.parseInt(fields[3]));
            } catch (NumberFormatException e) {
                emp.setMgr(0);
            }
            emp.setHiredate(fields[4]);
            // salary may be empty: default to 0.
            try {
                emp.setSalary(Float.parseFloat(fields[5]));
            } catch (NumberFormatException e) {
                emp.setSalary(0);
            }
            // comm is empty for non-salesmen: default to 0.
            try {
                emp.setComm(Float.parseFloat(fields[6]));
            } catch (NumberFormatException e) {
                emp.setComm(0);
            }
            // Emit once as an employee record, keyed by its manager's number.
            emp.setFlag(0);
            context.write(new IntWritable(emp.getMgr()), emp);
            // Emit again as a boss record, keyed by its own employee number.
            // Reusing emp is safe: context.write serializes the value immediately.
            emp.setFlag(1);
            context.write(new IntWritable(emp.getEmpno()), emp);
        }
    }

    /**
     * Reducer.
     *
     * <p>All records sharing one key arrive together: at most one boss record
     * (flag 1, whose empno equals the key) plus the employees reporting to him
     * (flag 0, whose mgr equals the key). The boss's name is stitched onto each
     * employee row and written out.
     * k3: join key (IntWritable), v3: EmployeeBean values,
     * k4: NullWritable, v4: EmployeeBossBean.
     *
     * @author songjq
     */
    static class EmpSelfJoinReducer extends Reducer<IntWritable, EmployeeBean, NullWritable, EmployeeBossBean> {
        @Override
        protected void reduce(IntWritable k3, Iterable<EmployeeBean> v3, Context ctx)
                throws IOException, InterruptedException {
            // Output rows buffered until the boss record (if any) has been seen,
            // because the iteration order of v3 is not guaranteed.
            ArrayList<EmployeeBossBean> eblist = new ArrayList<EmployeeBossBean>();
            String bossname = "";
            for (EmployeeBean emp : v3) {
                if (emp.getFlag() == 1) {
                    // Boss record: remember the name for all buffered employees.
                    bossname = emp.getEname();
                } else if (emp.getFlag() == 0) {
                    // Employee record: copy into an output bean.
                    EmployeeBossBean ebtmp = new EmployeeBossBean();
                    ebtmp.setDeptNo(emp.getDeptNo());
                    ebtmp.setEmpno(emp.getEmpno());
                    ebtmp.setEname(emp.getEname());
                    ebtmp.setJob(emp.getJob());
                    ebtmp.setHiredate(emp.getHiredate());
                    ebtmp.setSalary(emp.getSalary());
                    ebtmp.setComm(emp.getComm());
                    ebtmp.setBossno(emp.getMgr());
                    eblist.add(ebtmp);
                }
            }
            // Attach the boss name and emit every joined row.
            for (EmployeeBossBean eb : eblist) {
                eb.setBossname(bossname);
                ctx.write(NullWritable.get(), eb);
            }
        }

        /**
         * Writes the column-header line before any data rows.
         * NOTE(review): runs once per reduce TASK, so with more than one
         * reducer the header repeats in every output part file — this job
         * assumes the default single reducer.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            EmployeeBossBean eheader = new EmployeeBossBean();
            eheader.setTableHeader(true);
            context.write(NullWritable.get(), eheader);
        }
    }

    /**
     * Configures and submits the self-join job (local test entry point).
     *
     * @throws Exception if job setup or execution fails
     */
    @Test
    public void EmpSelfJoinJob() throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EmpSelfJoin.class);
        job.setMapperClass(EmpSelfJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(EmployeeBean.class);
        job.setReducerClass(EmpSelfJoinReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(EmployeeBossBean.class);
        FileInputFormat.setInputPaths(job, new Path("D:\\test\\tmp\\join\\self\\emp.csv"));
        // Output directory must not already exist, or the job fails at submit time.
        FileOutputFormat.setOutputPath(job, new Path("D:\\test\\tmp\\join\\self\\output4"));
        job.waitForCompletion(true);
    }

}

4、执行输出结果

BOSSNO BOSSNAME    DEPTNO DEPTNO  ENAME       JOB         HIREDATE    SAL   COMM

7566 JONES       20      7902 FORD        ANALYST     1981/12/3   3000.0 -

7566 JONES       20      7788 SCOTT       ANALYST     1987/4/19   3000.0 -

7698 BLAKE       30      7499 ALLEN       SALESMAN    1981/2/20   1600.0 300.0

7698 BLAKE       30      7900 JAMES       CLERK       1981/12/3   950.0 -

7698 BLAKE       30      7844 TURNER      SALESMAN    1981/9/8    1500.0 -

7698 BLAKE       30      7654 MARTIN      SALESMAN    1981/9/28   1250.0 1400.0

7698 BLAKE       30      7521 WARD        SALESMAN    1981/2/22   1250.0 500.0

7782 CLARK       10      7934 MILLER      CLERK       1982/1/23   1300.0 -

7788 SCOTT       20      7876 ADAMS       CLERK       1987/5/23   1100.0 -

7839 KING        10      7782 CLARK       MANAGER     1981/6/9    2450.0 -

7839 KING        30      7698 BLAKE       MANAGER     1981/5/1    2850.0 -

7839 KING        20      7566 JONES       MANAGER     1981/4/2    2975.0 -

7902 FORD        20      7369 SMITH       CLERK       1980/12/17  800.0 -

原文地址:http://blog.51cto.com/2951890/2153420

时间: 2024-10-06 02:30:36

2018-08-02 期 MapReduce实现多表查询自连接的相关文章

2018.08.02

好久没有写了.低落的心情,读博的事情没想好不说,开题也着实愁人. 想想一年之前,大四的自己想到一年之后自己又要做毕业设计了就是烦烦的,,如今呀,,确实还是烦烦的! 真心不适合搞科研的,读博做什么呢?我是在逃避找工作的事实么? 人生没有多少个5年的.真心希望自己无愧于父母呀.哎. 这个月计划: 开题(如果可以尽快完成就可以回家呆几天了) 去南京比赛 回来比赛 考虑清楚是否读博以及和谁读. 突然想起YKH老师,抛硬币决定未来... 加油! 原文地址:https://www.cnblogs.com/g

oracle多表查询-自连接

自连接 *自连接的实质是:将同一张表看成是多张表. *举例: *查询所有员工的姓名及其直属上级的姓名 喜欢就关注我哦 原文地址:https://www.cnblogs.com/zhiyanwenlei/p/9638132.html

MapReduce实现两表的Join--原理及python和java代码实现

用Hive一句话搞定的,但是有时必须要用mapreduce 方法介绍 1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side jo

DLUBAL RSTAB 8.08.02 Win64 1DVD

CATIA Composer R2018 build 7.5.0.1279 Win64 1DVD Sysnopy Coretools vK-2015.06 SP5 Linux32_64 2CD Cadence Design Systems OpenPOWER Compliance Kits for Sigrity 2017 1DVD Geometric.Stackup.2.1.0.15228.Win32_64 2CD Siemens.FEMAP.v11.4.0.Win64 1DVD Ventuz

大数据技术之MapReduce中多表合并案例

大数据技术之MapReduce中多表合并案例 1)需求: 订单数据表t_order: id pid amount 1001 01 1 1002 02 2 1003 03 3 订单数据order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6 商品信息表t_product pid pname 01 小米 02 华为 03 格力 商品数据pd.txt 01 小米 02 华为 03 格力 将商品信息表中数据根据商品pid合

【转换博客通知】(本博客2015.08.02停用)

2015.08.02 本博客停用,搬运至 http://tonyfang.is-programmer.com/ 感谢各位对我的支持!

饥饿疗法是目前唯一确信能够延缓衰老的办法:4星|《三联生活周刊》2018年3期

三联生活周刊·人类到底能活多久:抗衰老科学指南(2018年3期) 本期主题是抗衰老,科学记者袁越走访了全球抗衰老研究的顶级机构,把这个领域最前沿的进展深入浅出地展现出来,非常有价值.这一类报道也是国内比较稀缺的. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码,[]中是我根据上下文补充的信息: 1:2016年世界人均一次性能源消费量为1.87吨油当量,中国为2.25吨,相比十几年前还不足1吨有了飞跃,但只相当于经合组织(OECD)4.5吨的一半.OECD目前有34

沈阳当年对学校承认了他和高岩的性关系:3星|《三联生活周刊》2018年16期

三联生活周刊·教授的权力:高校内的不平等关系(2018年16期) 本期主题是高校教师性侵学生的调查与思考. 总体评价3星,有参考价值. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:自第二次世界大战以来,以色列制造的暗杀事件比任何西方国家都多.该国领导人甚至认为通过杀戮指定的目标保护其国家安全,危害无辜平民的生命是合情合理的.#52 2:旅游业难以聚集大量的财富,给其从业者带来的回报也有限,这就是为什么海南成了高消费的代名词,可当地人收入却普遍不高的原因.这也可以说是资源

新手C#string类常用函数的学习2018.08.04

ToLower()用于将字符串变为小写,注意字符串的不可变特性,需要重新赋值给另一个字符串变量. s = s.ToLower();//字符串具有不可变性,转换后需要重新赋值,不可仅有s.ToLower(); 这可以使用户的输入不区分大小写,例如验证码. ToUpper()用于将字符串全部变为大写,与上面类似. Trim()可以用于去掉两边的空格. string s1 = " a b c "; s1 = s1.Trim();//用于去除字符串两边的空格 Console.WriteLine