hadoop join问题1

Join讲解1: 获取员工所在部门信息,输出格式要求:员工编号,员工姓名,部门名称,部门编号

1、原始数据

员工数据

empno ename
job        mgr
hiredate sal
comm deptno
loc

7499 allen
salesman 7698
1981-02-20 1600
300 30

7782 clark
managers 7639
1981-06-09 2450
10

7654 martin
salesman 7698
1981-03-20 1250
1400 30
boston

7900 james
clerk 7698
1981-01-09 950
30

7788 scott
analyst 7566
1981-09-01 3000
100 20

部门数据

deptno dname
loc

30 sales
chicago

20 research
dallas

10 accounting
newyork

2、实现的功能类似于

select e.empno,e.ename,d.dname,d.deptno from emp e join dept d on e.deptno=d.deptno;

key: deptno

第一种思路:

Text:empno_ename_0/deptno_dname_1;

第二种思路:

Consume bean: empno/ename/deptno/dname/flag

3、处理join的思路:

将Join key 当作map的输出key, 也就是reduce的输入key ,  这样只要join的key相同,shuffle过后,就会进入到同一个reduce 的key - value list 中去。

需要为join的2张表设计一个通用的一个bean.  并且bean中加一个flag的标志属性,这样可以根据flag来区分是哪张表的数据。

reduce 阶段根据flag来判断是员工数据还是部门数据就很容易了 。而join的真正处理是在reduce阶段。

4、实现中间bean

存储数据的bean  (由于数据要在网络上传输必须序列化,hadoop处理的时候需要分组和排序,所以要实现WritableComparable接口):

package com.wy.hadoop.join.one;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Emplyee implements WritableComparable {

private String empNo ="";

private String empName ="";

private String deptNo="";

private String deptName="";

private int flag =0;

public Emplyee(){};

public Emplyee(String empNo,String empName,String deptNo,String deptName,int flag){

this.empNo = empNo;

this.empName = empName;

this.deptNo = deptNo;

this.deptName = deptName;

this.flag = flag;

}

public Emplyee(Emplyee e){

this.empNo = e.empNo;

this.empName = e.empName;

this.deptNo = e.deptNo;

this.deptName = e.deptName;

this.flag = e.flag;

}

public String getEmpNo() {

return empNo;

}

public void setEmpNo(String empNo) {

this.empNo = empNo;

}

public String getEmpName() {

return empName;

}

public void setEmpName(String empName) {

this.empName = empName;

}

public String getDeptNo() {

return deptNo;

}

public void setDeptNo(String deptNo) {

this.deptNo = deptNo;

}

public String getDeptName() {

return deptName;

}

public void setDeptName(String deptName) {

this.deptName = deptName;

}

public int getFlag() {

return flag;

}

public void setFlag(int flag) {

this.flag = flag;

}

@Override

public void readFields(DataInput input) throws IOException {

// TODO Auto-generated method stub

this.empNo = input.readUTF();

this.empName = input.readUTF();

this.deptNo = input.readUTF();

this.deptName = input.readUTF();

this.flag = input.readInt();

}

@Override

public void write(DataOutput output) throws IOException {

// TODO Auto-generated method stub

output.writeUTF(this.empNo);

output.writeUTF(this.empName);

output.writeUTF(this.deptNo);

output.writeUTF(this.deptName);

output.writeInt(this.flag);

}

@Override

public int compareTo(Object o) {

// TODO Auto-generated method stub

return 0;

}

@Override

public String toString() {

String string = this.empNo+","+this.empName+","+this.deptName;

return string;

}

}

package com.hadoop.eight;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class JoinOneMapper extends Mapper<LongWritable, Text, IntWritable, Emplyee> {

@Override

protected void map(LongWritable key, Text value,Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] arr = line.split("\t");

System.out.println("-------"+arr.length);

if(arr.length<=3){//部门数据

Emplyee emplyee = new Emplyee();

emplyee.setDeptno(arr[0]);

emplyee.setDname(arr[1]);

emplyee.setFlag(0);

context.write(new IntWritable(Integer.parseInt(arr[0])), emplyee);

}else{//员工信息

Emplyee emplyee = new Emplyee();

emplyee.setEmpno(arr[0]);

emplyee.setEname(arr[1]);

emplyee.setDeptno(arr[7]);

emplyee.setFlag(1);

context.write(new IntWritable(Integer.parseInt(arr[7])), emplyee);

}

}

public static void main(String[] args){

String line = "30
sales chicago";

String[] arr = line.split("\t");

System.out.println("-------"+arr.length);

if(arr.length==3){//部门数据

System.out.println("----1");

}else{//员工信息

System.out.println("----2");

}

}

}

