第2节 mapreduce深入学习:15、reduce端的join算法的实现

reduce端的join算法:

例子:

商品表数据 product: 
pid
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000

订单表数据 order: 
           pid
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3

mapReduce可以实现sql语句的功能:select 。。。。。。from product p left join order o on p.pid = o.pid

思路:将关联的条件作为map输出的key。

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

替代解决方案: map端join实现方式。

代码:

ReduceJoinMain:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMain extends Configured implements Tool{    @Override    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(this.getConf(), ReduceJoinMain.class.getSimpleName());//        job.setJarByClass(ReduceJoinMain.class);

        job.setInputFormatClass(TextInputFormat.class);        TextInputFormat.addInputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\input"));

        job.setMapperClass(ReduceJoinMapper.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ReduceJoinReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\reduce_join_output"));

        boolean b = job.waitForCompletion(true);        return b?0:1;    }

    public static void main(String[] args) throws Exception {        int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);        System.exit(run);    }

}
ReduceJoinMapper:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text> {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        String line = value.toString();        String[] split = line.split(",");        if(line.startsWith("p")){            context.write(new Text(split[0]),value);        }else{            context.write(new Text(split[2]),value);        }    }}
ReduceJoinReducer:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer<Text,Text,Text,Text> {    @Override    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {        String firP = "";        String secP = "";

        for(Text text:values){            String value = text.toString();            if(value!=null && !"".equals(value)) {                if(value.startsWith("p"))                    secP += value;                else                    firP += value+"\t";            }        }

        context.write(key,new Text(firP+secP));

    }}

原文地址:https://www.cnblogs.com/mediocreWorld/p/11028591.html

时间: 2024-07-31 06:20:46

第2节 mapreduce深入学习:15、reduce端的join算法的实现的相关文章

Python学习(三) 八大排序算法的实现(下)

本文Python实现了插入排序.基数排序.希尔排序.冒泡排序.高速排序.直接选择排序.堆排序.归并排序的后面四种. 上篇:Python学习(三) 八大排序算法的实现(上) 1.高速排序 描写叙述 通过一趟排序将要排序的数据切割成独立的两部分,当中一部分的全部数据都比另外一部分的全部数据都要小,然后再按此方法对这两部分数据分别进行高速排序,整个排序过程能够递归进行,以此达到整个数据变成有序序列. 1.先从数列中取出一个数作为基准数. 2.分区过程,将比这个数大的数全放到它的右边,小于或等于它的数全

Haoop MapReduce 的Partition和reduce端的二次排序

先贴一张原理图(摘自hadoop权威指南第三版) 实际中看了半天还是不太理解其中的Partition,和reduce端的二次排序,最终根据实验来结果来验证自己的理解 1eg 数据如下 2014010114 标识20140101日的温度为14度,需求为统计每年温度的最最高值 2014010114 2014010216 2014010317 2014010410... Partition 实际是根据map 任务的key,以及reduce任务的数量来决定最终来由那个reduce来处理,默认指定redu

第2节 mapreduce深入学习:7、MapReduce的规约过程combiner

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一. ?   combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件 ?   combiner 组件的父类就是 Reducer ?   combiner 和 reducer 的区别在于运行的位置: Combiner 是在每一个 maptask

第2节 mapreduce深入学习:8、手机流量汇总求和

例子:MapReduce综合练习之上网流量统计. 数据格式参见资料夹 需求一:统计求和 统计每个手机号的上行流量总和,下行流量总和,上行总流量之和,下行总流量之和 分析:以手机号码作为key值,上行流量,下行流量,上行总流量,下行总流量四个字段作为value值,然后以这个key,和value作为map阶段的输出,reduce阶段的输入. data_flow.dat内容类似下面的: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.1

第2节 mapreduce深入学习:12、reducetask运行机制(多看几遍)

ReduceTask的运行的整个过程 背下来1.启动线程到mapTask那里去拷贝数据,拉取属于每一个reducetask自己内部的数据2.数据的合并,拉取过来的数据进行合并,合并的过程,有可能在内存当中,有可能在磁盘当中,有可能在内存和磁盘当中,合并的时候同时要进行分组操作3.调用reduce逻辑4.数据输出 原文地址:https://www.cnblogs.com/mediocreWorld/p/11028068.html

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

Python学习15:Open读取文件

在之前我已经学习过raw_input和argv了,在这一节的Python学习中,我学习怎样使用脚本打开普通的文本文件,读取它并且关闭文件.关闭文件很重要,关闭是为了释放资源,防止内存被耗尽,导致机器死锁.另外,关闭文件还有一个作用,当写文件时,关闭后将缓冲区中的内容写入文件本身. 下面是一个简单的读取文本文件的脚本.我们可以用两种方式来实现这个功能:第一种是一个带参数的脚本.第二种是不使用参数,直接使用变量来读取文件的脚本. 第一种: 1. # 导入argv模块 2. from sys impo

第3节 mapreduce高级:4、倒排索引的建立

倒排索引建立 需求分析 需求:有大量的文本(文档.网页),需要建立搜索索引 最终实现的结果就是哪个单词在哪个文章当中出现了多少次 思路分析: 首选将文档的内容全部读取出来,加上文档的名字作为key,文档的value为1,组织成这样的一种形式的数据 map端数据输出 hello-a.txt 1tom-a.txt 1hello-a.txt 1jerry-a.txt 1 到reduce阶段hello-a.txt <1,1> reduce端数据输出 hello-a.txt 2 tom-a.txt 1