MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来

map端的join

适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作

在一个TaskTracker中可以运行多个map任务。每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了。
使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上。每个map运行时只需要从linux磁盘加载数据就行了,不必每次从HDFS加载。

问:如何使用DistributedCache哪?
答:1.把文件上传到HDFS中
2.在job.waitForCompletion(...)代码之前写DistributedCache.addCacheFile(hdfs路径, conf);
3.在MyMapper类的setup(...)方法中使用DistributedCache.getLocalCacheFiles()获得文件的路径,读取文件内容

一:背景

MapReduce提供了表连接操作其中包括Map端joinReduce端join还有半连接,现在我们要讨论的是Map端join,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

二:技术实现

基本思路:

(1):需要join的两个文件,一个存储在HDFS中,一个使用DistributedCache.addCacheFile()将需要join的另外一个文件加入到所有Map缓存中。

(2):在Map函数里读取该文件,进行join

(3):将结果输出到reduce

(4):DistributedCache.addCacheFile()需要在作业提交前设置。

什么是DistributedCache?

DistributedCache是为了方便用户进行应用程序开发而设计的文件分发工具。它能够将只读的外部文件进行自动分发到各个节点上进行本地缓存,以便task运行时加载。

DistributedCache的使用步骤

(1):在HDFS中上传文件(文本文件、压缩文件、jar包等)

(2):调用相关API添加文件信息

(3):task运行前直接调用文件读写API获取文件。

常见API:

DistributedCache.addCacheFile();

DistributedCache.addCacheArchive();

下面我们通过一个示例来深入体会Map端join。

表一:tb_a数据如下

name	sex	age	depNo
zhang	male	20	1
li	female	25	2
wang	female	30	3
zhou	male	35	2

表二:tb_b数据如下

depNo	depName
1	sales
2	Dev
3	Mgt

#需求就是连接上面两张表

注意:在Map端join操作中,我们往往将较小的表添加到内存中,因为内存的资源是很宝贵的,这也说明了另外一个问题,那就是如果表的数据量都非常大则不适合使用Map端join。

代码如下:

public class MyMapJoin {
	// 定义输入路径
	private static String INPUT_PATH1 = "";
	//加载到内存的表的路径
	private static String INPUT_PATH2 = "";
	// 定义输出路径
	private static String OUT_PATH = "";

	public static void main(String[] args) {

		try {
			// 创建配置信息
			Configuration conf = new Configuration();
			// 获取命令行的参数
			String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
			// 当参数违法时,中断程序
			if (otherArgs.length != 3) {
				System.err.println("Usage:MyMapJoin<in1> <in2> <out>");
				System.exit(1);
			}

			// 给路径赋值
			INPUT_PATH1 = otherArgs[0];
			INPUT_PATH2 = otherArgs[1];
			OUT_PATH = otherArgs[2];
			// 创建文件系统
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果输出目录存在,我们就删除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}
			// 添加到内存中的文件(随便添加多少个文件)
			DistributedCache.addCacheFile(new Path(INPUT_PATH2).toUri(), conf);

			// 创建任务
			Job job = new Job(conf, MyMapJoin.class.getName());
			// 打成jar包运行,这句话是关键
			job.setJarByClass(MyMapJoin.class);
			//1.1 设置输入目录和设置输入数据格式化的类
			FileInputFormat.setInputPaths(job, INPUT_PATH1);
			job.setInputFormatClass(TextInputFormat.class);

			//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
			job.setMapperClass(MapJoinMapper.class);
			job.setMapOutputKeyClass(NullWritable.class);
			job.setMapOutputValueClass(Emp_Dep.class);

			//1.3 设置分区和reduce数量
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(0);

			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			// 提交作业 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Emp_Dep> {

		private Map<Integer, String> joinData = new HashMap<Integer, String>();

		@Override
		protected void setup(Mapper<LongWritable, Text, NullWritable, Emp_Dep>.Context context) throws IOException, InterruptedException {
			// 预处理把要关联的文件加载到缓存中
			Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
			// 我们这里只缓存了一个文件,所以取第一个即可,创建BufferReader去读取
			BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));