package com.hadoop.eight;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class JoinOneReducer extends Reducer<IntWritable, Emplyee, NullWritable, Text> {

@Override

protected void reduce(IntWritable key, Iterable<Emplyee> values,Context context)

throws IOException, InterruptedException {

Emplyee dept = null;

List<Emplyee> list = new ArrayList<Emplyee>();

for(Emplyee e:values){

if(e.getFlag()==0){//部门数据

dept = new Emplyee(e);

list.add(new Emplyee(e));

}else{

list.add(new Emplyee(e));

}

}

for(Emplyee e:list){

e.setDname(dept.getDname());

context.write(NullWritable.get(), new Text(e.toString()));

}

}

}

package com.hadoop.eight;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

/**

* @param args

*/

public static void main(String[] args)throws Exception {

Configuration configuration = new Configuration();

Job job = new Job(configuration,"join_one_job");

job.setJarByClass(JobMain.class);

job.setMapperClass(JoinOneMapper.class);

job.setMapOutputKeyClass(IntWritable.class);

job.setMapOutputValueClass(Emplyee.class);

job.setReducerClass(JoinOneReducer.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

Path path = new Path(args[1]);

FileSystem fs = FileSystem.get(configuration);

if(fs.exists(path)){

fs.delete(path, true);

}

FileOutputFormat.setOutputPath(job, path);

System.exit(job.waitForCompletion(true)?0:1);

}

}

时间: 2024-11-15 20:55:44

hadoop join问题1的相关文章

Hadoop Join

Hadoop 中的join分为三种 Reduce端join,适合于两个大表 Map端join,适合一个大表和一个小表,小表放到 Distribute Cache里面 semi join 当join只用到其中一个表中的一小部分时 Reduce端join 读入两个大表,对value按文件进行标记 在Reduce端收集属于不同文件的value到不同的list,对同一key的不同list中的value做笛卡尔积 Logger 用来记录错误 Counter 用来记数想要的一些数据 configuratio

hadoop mapreduce排序原理

 hadoop  mapreduce排序原理 Hadoop 案例3----数据排序  简单问题  (入门级别) "数据排序"是许多实际任务执行时要完成的第一项工作, 比如学生成绩评比.数据建立索引等.这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础.下面进入这个示例. 1.需求描述 对输入文件中数据进行排序.输入文件中的每行内容均为一个数字,即一个数据. 要求在输出中每行有两个间隔的数字,其中,第一个代表原始数据在原始数据集中的位次,第二个代表原始数据.

(转)MapReduce 中的两表 join 几种方案简介

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

MapReduce 中的两表 join 实例

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标

100 open source Big Data architecture papers for data professionals

zhuan :https://www.linkedin.com/pulse/100-open-source-big-data-architecture-papers-anil-madan Big Data technology has been extremely disruptive with open source playing a dominant role in shaping its evolution. While on one hand it has been disruptiv

mapreduce 平均值

mapreduce 平均值 1.需求分析 对输入文件中数据进行就算学生平均成绩.输入文件中的每行内容均为一个学生的姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件. 要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名,第二个代表其平均成绩. 2.原始数据 1)math: 张三    88 李四    99 王五    66 赵六    77 2)china: 张三    78 李四    89 王五    96 赵六    67 3)english: 张三    80 李四  

读完这100篇论文 就能成大数据高手(附论文下载)

100 open source Big Data architecture papers for data professionals. 读完这100篇论文 就能成大数据高手 作者 白宁超 2016年4月16日13:38:49 摘要:本文基于PayPal高级工程总监Anil Madan写的大数据文章,其中涵盖100篇大数据的论文,涵盖大数据技术栈(数据存储层.键值存储.面向列的存储.流式.交互式.实时系统.工具.库等),全部读懂你将会是大数据的顶级高手.作者通过引用Anil Madan原文和CS

Hadoop中两表JOIN的处理方法(转)

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

Hadoop多目录输入,join,进入reduce,数据流分析

前言 在做需求时,经常遇到多个目录,也就是多个维度进行join,这里分析一下,数据是怎么流动的. 1.多目录输入 使用MultipleInputs.addInputPath()  对多目录制定格式和map 2.数据流分析 map按行读入数据,需要对不同的输入目录,打上不同的标记(这个方法又叫reduce端连接),map在输出后会进行partition和sort,按照key进行排序,然后输出到reduce进行处理. 例子 三个输入文件: a.txt: 500 501 b.txt: 500 501