使用MapReduce实现两个文件的Join操作

数据结构

customer表

       
1
hanmeimei  
ShangHai   
110
2
leilei 
BeiJing    
112
3
lucy   
GuangZhou  
119

oder表

     
1 1 50
2 1 200
3 3 15
4 3 350
5 3 58
6 1 42
7 1 352
8 2 1135
9 2 400
10 2 2000
11 2 300

MAPJOIN

场景:我们模拟一个有一份小表一个大表的场景,customer是那份小表,order是那份大表
做法:直接将较小的数据加载到内存中,按照连接的关键字建立索引,

大份数据作为MapTask的输入键值对 map()方法的每次输入都去内存当中直接去匹配连接。

然后把连接结果按 key 输出,这种方法要使用 hadoop中的 DistributedCache 把小份数据分布到各个计算节点,

每个 maptask 执行任务的节点都需要加载该数据到内存,并且按连接关键字建立索引。

环境配置:因为我们是在本地操作的,所以需要配置本地的hadoop

1。下载hadoop

2.解压到一个目录,记住,一会要用

配置电脑环境变量

如果你是一个初学者那么你就创建一个Java工程,步骤自己搜吧,网上一大堆然后创建一个mapjoin的包,在包里创建一个

JoinDemo的类然后如下第24行代码,在JoinDemo后加extened往后就根据我的代码敲就好了。

 
package mapjoin;##以上内容标志了段代码是在mapjoin这个包里

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

##以上内容是需要导入的jar包,否则一些代码会报红。
  1 public class JoinDemo extends Configured implements Tool {
  2     // customer文件在本地上的位置。
  3     // TODO: 改用参数传入
  4     private static final String CUSTOMER_CACHE_URL = "input/customer.txt"; --本地输入文件路径
  5 //            "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
  6     private static class CustomerBean {                 --
  7         private int custId;
  8         private String name;
  9         private String address;
 10         private String phone;
 11
 12         public CustomerBean() {}
 13
 14         public CustomerBean(int custId, String name, String address,String phone)             {
 16             super();
 17             this.custId = custId;
 18             this.name = name;
 19             this.address = address;
 20             this.phone = phone;
 21         }
 22
 23
 24
 25         public int getCustId() {
 26             return custId;
 27         }
 28
 29         public String getName() {
 30             return name;
 31         }
 32
 33         public String getAddress() {
 34             return address;
 35         }
 36
 37         public String getPhone() {
 38             return phone;
 39         }
 40     }
 41
 42     private static class CustOrderMapOutKey implements
 43             WritableComparable<CustOrderMapOutKey> {
 44         private int custId;
 45         private int orderId;
 46
 47         public void set(int custId, int orderId) {
 48             this.custId = custId;
 49             this.orderId = orderId;
 50         }
 51
 52         public int getCustId() {
 53             return custId;
 54         }
 55
 56         public int getOrderId() {
 57             return orderId;
 58         }
 59
 60
 61         public void write( DataOutput out) throws IOException {
 62             out.writeInt(custId);
 63             out.writeInt(orderId);
 64         }
 65
 66
 67         public void readFields( DataInput in) throws IOException {
 68             custId = in.readInt();
 69             orderId = in.readInt();
 70         }
 71
 72         public int compareTo(CustOrderMapOutKey o) {
 73             int res = custId -o.custId;
 74             return res == 0 ? orderId-o.orderId : res;
 75         }
 76
 77         public boolean equals(Object obj) {
 78             if (obj instanceof CustOrderMapOutKey) {
 79                 CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
 80                 return custId == o.custId && orderId == o.orderId;
 81             } else {
 82                 return false;
 83             }
 84         }
 85
 86         @Override
 87         public String toString() {
 88             return custId + "\t" + orderId;
 89         }
 90     }
 91
 92     private static class JoinMapper extends
 93             Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
 94         private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
 95         private final Text outputValue = new Text();
 96
 97
 98         /**
 99          * 在内存中customer数据
100          */
101         private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, CustomerBean>();
102
103         @Override
104         protected void setup(Context context)
105                 throws IOException, InterruptedException {
106             //读取缓存的文件
107             FileSystem fs = FileSystem
108                     .get( URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
109             FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));
110
111             BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
112             String line = null;
113             String[] cols = null;
114
115             // 格式:客户编号  姓名  地址  电话
116             while ((line = reader.readLine()) != null) {
117                 cols = line.split("\t");
118                 if (cols.length < 4) {              // 数据格式不匹配,忽略
119                     continue;
120                 }
121
122                 CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
123                 //缓存客户信息
124                 CUSTOMER_MAP.put(bean.getCustId(), bean);
125             }
126         }
127
128         @Override
129         protected void map(LongWritable key, Text value, Mapper.Context context)
130                 throws IOException, InterruptedException {
131
132             // 格式: 订单编号 客户编号    订单金额
133             String[] cols = value.toString().split("\t");
134             if (cols.length < 3) {
135                 return;
136             }
137
138             int custId = Integer.parseInt(cols[1]);     // 取出客户编号
139             CustomerBean customerBean = CUSTOMER_MAP.get(custId);
140
141             if (customerBean == null) {         // 没有对应的customer信息可以连接
142                 return;
143             }
144
145             StringBuffer sb = new StringBuffer();
146             sb.append(cols[2])
147                     .append("\t")
148                     .append(customerBean.getName())
149                     .append("\t")
150                     .append(customerBean.getAddress())
151                     .append("\t")
152                     .append(customerBean.getPhone());
153
154             outputValue.set(sb.toString());
155             outputKey.set(custId, Integer.parseInt(cols[0]));
156
157             //WC 操作
158 //            context.write(value, 1);
159
160             context.write(outputKey, outputValue);
161         }
162
163     }
164
165     /**
166      * reduce
167      * @author Ivan
168      *
169      */
170     private static class JoinReducer extends
171             Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
172         @Override
173         protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
174                 throws IOException, InterruptedException {
175             // 什么事都不用做,直接输出
176             for (Text value : values) {
177                 context.write(key, value);
178             }
179         }
180     }
181     /**
182      * @param args
183      * @throws Exception
184      */
185     public static void main(String[] args) throws Exception {
186 //        if (args.length < 2) {
187 //            new IllegalArgumentException("Usage: <inpath> <outpath>");
188 //            return;
189 //        }
190         //调用 JoinDemo 对象的run方法
191         ToolRunner.run(new Configuration(), new JoinDemo(), args);
192     }
193
194
195     public int run(String[] args) throws Exception {
196         System.setProperty("hadoop.home.dir", "D:\\software\\hadoop-2.6.0\\hadoop-2.6.0");
197         //默认配置
198         Configuration conf = getConf();
199         FileSystem fileSystem=FileSystem.get(conf);
200         Path outputPath=new Path("output/");
201         if(fileSystem.exists(outputPath)){
202             fileSystem.delete(outputPath,true);
203         }
204
205         Job job = Job.getInstance(conf);
206         job.setJarByClass(JoinDemo.class);
207
208         // 添加customer cache文件
209         job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
210         Path inputPath=new Path("input/order.txt");
211 //        Path inputPath= new Path(args[0])
212         FileInputFormat.addInputPath(job, inputPath);
213
214 //        Path outputPath=new Path(args[1]);
215         FileOutputFormat.setOutputPath(job, outputPath);
216
217         // map settings
218         job.setMapperClass(JoinMapper.class);
219         job.setMapOutputKeyClass(CustOrderMapOutKey.class);
220         job.setMapOutputValueClass(Text.class);
221
222         // reduce settings
223 //        job.setReducerClass(JoinReducer.class);
224 //        job.setOutputKeyClass(CustOrderMapOutKey.class);
225 //        job.setOutputKeyClass(Text.class);
226         boolean res = job.waitForCompletion(true);
227         return res ? 0 : 1;
228     }
229 }