			String str = null;
			try {
				// 一行一行读取
				while ((str = reader.readLine()) != null) {
					// 对缓存中的表进行分割
					String[] splits = str.split("\t");
					// 把字符数组中有用的数据存在一个Map中
					joinData.put(Integer.parseInt(splits[0]), splits[1]);
				}
			} catch (Exception e) {
				e.printStackTrace();
			} finally{
				reader.close();
			}

		}

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Emp_Dep>.Context context) throws IOException,
				InterruptedException {
			// 获取从HDFS中加载的表
			String[] values = value.toString().split("\t");
			// 创建Emp_Dep对象
			Emp_Dep emp_Dep = new Emp_Dep();
			// 设置属性
			emp_Dep.setName(values[0]);
			emp_Dep.setSex(values[1]);
			emp_Dep.setAge(Integer.parseInt(values[2]));
			// 获取关联字段depNo,这个字段是关键
			int depNo = Integer.parseInt(values[3]);
			// 根据depNo从内存中的关联表中获取要关联的属性depName
			String depName = joinData.get(depNo);
			// 设置depNo
			emp_Dep.setDepNo(depNo);
			// 设置depName
			emp_Dep.setDepName(depName);

			// 写出去
			context.write(NullWritable.get(), emp_Dep);
		}
	}
}

程序运行的结果:

时间: 2024-08-06 08:55:30

MapReduce中的Map join操作的相关文章

MapReduce中的Reduce join操作

-------file1[ID NAME]-------- 1 zhangsan2 lisi3 wangwu -------file2[ID VALUE]--------1 452 563 89 -------结果[NAME VALUE]------------zhagnsan 45lisi 56wangwu 89 一般数据库的join操作 a join b  on a.id = b.id 后面的条件在reduce中指的是相同的key,在sql中很容易区分出后面条件的字段到底来自那张表 而在Ma

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

MapReduce中的map个数

在map阶段读取数据前,FileInputFormat会将输入文件分割成split.split的个数决定了map的个数.影响map个数(split个数)的主要因素有: 1) 文件的大小.当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split:当块为256m,会被划分为2个split. 2) 文件的个数.FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小的文件.如果HDFS中dfs.block.siz

Hadoop框架下MapReduce中的map个数如何控制

控制map个数的核心源码 1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 2 3 //getFormatMinSplitSize 默认返回1,getMinSplitSize 为用户设置的最小分片数, 如果用户设置的大于1,则为用户设置的最小分片数 4 long maxSize = getMaxSplitSize(job); 5 6 //getMaxSplitSize为用户设置的最大分片数,默认最大

MapReduce实现两表join

一.方法介绍 假设要进行join的数据分别来自File1和File2. 参考:https://blog.csdn.net/yimingsilence/article/details/70242604 1.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,t

Hive Map Join

Hive中的Map Join即map side join工作原理是在Map端把小表加载到内存中,然后读取大表,和内存中的小表完成连接操作.MapJoin使用了分布式缓存技术. Map Join的优点: 1.不消耗集群的reduce资源. 2.减少了reduce操作,加快了程序执行. 3.降低网络负载. Map Join的缺点: 1.占用内存(所以加载到内存中的表不能过大,因为每个计算节点都会加载一次). 2.生成较多的小文件. 我们有如下两种方式来执行Map Join: 1.配置一下参数,Hiv

MapReduce中combine、partition、shuffle的作用是什么

http://www.aboutyun.com/thread-8927-1-1.html Mapreduce在hadoop中是一个比較难以的概念.以下须要用心看,然后自己就能总结出来了. 概括: combine和partition都是函数.中间的步骤应该仅仅有shuffle! 1.combine combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,能够自己定义的. combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一

mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204).有空再看看 实际上实现过程是不是和他写的代码一样. 前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天

MapReduce中的Join算法

在关系型数据库中Join是非常常见的操作,各种优化手段已经到了极致.在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要从不同的数据源中获取数据.不同于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法. 我们先简要地描述待解决的问题.假设有两个数据集:气象站数据库和天气记录数据库 气象站的示例数据,如下 Station ID Station Name 011990-99999 SIHCCAJAVRI 012650-99999 TRN