MapReduce数据连接

对于不同文件里的数据,有时候有相应关系,须要进行连接(join),获得一个新的文件以便进行分析。比方有两个输入文件a.txt,b.txt,当中的数据格式分别例如以下

1 a
2 b
3 c
4 d
1 good
2 bad
3 ok
4 hello

须要将其连接成一个新的例如以下的文件:

a good
b bad
c ok
d hello

处理步骤能够分成两步:

1.map阶段,将两个输入文件里的数据进行打散,例如以下:

1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello

2.reduce阶段,进行数据的连接操作,此处数据较简单,仅仅要推断map结果的value的长度是不是1就决定是新的键还是值。

package cn.zhf.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SingleJoin extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		Tool tool = new SingleJoin();
		ToolRunner.run(tool, args);
		print(tool);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = getConf();
		Job job = new Job();
		job.setJarByClass(getClass());
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path("out"),true);
		FileInputFormat.addInputPath(job, new Path("a.txt"));
		FileInputFormat.addInputPath(job, new Path("b.txt"));
		FileOutputFormat.setOutputPath(job,new Path("out"));

		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.waitForCompletion(true);
		return 0;
	}
	public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			String[] str = value.toString().split(" ");
			context.write(new Text(str[0]), new Text(str[1]));
		}
	}

	public static class JoinReducer extends Reducer<Text,Text,Text,Text>{
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			Iterator<Text> iterator = values.iterator();
			Text keyy = new Text();
			Text valuee = new Text();
			while(iterator.hasNext()){
				Text temp = iterator.next();
				if(temp.toString().length() == 1){
					keyy.set(temp);
					valuee.set(iterator.next());
				}else{
					valuee.set(temp);
					keyy.set(iterator.next());
				}
			}
			context.write(keyy, valuee);
		}
	}
	public static void print(Tool tool) throws IOException{
		FileSystem fs = FileSystem.get(tool.getConf());
		Path path = new Path("out/part-r-00000");
		FSDataInputStream fsin = fs.open(path);
		int length = 0;
		byte[] buff = new byte[128];
		while((length = fsin.read(buff,0,128)) != -1)
			System.out.println(new String(buff,0,length));
	}
}

reference:《MapReduce2.0源代码分析及编程实践》

时间: 2024-10-14 12:38:02

MapReduce数据连接的相关文章

FineBI学习系列之FineBI与Spark数据连接(图文详解)

不多说,直接上干货! 这是来自FineBI官网提供的帮助文档 http://help.finebi.com/http://help.finebi.com/doc-view-581.html 目录: 1.描述 2.操作 3.注意事项 1.描述 Spark是一种通用的大数据快速处理引擎.Spark使用Spark RDD. Spark SQL. Spark Streaming. MLlib. GraphX成功解决了大数据领域中离线批处理.交互式查询.实时流计算.机器学习与图计算等最重要的任务和问题.S

Netbeans 中创建数据连接池和数据源步骤

1.启动glassfish服务器, 在浏览器的地址栏中输入 http://localhost:4848 2.首先建立JDBC Connection Pools: 3.new 一个Connectio Pools 4.对General Settings属性填写: 5.填写Datasource Classname:com.mysql.jdbc.jdbc2.optional.MysqlConnectionPoolDataSource Ping属性选中,可以用来检验数据连接池是否创建成功! . 6.对Ad

D3数据连接:进入

引言:数据连接是D3中的面包和黄油.D3不提供制图的基础函数,相反,它靠的是数据连接.数据连接可以让页面元素进入网页,一旦进入,可以修改.更新及退出.本文将主要介绍"进入"部分.本文选自<图说D3:数据可视化利器从入门到进阶>. 什么是数据连接 顾名思义,数据连接肯定是将数据和某些东西连接起来.这些东西是网页上的一个或一组--< rect>.< circle>.< div>等所有值得怀疑的常见元素.具体一点,就是这些常见元素的一个D3选择

spring下,druid,c3p0,proxool,dbcp四个数据连接池的使用和配置

由于那天Oracle的数据连接是只能使用dbcp的数据库连接池才连接上了,所以决定试一下当下所有得数据库连接池连接orcale和mysql,先上代码 配置文件的代码 1 #=================dbcp连接池======================# 2 #Oracle数据库连接 3 #jdbc_driverClassName=oracle.jdbc.driver.OracleDriver 4 #jdbc_url=jdbc:oracle:thin:@localhost:1521:

记录一个简单的dbcp数据连接池

这个示例用到了ThreadLocal与dbcp,我觉得有点意思,就整理了下.使用dbcp,肯定要导入commons-dbcp.jar包.下面直接贴DBUtil代码: public class DBUtil { private static DataSource ds; //定义一个数据连接池 //threadLocal是线程的局部变量,它的实例通常是类中的 private static 字段 private static ThreadLocal<Connection> connLocal=ne

c# 高效利用数据连接

有两种方式可以确保数据连接在用完后立即释放 1.第一种方式--利用try..catch...finally语句块 在finally中关闭任何已经打开的连接 try { conn.open(); } catch(SqlException ex) { // } finally { conn.Close(); } 在finally块中,可以释放已经使用的任何资源, 第二种方式使用Using 语句 string source =''......." using (SqlConnection conn=n

htc M8 无法自动恢复数据连接(4g)的问题解决

情况如下:htc m8 tdd-lte的双待手机,4g.2g同时在线. 本月出现,在短时间没有信号的情况后,无法恢复数据连接,哪怕是edge,更不论4g了. 尝试各种方法无解.最后咨询10086解决此问题. 这个应该不是手机的问题,貌似是移动端的2g edge数据流量被关闭/错误设置导致的.   我感觉像是2g-3g-4g之间切换需要依次回落或者升级.因为双待,4g信号开机是有的,可以上网,但是电梯等4g消失的情况发生后,需要借助于2g/3g的数据连接再恢复到4g,没有2g/3g的数据连接就恢复

数据连接池JNDI

数据库连接有很多中方式,JDBC数据库的连接方式,前边我们已经介绍过了,而开发中我们经常使用的是DataBaseConnectionPool(数据库连接池,DBCP).数据库连接池到底是什么?它比jdbc数据库连接有什么优势呢?它又怎么使用呢? 一,先看一下JDBC连接,每次用户访问数据库时,需要JDBC为我们创建数据库连接,我们使用,然后关闭.而当我们加断点测试,可以发现这个过程中创建数据库连接这个过程是比较费时间的,而不是我们在数据库中操作数据费时间.所以JDBC这种方式的效率大大折扣.而且

帆软报表学习之数据连接

帆软报表FineReport中数据连接的JDBC连接池属性问题 连接池原理 在帆软报表FineReport中,连接池主要由三部分组成:连接池的建立.连接池中连接使用的治理.连接池的关闭.下面就着重讨论这三部分及连接池的配置问题. 1. 连接池原理 连接池技术的核心思想,是连接复用,通过建立一个数据库连接池以及一套连接使用.分配.治理策略,使得该连接池中的连接可以得到高效.安全的复用,避免了数据库连接频繁建立.关闭的开销. 另外,由于对JDBC中的原始连接进行了封装,从而方便了数据库应用对于连接的