本地的文件位置

最后执行结果:

 

原文地址:https://www.cnblogs.com/xuziyu/p/10803264.html

时间: 2024-11-09 00:38:23

使用MapReduce实现两个文件的Join操作的相关文章

awk 对两个文件进行合并操作

1.awk命令概念 $0 表示一个文本中的一行记录 $1...N 表示一行中的第 1...N 字段 FNR     The input record number in the current input file.  #已读入当前文件的记录数 NR      The total number of input records seen so far.      #已读入的总记录数 next    Stop processing the current input record. The nex

基于python的selenium两种文件上传操作

方法一.input标签上传     如果是input标签,可以直接输入路径,那么可以直接调用send_keys输入路径,这里不做过多赘述,前文有相关操作方法. 方法二.非input标签上传 这种上传方式需要借助第三方工具,主要有以下三种情况: 1.AutoIt  去调用它生成的au3或者exe格式的文件 2.SendKeys第三方库(目前只支持到2.7版本) 网址:https://pypi.python.org/pypi/SendKeys 3.Python的pywin32库,通过识别对话框句柄来

MapReduce实现两表join

一.方法介绍 假设要进行join的数据分别来自File1和File2. 参考:https://blog.csdn.net/yimingsilence/article/details/70242604 1.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,t

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

MapReduce实现两表的Join--原理及python和java代码实现

用Hive一句话搞定的,但是有时必须要用mapreduce 方法介绍 1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side jo

mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204).有空再看看 实际上实现过程是不是和他写的代码一样. 前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天

Hadoop MapReduce编程 API入门系列之join(二十五)(未完)

不多说,直接上代码.  代码版本1 package zhouls.bigdata.myMapReduce.Join; import java.util.Set; import java.io.*;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable; public class TextPair implements WritableComparable<TextPair>{ private

MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来 map端的join 适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作 在一个TaskTracker中可以运行多个map任务.每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了.使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从

MapReduce对输入多文件的处理

MultipleInputs类指定不同的输入文件路径以及输入文化格式 现有两份数据 phone 123,good number 124,common number 125,bad number user zhangsan,123 lisi,124 wangwu,125 现在需要把user和phone按照phone number连接起来.得到下面的结果 zhangsan,123,good number lisi,123,common number wangwu,125,bad number 分析思