hadoop map端join

   map端的联结比reduce端的联结实现起来复杂,而且限制也多,一般我们将小表置于内存中, 对于大表的一个纪录我们在内存中查找即可。

   改例子摘自hadoop基础教程, 我们实现sales和accounts的联结, 其中sales记录的顾客的销售信息,accounts纪录的是用户的账户信息,我们的目的是统计每个用户消费的次数和消费总额。

  数据如下:

  sales.txt

  

002 12.29   2004-07-02
004 13.42   2005-12-20
003 499.99  2010-12-20
001 78.95   2012-04-02
002 21.99   2006-11-30
002 93.45   2008-09-10
001 9.99    2012-05-17

  accounts.txt

002 Abigail SmithPremium    2004-07-13
003 April StevensStandard   2010-12-20
004 Nasser HafezPremium 2001-04-23

代码如下:  

import java.io.*;
import java.util.*;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MapJoin {
	public static class MapJoinMapper extends Mapper<Object, Text, Text, Text> {
		public Map<String, String> joinData = new HashMap();
		//执行连接操作
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
			String[] values = value.toString().split("\t");
			context.write(new Text(joinData.get(values[0])), value);
		}
		//加载小表
		public void setup(Context context) throws IOException, InterruptedException{
			Path[] path = DistributedCache.getLocalCacheFiles(context.getConfiguration());
			BufferedReader reader = new BufferedReader(new FileReader(path[0].toString()));
			String str = null;
			while((str = reader.readLine()) != null) {
				String[] s = str.split("\t");
				joinData.put(s[0], s[1]);
			}
		}
	}

	public static class MapJoinReducer extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
			int ci = 0;
			double total = 0.0;
			for(Text val : values) {
				ci ++;
				String[] v = val.toString().split("\t");
				total += Float.parseFloat(v[1]);
			}
			String str = String.format("%d\t%f", ci, total);
			context.write(key, new Text(str));
		}
	}	

	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);

		Job job = new Job(conf, "MapJoin");
		//设置相关类
		job.setJarByClass(MapJoin.class);
		job.setMapperClass(MapJoinMapper.class);
		job.setReducerClass(MapJoinReducer.class);

		//设置map输出格式
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		//设置输入输出文件
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));

		//等待作业执行完毕
		System.exit(job.waitForCompletion(true)?0:1);
	}
}

  

时间: 2024-08-24 08:46:31

hadoop map端join的相关文章

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

map端join

package my.hadoop.hdfs.mapreduceJoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; impo

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

Hadoop on Mac with IntelliJ IDEA - 10 陆喜恒. Hadoop实战(第2版)6.4.1(Shuffle和排序)Map端 内容整理

下午对着源码看陆喜恒. Hadoop实战(第2版)6.4.1  (Shuffle和排序)Map端,发现与Hadoop 1.2.1的源码有些出入.下面作个简单的记录,方便起见,引用自书本的语句都用斜体表示. 依书本,从MapTask.java开始.这个类有多个内部类: 从书的描述可知,collect()并不在MapTask类,而在MapOutputBuffer类,其函数功能是 1.定义输出内存缓冲区为环形结构2.定义输出内存缓冲区内容到磁盘的操作 在collect函数中将缓冲区的内容写出时会调用s

hadoop核心逻辑shuffle代码分析-map端

首先要推荐一下:http://www.alidata.org/archives/1470 阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下. 不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本 shuffle的逻辑,整理成本文,为以后回顾所用. 首先用一张图展示下m

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

hadoop reduce端联结

此例子摘自hadoop基础教程. 其中sales.txt内容如下 客户编号 客户消费额度 消费时间001 35.99 2012-03-15 002 12.29 2004-07-02 004 13.42 2005-12-20 003 499.99 2010-12-20 001 78.95 2012-04-02 002 21.99 2006-11-30 002 93.45 2008-09-10 001 9.99 2012-05-17 accounts.txt内容如下: 客户编号 姓名 注册时间001

Hadoop的Reduce Join+BloomFilter实现表链接

适用于场景 连接的列数据量很大,在分布式缓存中无法存储时,Bloom Filter 可解决这个问题,用很小的内存可有MAP端过滤掉不需要JOIN的数据,这样传到REDUCE的数据量减少,减少了网络传及磁盘IO. 缺点 Bloom Filter 会有一定的错误率,但是错误率很低,用空间换取了时间.并且,最终的JOIN在REDUCE端还要进行比对,所以对最终结果无影响. 下面我们先来简单了解下什么是布隆过滤器? Bloom Filter的中文翻译叫做布隆过滤器,是1970年由布隆提出的.它实际上是一