mapreduce join操作

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204)。有空再看看 实际上实现过程是不是和他写的代码一样。

前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地。今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到算法实现还有很多路要走。

数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。

准备好下面两张表:

(1)m_ys_lab_jointest_a(以下简称表A)

建表语句为:

[sql] view plain copy

print?

  1. create table if not exists m_ys_lab_jointest_a (
  2. id bigint,
  3. name string
  4. )
  5. row format delimited
  6. fields terminated by ‘9‘
  7. lines terminated by ‘10‘
  8. stored as textfile;

数据:

id     name
1     北京
2     天津
3     河北
4     山西
5     内蒙古
6     辽宁
7     吉林
8     黑龙江

(2)m_ys_lab_jointest_b(以下简称表B)

建表语句为:

[sql] view plain copy

print?

  1. create table if not exists m_ys_lab_jointest_b (
  2. id bigint,
  3. statyear bigint,
  4. num bigint
  5. )
  6. row format delimited
  7. fields terminated by ‘9‘
  8. lines terminated by ‘10‘
  9. stored as textfile;

数据:

id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347

我们的目的是,以id为key做join操作,得到以下表:

m_ys_lab_jointest_ab

id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

计算模型

整个计算过程是:

(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。

(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。

如下图所示:

代码

代码如下:

[java] view plain copy

print?

  1. import java.io.IOException;
  2. import java.util.HashMap;
  3. import java.util.Iterator;
  4. import java.util.Vector;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.mapred.FileSplit;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapred.MapReduceBase;
  11. import org.apache.hadoop.mapred.Mapper;
  12. import org.apache.hadoop.mapred.OutputCollector;
  13. import org.apache.hadoop.mapred.RecordWriter;
  14. import org.apache.hadoop.mapred.Reducer;
  15. import org.apache.hadoop.mapred.Reporter;
  16. /**
  17. * MapReduce实现Join操作
  18. */
  19. public class MapRedJoin {
  20. public static final String DELIMITER = "\u0009"; // 字段分隔符
  21. // map过程
  22. public static class MapClass extends MapReduceBase implements
  23. Mapper<LongWritable, Text, Text, Text> {
  24. public void configure(JobConf job) {
  25. super.configure(job);
  26. }
  27. public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
  28. Reporter reporter) throws IOException, ClassCastException {
  29. // 获取输入文件的全路径和名称
  30. String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
  31. // 获取记录字符串
  32. String line = value.toString();
  33. // 抛弃空记录
  34. if (line == null || line.equals("")) return;
  35. // 处理来自表A的记录
  36. if (filePath.contains("m_ys_lab_jointest_a")) {
  37. String[] values = line.split(DELIMITER); // 按分隔符分割出字段
  38. if (values.length < 2) return;
  39. String id = values[0]; // id
  40. String name = values[1]; // name
  41. output.collect(new Text(id), new Text("a#"+name));
  42. }
  43. // 处理来自表B的记录
  44. else if (filePath.contains("m_ys_lab_jointest_b")) {
  45. String[] values = line.split(DELIMITER); // 按分隔符分割出字段
  46. if (values.length < 3) return;
  47. String id = values[0]; // id
  48. String statyear = values[1]; // statyear
  49. String num = values[2]; //num
  50. output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
  51. }
  52. }
  53. }
  54. // reduce过程
  55. public static class Reduce extends MapReduceBase
  56. implements Reducer<Text, Text, Text, Text> {
  57. public void reduce(Text key, Iterator<Text> values,
  58. OutputCollector<Text, Text> output, Reporter reporter)
  59. throws IOException {
  60. Vector<String> vecA = new Vector<String>(); // 存放来自表A的值
  61. Vector<String> vecB = new Vector<String>(); // 存放来自表B的值
  62. while (values.hasNext()) {
  63. String value = values.next().toString();
  64. if (value.startsWith("a#")) {
  65. vecA.add(value.substring(2));
  66. } else if (value.startsWith("b#")) {
  67. vecB.add(value.substring(2));
  68. }
  69. }
  70. int sizeA = vecA.size();
  71. int sizeB = vecB.size();
  72. // 遍历两个向量
  73. int i, j;
  74. for (i = 0; i < sizeA; i ++) {
  75. for (j = 0; j < sizeB; j ++) {
  76. output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
  77. }
  78. }
  79. }
  80. }
  81. protected void configJob(JobConf conf) {
  82. conf.setMapOutputKeyClass(Text.class);
  83. conf.setMapOutputValueClass(Text.class);
  84. conf.setOutputKeyClass(Text.class);
  85. conf.setOutputValueClass(Text.class);
  86. conf.setOutputFormat(ReportOutFormat.class);
  87. }
  88. }

技术细节

下面说一下其中的若干技术细节:

(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:

String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();

(2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。

(3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。

时间: 2024-12-05 02:57:29

mapreduce join操作的相关文章

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

MapReduce中的Reduce join操作

-------file1[ID NAME]-------- 1 zhangsan2 lisi3 wangwu -------file2[ID VALUE]--------1 452 563 89 -------结果[NAME VALUE]------------zhagnsan 45lisi 56wangwu 89 一般数据库的join操作 a join b  on a.id = b.id 后面的条件在reduce中指的是相同的key,在sql中很容易区分出后面条件的字段到底来自那张表 而在Ma

MapReduce中的Map join操作

可以使用setup进行去读,吧数据读取放到一个容器中,在map段去读的时候,可以根据ID就找出数据,然后再转化回来 map端的join 适用场景,小表可以全部读取放到内存中,两个在内存中装不下的大表,不适合Map端的join操作 在一个TaskTracker中可以运行多个map任务.每个map任务是一个java进程,如果每个map从HDFS中读取相同的小表内容,就有些浪费了.使用DistributedCache,小表内容可以加载在TaskTracker的linux磁盘上.每个map运行时只需要从

MapReduce 实现数据join操作

前段时间有一个业务需求,要在外网商品(TOPB2C)信息中加入 联营自营 识别的字段.但存在的一个问题是,商品信息 和 自营联营标示数据是 两份数据:商品信息较大,是存放在hbase中.他们之前唯一的关联是url.所以考虑用url做key将两者做join,将 联营自营标识 信息加入的商品信息中,最终生成我需要的数据: 一,首先展示一下两份数据的demo example 1. 自营联营标识数据(下面开始就叫做unionseller.txt) http://cn.abc.www/product436

SQL join中级篇--hive中 mapreduce join方法分析

1. 概述. 本文主要介绍了mapreduce框架上如何实现两表JOIN. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签 (tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2.

Linq之Join操作

1 摘要 文章通过一个简单的实例对Linq中的Join操作进行演示,并在文章的最后对Join操作相关知识点进行简单的总结. 2 实例演示 1) 新建数据库MyTestDB,在数据库中新建数据表tb_Class和tb_Student,两表的定义如下图所示.                                        图1  tb_Class的定义                                                                    

使用 Linq 对多个对象进行join操作 C#

class A { public int id { get; set; } public string name { get; set; } } class B { public int id { get; set; } public int age { get; set; } } class C { public int id { get; set; } public string address { get; set; } } private void button8_Click(objec

MySql 中Join操作的用法

SQL标准中的Join的类型: 首先,设置表employees和department的数据为: 1.inner join - on操作类型 内连接inner join是基于连接谓词将两张表(如A和B)的列组合在一起的,产生新的结果表. 例子: SELECT * FROM employees a inner join department b ON a.department_id = b.department_id 查询结果为: 注意:inner join 可以简写为join,该查询得出的结果为两

那位帮忙提供一个java mongodb多个collection进行mapreduce的操作。

原文:那位帮忙提供一个java mongodb多个collection进行mapreduce的操作. 代码下载地址:http://www.zuidaima.com/share/1550463227890688.htm 我想统计下每个月某个视频的播放量,需要跨日表去mapreduce. 那位帮忙提供一个java mongodb多个collection进行mapreduce的操作.,布布扣,bubuko.com