map端join

package my.hadoop.hdfs.mapreduceJoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 /**
  * 当商品表比较小只有几十个(比如小米手机),但是订单表比较大(一年卖几千万)此时
  * 如果将每个产品用一个reduce处理时那就可能出现小米书包只有几万,数据,但是小米手机就有100万的数据,
  * 出现负载不均衡,数据倾斜的情况。
  * @author lq
  *
  */
public class MapsideJoin {

    public static class FindFriendMapper extends
            Mapper<LongWritable, Text, AllInfoBean, NullWritable> {

        FileSplit fileSplit = null;
        String filename = null;

        Map<String,String> pdinfo = new HashMap<String,String>();

        @Override
        protected void setup(
                Mapper<LongWritable, Text, AllInfoBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            //文件和程序已经在同一个路径(splist。xml。wc,)
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product")));
        String line  = null;
        while ((line  = br.readLine())!=null){
            String[] split = line.split(",");
            pdinfo.put(split[0], split[1]);
        }
        // 关闭流
        br.close();
        }
        AllInfoBean bean = new AllInfoBean();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 获取文件名字的方法
            // 判断用的是哪个文件
                String[] cols = value.toString().split(",");
                bean.setOderid(Integer.parseInt(cols[0]));
                bean.setDate(cols[1]);
                bean.setPid(cols[2]);
                bean.setAmount(Integer.parseInt(cols[3]));
                bean.setPname(pdinfo.get(cols[2])==null? "" : pdinfo.get(cols[2]));
                bean.setPrice("");
                bean.setCategory_id("");

            context.write(bean, NullWritable.get());
        }
    }

 //不要reduce
    /*public static class FindFriendReducer extends
            Reducer<Text, AllInfoBean, AllInfoBean, NullWritable> {

        @Override
        protected void reduce(Text Keyin, Iterable<AllInfoBean> values,
                Context context) throws IOException, InterruptedException {

            for(AllInfoBean bean : values){
                context.write(bean, NullWritable.get());
            }

        }
    }*/

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException, URISyntaxException {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapsideJoin.class);

        job.setMapperClass(FindFriendMapper.class);
        //不指定reduce
        //job.setReducerClass(FindFriendReducer.class);
        //指定最终输出的数据kv类型

        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(AllInfoBean.class);
        job.setNumReduceTasks(0);//设置不运行reduce
        job.setOutputKeyClass(AllInfoBean.class);
        job.setOutputValueClass(NullWritable.class);
        //第三方jar包使用这个路径指定,本地和hdfs都可以
        //job.addArchiveToClassPath(archive);
        //job
        job.addCacheFile(new URI("hdfs://mini2:9000/Rjoin/dat2/product"));//缓存其他节点

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 :1);
    }

}
时间: 2024-11-13 08:17:43

map端join的相关文章

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

hadoop map端join

map端的联结比reduce端的联结实现起来复杂,而且限制也多,一般我们将小表置于内存中, 对于大表的一个纪录我们在内存中查找即可. 改例子摘自hadoop基础教程, 我们实现sales和accounts的联结, 其中sales记录的顾客的销售信息,accounts纪录的是用户的账户信息,我们的目的是统计每个用户消费的次数和消费总额. 数据如下: sales.txt 002 12.29   2004-07-02 004 13.42   2005-12-20 003 499.99  2010-12

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

MapReduce表连接操作之Reduce端join

一:背景 Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程. 二:技术实现 基本思路 (1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的. (2):在reduce处理函数中,按照标识对数据进行处理. (3):然后根据Key去join来求出结果直接输出. 数据准备 准备好下面两张表: (1):tb_a(以下简称表A) [java] view plain copy id  name 1

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

Hadoop2.4.1 MapReduce通过Map端shuffle(Combiner)完成数据去重

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWrita

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

Hive Map 端OOM 异常

怪异现象:数据量不大,且不是Reduce端OOM,是Map端OOM Map Task运行的时候数据流中包含了非法字符例如:EOF.NOP等东西,导致BufferedReader读取和StreamDecoder解码出错, 进一步导致了OOM,需要剔除这些记录,可以通过length来限制. PS:当然,这只是Map 端OOM出现的其中一种原因,仅供参考.

Hadoop on Mac with IntelliJ IDEA - 10 陆喜恒. Hadoop实战(第2版)6.4.1(Shuffle和排序)Map端 内容整理

下午对着源码看陆喜恒. Hadoop实战(第2版)6.4.1  (Shuffle和排序)Map端,发现与Hadoop 1.2.1的源码有些出入.下面作个简单的记录,方便起见,引用自书本的语句都用斜体表示. 依书本,从MapTask.java开始.这个类有多个内部类: 从书的描述可知,collect()并不在MapTask类,而在MapOutputBuffer类,其函数功能是 1.定义输出内存缓冲区为环形结构2.定义输出内存缓冲区内容到磁盘的操作 在collect函数中将缓冲区的内容写出时会调用s