Hadoop多目录输入,join,进入reduce,数据流分析

前言

在做需求时,经常遇到多个目录,也就是多个维度进行join,这里分析一下,数据是怎么流动的。

1、多目录输入

使用MultipleInputs.addInputPath()  对多目录制定格式和map

2、数据流分析

map按行读入数据,需要对不同的输入目录,打上不同的标记(这个方法又叫reduce端连接),map在输出后会进行partition和sort,按照key进行排序,然后输出到reduce进行处理。

例子

三个输入文件:

a.txt:

500
501

b.txt:

500	501
600 505

c.txt:

501	500
700 800

代码

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import util.TextPair;

import com.sina.hadoop.MultipleInputs;

public class Main extends Configured implements Tool
{

    public static void main(String[] args) throws Exception
    {
        int exitcode = ToolRunner.run(new Main(), args);
        System.exit(exitcode);
    }

    /**
     * 分区
     */
    static class TextPairKeyPartitioner extends Partitioner<TextPair, Text>
    {
        public int getPartition(TextPair key, Text value, int numPartitions)
        {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }

    }

    public int run(String[] arg0) throws Exception
    {
        int exitcode = 0;
        if (exitcode == 0)
        {
            Job job1 = new Job();
            job1.setJobName("testMultipleInputs");
            job1.setJarByClass(Main.class);

            MultipleInputs.addInputPath(job1, new Path("xx/testMultipleInputs/input/a/"),
                    TextInputFormat.class, AMapper.class);
            MultipleInputs.addInputPath(job1, new Path("xx/testMultipleInputs/input/b/"),
                    TextInputFormat.class, BMapper.class);
            MultipleInputs.addInputPath(job1, new Path("xx/testMultipleInputs/input/c/"),
                    TextInputFormat.class, CMapper.class);

            job1.setReducerClass(TestReducer.class);
            FileOutputFormat.setOutputPath(job1, new Path("xx/testMultipleInputs/output/"));
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(Text.class);
            job1.setPartitionerClass(TextPairKeyPartitioner.class);
            job1.setGroupingComparatorClass(TextPair.FirstComparator.class);
            job1.setMapOutputKeyClass(TextPair.class);
            job1.setMapOutputValueClass(Text.class);

            job1.setNumReduceTasks(1);
            exitcode = job1.waitForCompletion(true) ? 0 : 1;
        }

        return exitcode;
    }

    public class AMapper extends Mapper<LongWritable, Text, TextPair, Text>
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] data = value.toString().split("\t", -1);
            String id = "";
            if (data.length >= 1)
            {
                id = data[0];
                if (!"".equals(id))
                {
                    context.write(new TextPair(id, "1"), new Text("0"));
                }
            }
        }
    }

    public class BMapper extends Mapper<LongWritable, Text, TextPair, Text>
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] data = value.toString().split("\t", -1);
            String id1 = "";
            String id2 = "";
            if (data.length >= 2)
            {
                id1 = data[0];
                id2 = data[1];
                if (!"".equals(id1))
                {
                    context.write(new TextPair(id1, "2"), new Text(id2));
                }
            }
        }
    }

    public class CMapper extends Mapper<LongWritable, Text, TextPair, Text>
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] data = value.toString().split("\t", -1);
            String id1 = "";
            String id2 = "";
            if (data.length >= 2)
            {
                id1 = data[0];
                id2 = data[1];
                if (!"".equals(id1))
                {
                    context.write(new TextPair(id1, "3"), new Text(id2));
                }
            }
        }
    }

    public class TestReducer extends Reducer<TextPair, Text, Text, Text>
    {
        public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            String data = "";
            Iterator<Text> i = values.iterator();
            while (i.hasNext())
            {
                data = i.next().toString();
                context.write(key.getFirst(), new Text(data));
            }
        }

    }

}

Hadoop多目录输入,join,进入reduce,数据流分析,布布扣,bubuko.com

时间: 2024-08-05 08:57:03

Hadoop多目录输入,join,进入reduce,数据流分析的相关文章

hadoop 多目录输入,map到reduce如何排序

使用MultipleInputs.addInputPath 对多个路径输入 现在假设有三个目录,并使用了三个mapper去处理, 经过map处理后,输出的结果会根据key 进行join, 如果使用TextPair,会根据第一个字段jion,第二个字段排序 然后在作为reduce的输入,进行计算 hadoop 多目录输入,map到reduce如何排序

Hadoop中两表JOIN的处理方法(转)

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

hadoop多文件格式输入

版本: CDH5.0.0 (hdfs:2.3,mapreduce:2.3,yarn:2.3) hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式. 比如现在有如下的需求: 现有两份数据: phone: 123,good number 124,common number 125,bad number user: zhangsan,123 lisi,124 wangwu,125 现在需要把user和phone按照phone number连接起

Hadoop简介(1):什么是Map/Reduce

看这篇文章请出去跑两圈,然后泡一壶茶,边喝茶,边看,看完你就对hadoop整体有所了解了. Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Reduce,分布式文件系统HDFS,以及分布式数据库Hbase,同时Hadoop的相关项目也很丰富,包括ZooKeeper,Pig,Chukwa,Hive,Hbase,Mahout,flume等. 这里详细分解这里面的概念让大家通过这篇文章了解到底是什么hadoop: 1.什么是Map/Reduce,看

map join 与 reduce join

要解决什么问题? 解决的都是同一个问题,即将两张“表‘进行join操作. reduce join是在map阶段完成数据的标记,在reduce阶段完成数据的合并 map join是直接在map阶段完成数据的合并,没有reduce阶段 比如有如下问题: 这是订单表. 这是商品表. 现在需要将商品表中的商品名称填充到订单表中.得到如下的联合表: 也就是对商品表和订单表根据pid进行join操作,同时剔除联合表中的pid属性. Reduce Join map: 将输入数据统一封装为一个Bean,此Bea

【hadoop】如何向map和reduce脚本传递参数,加载文件和目录

本文主要讲解三个问题: 1 使用Java编写MapReduce程序时,如何向map.reduce函数传递参数. 2 使用Streaming编写MapReduce程序(C/C++, Shell, Python)时,如何向map.reduce脚本传递参数. 3 使用Streaming编写MapReduce程序(C/C++, Shell, Python)时,如何向map.reduce脚本传递文件或文件夹. (1) streaming 加载本地单个文件 (2) streaming 加载本地多个文件 (3

Hadoop中MapReduce多种join实现实例分析

一.概述 对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性.本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明. 二.实现原理 1.在Reudce端进行连接. 在Reudc

MapReduce中的join算法-reduce端join

在海量数据的环境下,不可避免的会碰到join需求, 例如在数据分析时需要连接从不同的数据源中获取到数据. 假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一. 一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息. 气象站和天气记录的示例数据分别如下所示: Station ID            Station Name 011990-99999    SIHCCAJAVRI 012650-99999    TRNSET-HANSMOEN Statio

Hadoop进阶之输入路径如何正则通配?

在hadoop的编程中,如果你是手写MapReduce来处理一些数据,那么就避免不了输入输出参数路径的设定,hadoop里文件基类FileInputFormat提供了如下几种api来制定: 如上图,里面有 (1)addInputPath(),每次添加一个输入路径Path (2)addInputPaths, 将多个路径以逗号分割的字符串,作为入参,支持多个路径 (3)setInputPath ,设置一个输入路径Path,会覆盖原来的路径 (4)setInputPath , 设置多个路径,支持Had