MapReduce的Reduce side Join

1. 简单介绍

reduce side  join是全部join中用时最长的一种join,可是这样的方法可以适用内连接、left外连接、right外连接、full外连接和反连接等全部的join方式。reduce side
 join不仅能够对小数据进行join,也能够对大数据进行join,可是大数据会占用大量的集群内部网络IO,由于全部数据终于要写入到reduce端进行join。

假设要做join的数据量很大的话。就不得不用reduce join了。

2. 适用场景

-join的两部分数据量很大;

-想要通过一种模式灵活的适用多种join。

3.Reduce side  join的架构

3.1 map 阶段

map 阶段首先从数据中提取出join的foreign key作为map输出的key,然后将输入的记录所有作为输出value。输出的value须要依据输入的数据集打上数据集的标签,比方在value的开头加上‘A’‘B’的标签。

3.2 reduce阶段

reduce端对具有相同foreign key的数据进行处理,对具有标签‘A‘和‘B‘的数据进行迭代处理,下面分别用伪代码对不同的join的处理进行说明。

-内连接:假设带有标签‘A’和‘B’的数据都存在,遍历并连接这些数据,然后输出

if (!listA.isEmpty() && !listB.isEmpty()) {
     for (Text A : listA) {
          for (Text B : listB) {
          context.write(A, B);
          }
     }
}

-左外连接:右边的数据假设存在就与左边连接,否则将右边的字段都赋null。仅仅输出左边

// For each entry in A,
for (Text A : listA) {
// If list B is not empty, join A and B
     if (!listB.isEmpty()) {
          for (Text B : listB) {
               context.write(A, B);
          }
     } else {
// Else, output A by itself
          context.write(A, EMPTY_TEXT);
     }
}

-右外连接:与左外连接类似。左边为空就将左边赋值null,仅仅输出右边

// For each entry in B,
for (Text B : listB) {
// If list A is not empty, join A and B
     if (!listA.isEmpty()) {
          for (Text A : listA) {
               context.write(A, B);
          }
     } else {
// Else, output B by itself
          context.write(EMPTY_TEXT, B);
     }
}

-全外连接:这个要相对复杂点,首先输出A和B都不为空的。然后输出某一边为空的

// If list A is not empty
if (!listA.isEmpty()) {
// For each entry in A
     for (Text A : listA) {
// If list B is not empty, join A with B
          if (!listB.isEmpty()) {
               for (Text B : listB) {
                    context.write(A, B);
               }
          } else {
          // Else, output A by itself
               context.write(A, EMPTY_TEXT);
          }
     }
} else {
// If list A is empty, just output B
     for (Text B : listB) {
          context.write(EMPTY_TEXT, B);
     }
}

-反连接:输出A和B没有共同foreign key的值

// If list A is empty and B is empty or vice versa
if (listA.isEmpty() ^ listB.isEmpty()) {
// Iterate both A and B with null values
// The previous XOR check will make sure exactly one of
// these lists is empty and therefore the list will be skipped
     for (Text A : listA) {
          context.write(A, EMPTY_TEXT);
     }
     for (Text B : listB) {
          context.write(EMPTY_TEXT, B);
     }
}

4.实例

以下举一个简单的样例,要求可以用reduce side join方式实现以上全部的join。

4.1数据

User 表

---------------------------
username     cityid
--------------------------
 Li lei,       1
Xiao hong,     2
Lily,          3
Lucy,          3
Daive,         4
Jake,          5
Xiao Ming,     6

City表

---------------------------
cityid     cityname
--------------------------
1,     Shanghai
2,     Beijing
3,     Jinan
4,     Guangzhou
7,     Wuhan
8,     Shenzhen

4.2 代码介绍

写两个mapper,一个mapper处理user数据,一个mapper处理city数据。在主函数中调用时用MultipleInputs类加入数据路径,并分别指派两个处理的Mapper。

往configuration中加入參数“join.type”,传给reducer,决定在reduce端採用什么样的join。

具体代码例如以下:

package com.study.hadoop.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin {

	//user map
	public static class UserJoinMapper extends Mapper<Object, Text, Text, Text>{
		private Text outKey = new Text();
		private Text outValue = new Text();
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String line = value.toString();
			String[] items = line.split(",");

			outKey.set(items[1]);
			outValue.set("A"+items[0]);
			context.write(outKey, outValue);
		}
	}
	//city map
	public static class CityJoinMapper extends Mapper<Object, Text, Text, Text>{
		// TODO Auto-generated constructor stub
		private Text outKey = new Text();
		private Text outValue = new Text();
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String line = value.toString();
			String[] items = line.split(",");

			outKey.set(items[0]);
			outValue.set("B"+items[1]);
			context.write(outKey, outValue);
		}

	}
	public static class JoinReducer extends Reducer<Text, Text, Text, Text>{
		// TODO Auto-generated constructor stub
		//Join type:{inner,leftOuter,rightOuter,fullOuter,anti}
		private String joinType = null;
		private static final Text EMPTY_VALUE = new Text("");
		private List<Text> listA = new ArrayList<Text>();
		private List<Text> listB = new ArrayList<Text>();
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			//获取join的类型
			joinType = context.getConfiguration().get("join.type");
		}

		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			listA.clear();
			listB.clear();

			Iterator<Text> iterator = values.iterator();
			while(iterator.hasNext()){
				String value = iterator.next().toString();
				if(value.charAt(0)==‘A‘)
					listA.add(new Text(value.substring(1)));
				if(value.charAt(0)==‘B‘)
					listB.add(new Text(value.substring(1)));
			}
			joinAndWrite(context);
		}

		private void joinAndWrite(Context context)
				throws IOException, InterruptedException{
			//inner join
			if(joinType.equalsIgnoreCase("inner")){
				if(!listA.isEmpty() && !listB.isEmpty()) {
					for (Text A : listA)
						for(Text B : listB){
							context.write(A, B);
						}
				}
			}
			//left outer join
			if(joinType.equalsIgnoreCase("leftouter")){
				if(!listA.isEmpty()){
					for (Text A : listA){
						if(!listB.isEmpty()){
							for(Text B: listB){
								context.write(A, B);
							}
						}
						else{
							context.write(A, EMPTY_VALUE);
						}
					}
				}
			}
			//right outer join
			else if(joinType.equalsIgnoreCase("rightouter")){
				if(!listB.isEmpty()){
					for(Text B: listB){
						if(!listA.isEmpty()){
							for(Text A: listA)
								context.write(A, B);
						}else {
							context.write(EMPTY_VALUE, B);
						}
					}
				}
			}
			//full outer join
			else if(joinType.equalsIgnoreCase("fullouter")){
				if(!listA.isEmpty()){
					for (Text A : listA){
						if(!listB.isEmpty()){
							for(Text B : listB){
								context.write(A, B);
							}
						}else {
							context.write(A, EMPTY_VALUE);
						}
					}
				}else{
					for(Text B : listB)
						context.write(EMPTY_VALUE, B);
				}
			}
			//anti join
			else if(joinType.equalsIgnoreCase("anti")){
				if(listA.isEmpty() ^ listB.isEmpty()){
					for(Text A : listA)
						context.write(A, EMPTY_VALUE);
					for(Text B : listB)
						context.write(EMPTY_VALUE, B);
				}
			}
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

	    if (otherArgs.length < 4)
	    {
	      System.err.println("params:<UserInDir> <CityInDir> <OutDir> <join Type>");
	      System.exit(1);
	    }
	    Job job = new Job(conf,"Reduce side join Job");
	    job.setJarByClass(ReduceJoin.class);
	    job.setReducerClass(JoinReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, UserJoinMapper.class);
	    MultipleInputs.addInputPath(job, new Path(otherArgs[1]), TextInputFormat.class, CityJoinMapper.class);
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
	    job.getConfiguration().set("join.type", otherArgs[3]);

	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

4.3 结果

运行语句:

inner join:

left outer join:

right outer join:

full outer join:

anti join:

时间: 2024-08-26 05:57:17

MapReduce的Reduce side Join的相关文章

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来 map端的join 适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作 在一个TaskTracker中可以运行多个map任务.每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了.使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从

MapReduce编程之Semi Join多种应用场景与使用

Map Join 实现方式一 ● 使用场景:一个大表(整张表内存放不下,但表中的key内存放得下),一个超大表 ● 实现方式:分布式缓存 ● 用法: SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的.说得更加接地气一点就是将小表中参与join的key单独抽出来通过D

MapReduce实现两表join

一.方法介绍 假设要进行join的数据分别来自File1和File2. 参考:https://blog.csdn.net/yimingsilence/article/details/70242604 1.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,t

MapReduce中的join算法-reduce端join

在海量数据的环境下,不可避免的会碰到join需求, 例如在数据分析时需要连接从不同的数据源中获取到数据. 假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一. 一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息. 气象站和天气记录的示例数据分别如下所示: Station ID            Station Name 011990-99999    SIHCCAJAVRI 012650-99999    TRNSET-HANSMOEN Statio

MapReduce表连接操作之Reduce端join

一:背景 Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程. 二:技术实现 基本思路 (1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的. (2):在reduce处理函数中,按照标识对数据进行处理. (3):然后根据Key去join来求出结果直接输出. 数据准备 准备好下面两张表: (1):tb_a(以下简称表A) [java] view plain copy id  name 1

MapReduce的reduce函数里的key用的是同一个引用

最近写MapReduce程序,出现了这么一个问题,程序代码如下: 1 package demo; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.Map.Entry; 7 8 import org.apache.hadoop.fs.FSDataOutputStream; 9 import org.apache.hadoop.fs.FileS

mapreduce中reduce中的迭代器只能调用一次!

亲测,只能调用一次,如果想想在一次reduce重复使用迭代器中的数据,得先取出来放在list中然后在从list中取出来!!多次读取reduce函数中迭代器的数据 public static class FindFriendReducer extends Reducer<Text, AllInfoBean, AllInfoBean, NullWritable> { protected void reduce(Text Keyin, Iterable<AllInfoBean> valu