hadoop多文件格式输入

版本:

CDH5.0.0 (hdfs:2.3,mapreduce:2.3,yarn:2.3)

hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式。

比如现在有如下的需求:

现有两份数据:

phone:

123,good number
124,common number
125,bad number

user:

zhangsan,123
lisi,124
wangwu,125

现在需要把user和phone按照phone number连接起来,得到下面的结果:

zhangsan,123,good number
lisi,123,common number
wangwu,125,bad number

那么就可以使用MultipleInputs来操作,这里把user和phone上传到hdfs目录中,分别是/multiple/user/user , /multiple/phone/phone。

设计的MultipleDriver如下:

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
 * input1(/multiple/user/user):
 * username,user_phone
 *
 * input2(/multiple/phone/phone):
 *  user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 *
 */
public class MultipleDriver extends Configured implements Tool{
//	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);

	private String input1=null;
	private String input2=null;
	private String output=null;
	private String delimiter=null;

	public static void main(String[] args) throws Exception {
		Configuration conf=new Configuration();
//		conf.set("fs.defaultFS", "hdfs://node33:8020");
//        conf.set("mapreduce.framework.name", "yarn");
//        conf.set("yarn.resourcemanager.address", "node33:8032"); 

		ToolRunner.run(conf, new MultipleDriver(), args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		configureArgs(arg0);
		checkArgs();

		Configuration conf= getConf();
		conf.set("delimiter", delimiter);
		 @SuppressWarnings("deprecation")
		Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver.class);

        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);
        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
        MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
	}

	/**
	 * check the args
	 */
	private void checkArgs() {
		if(input1==null||"".equals(input1)){
			System.out.println("no user input...");
			printUsage();
			System.exit(-1);
		}
		if(input2==null||"".equals(input2)){
			System.out.println("no phone input...");
			printUsage();
			System.exit(-1);
		}
		if(output==null||"".equals(output)){
			System.out.println("no output...");
			printUsage();
			System.exit(-1);
		}
		if(delimiter==null||"".equals(delimiter)){
			System.out.println("no delimiter...");
			printUsage();
			System.exit(-1);
		}

	}

	/**
	 * configuration the args
	 * @param args
	 */
	private void configureArgs(String[] args) {
    	for(int i=0;i<args.length;i++){
    		if("-i1".equals(args[i])){
    			input1=args[++i];
    		}
    		if("-i2".equals(args[i])){
    			input2=args[++i];
    		}

    		if("-o".equals(args[i])){
    			output=args[++i];
    		}

    		if("-delimiter".equals(args[i])){
    			delimiter=args[++i];
    		}

    	}
	}
	public static void printUsage(){
    	System.err.println("Usage:");
    	System.err.println("-i1 input \t user data path.");
    	System.err.println("-i2 input \t phone data path.");
    	System.err.println("-o output \t output data path.");
    	System.err.println("-delimiter  data delimiter , default is comma  .");
    }
}

这里指定两个mapper和一个reducer,两个mapper分别对应处理user和phone的数据,分别如下:

mapper1(处理user数据):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * input :
 * username,phone
 *
 * output:
 * <key,value>  --> <[phone],[0,username]>
 * @author fansy
 *
 */
public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
	private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
	private String delimiter=null; // default is comma
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
		log.info("This is the begin of Multiple1Mapper");
	} 

	@Override
	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
		String info= new String(value.getBytes(),"UTF-8");
		String[] values = info.split(delimiter);
		if(values.length!=2){
			return;
		}
		log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");
		cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
	}
}

mapper2(处理phone数据):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * input :
 * phone,description
 *
 * output:
 * <key,value>  --> <[phone],[1,description]>
 * @author fansy
 *
 */
public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
	private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
	private String delimiter=null; // default is comma
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
		log.info("This is the begin of Multiple2Mapper");
	} 

	@Override
	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
		String[] values= value.toString().split(delimiter);
		if(values.length!=2){
			return;
		}
		log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");
		cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
	}
}

这里的FlagStringDataType是自定义的:

package multiple.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
	private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
  private String value;
  private int flag;
  public FlagStringDataType() {
  }

  public FlagStringDataType(int flag,String value) {
    this.value = value;
    this.flag=flag;
  }

  public String get() {
    return value;
  }

  public void set(String value) {
    this.value = value;
  }

  @Override
  public boolean equals(Object other) {
    return other != null && getClass().equals(other.getClass())
    		&& ((FlagStringDataType) other).get() == value
    		&&((FlagStringDataType) other).getFlag()==flag;
  }

  @Override
  public int hashCode() {
    return Ints.hashCode(flag)+value.hashCode();
  }

  @Override
  public int compareTo(FlagStringDataType other) {

    if (flag >= other.flag) {
      if (flag > other.flag) {
        return 1;
      }
    } else {
      return -1;
    }
    return value.compareTo(other.value);
  }

  @Override
  public void write(DataOutput out) throws IOException {
	log.info("in write()::"+"flag:"+flag+",vlaue:"+value);
    out.writeInt(flag);
    out.writeUTF(value);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
	  log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
	  flag=in.readInt();
	  value = in.readUTF();
	  log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
  }

public int getFlag() {
	return flag;
}

public void setFlag(int flag) {
	this.flag = flag;
}

public String toString(){
	return flag+":"+value;
}

}

这个自定义类,使用一个flag来指定是哪个数据,而value则对应是其值。这样做的好处是在reduce端可以根据flag的值来判断其输出位置,这种设计方式可以对多种输入的整合有很大帮助,在mahout中也可以看到这样的设计。

reducer(汇总输出数据):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{
	private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);
	private String delimiter=null; // default is comma
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
	}
	@Override
	public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{
		log.info("================");
		log.info("         =======");
		log.info("              ==");
		String[] value= new String[3];
		value[2]=key.toString();
		for(FlagStringDataType v:values){
			int index= v.getFlag();
			log.info("index:"+index+"-->value:"+v.get());
			value[index]= v.get();
		}
		log.info("              ==");
		log.info("         =======");
		log.info("================");
		cxt.write(new Text(value[2]+delimiter+value[0]+delimiter+value[1]),NullWritable.get());
	}
}

这样设计的好处是,可以针对不同的输入数据采取不同的逻辑处理,而且不同的输入数据可以是序列文件的格式。

下面介绍一种方式和上面的比,略有不足,但是可以借鉴。

首先是Driver:

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
 * input1(/multiple/user/user):
 * username,user_phone
 *
 * input2(/multiple/phone/phone):
 *  user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 *
 */
public class MultipleDriver2 extends Configured implements Tool{
//	private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);

	private String input1=null;
	private String input2=null;
	private String output=null;
	private String delimiter=null;

	public static void main(String[] args) throws Exception {
		Configuration conf=new Configuration();
//		conf.set("fs.defaultFS", "hdfs://node33:8020");
//        conf.set("mapreduce.framework.name", "yarn");
//        conf.set("yarn.resourcemanager.address", "node33:8032"); 

		ToolRunner.run(conf, new MultipleDriver2(), args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		configureArgs(arg0);
		checkArgs();

		Configuration conf= getConf();
		conf.set("delimiter", delimiter);
		 @SuppressWarnings("deprecation")
		Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver2.class);
        job.setMapperClass(MultipleMapper.class);
        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(input1));
        FileInputFormat.addInputPath(job, new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
	}

	/**
	 * check the args
	 */
	private void checkArgs() {
		if(input1==null||"".equals(input1)){
			System.out.println("no user input...");
			printUsage();
			System.exit(-1);
		}
		if(input2==null||"".equals(input2)){
			System.out.println("no phone input...");
			printUsage();
			System.exit(-1);
		}
		if(output==null||"".equals(output)){
			System.out.println("no output...");
			printUsage();
			System.exit(-1);
		}
		if(delimiter==null||"".equals(delimiter)){
			System.out.println("no delimiter...");
			printUsage();
			System.exit(-1);
		}

	}

	/**
	 * configuration the args
	 * @param args
	 */
	private void configureArgs(String[] args) {
    	for(int i=0;i<args.length;i++){
    		if("-i1".equals(args[i])){
    			input1=args[++i];
    		}
    		if("-i2".equals(args[i])){
    			input2=args[++i];
    		}

    		if("-o".equals(args[i])){
    			output=args[++i];
    		}

    		if("-delimiter".equals(args[i])){
    			delimiter=args[++i];
    		}

    	}
	}
	public static void printUsage(){
    	System.err.println("Usage:");
    	System.err.println("-i1 input \t user data path.");
    	System.err.println("-i2 input \t phone data path.");
    	System.err.println("-o output \t output data path.");
    	System.err.println("-delimiter  data delimiter , default is comma  .");
    }
}

这里添加路径直接使用FileInputFormat添加输入路径,这样的话,针对不同的输入数据的不同业务逻辑可以在mapper中先判断目前正在处理的是那个数据,然后根据其路径来进行相应的业务逻辑处理:

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * input1 :
 * username,phone
 *
 * input2
 * phone,description
 *
 * output:
 * <key,value>  --> <[phone],[0,username]>
 * <key,value>  --> <[phone],[1,description]>
 * @author fansy
 *
 */
public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{

	private String delimiter=null; // default is comma
	private boolean flag=false;
	@Override
	public void setup(Context cxt){
		delimiter= cxt.getConfiguration().get("delimiter", ",");
		InputSplit input=cxt.getInputSplit();
	    String filename=((FileSplit) input).getPath().getParent().getName();
	    if("user".equals(filename)){
	    	flag=true;
	    }
	} 

	@Override
	public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
		String[] values= value.toString().split(delimiter);
		if(values.length!=2){
			return;
		}
		if(flag){
			cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
		}else{
			cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
		}
	}
}

总体来说,这种处理方式其实是不如第一种的,在每个map函数中都需要进行判断,比第一种多了很多操作;同时,针对不同的序列文件,这种方式处理不了(Key、value的类型不一样的情况下)。所以针对多文件格式的输入,最好还是使用第一种方式。

分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990

hadoop多文件格式输入,布布扣,bubuko.com

时间: 2024-10-14 21:27:48

hadoop多文件格式输入的相关文章

Hadoop多目录输入,join,进入reduce,数据流分析

前言 在做需求时,经常遇到多个目录,也就是多个维度进行join,这里分析一下,数据是怎么流动的. 1.多目录输入 使用MultipleInputs.addInputPath()  对多目录制定格式和map 2.数据流分析 map按行读入数据,需要对不同的输入目录,打上不同的标记(这个方法又叫reduce端连接),map在输出后会进行partition和sort,按照key进行排序,然后输出到reduce进行处理. 例子 三个输入文件: a.txt: 500 501 b.txt: 500 501

hadoop 多目录输入,map到reduce如何排序

使用MultipleInputs.addInputPath 对多个路径输入 现在假设有三个目录,并使用了三个mapper去处理, 经过map处理后,输出的结果会根据key 进行join, 如果使用TextPair,会根据第一个字段jion,第二个字段排序 然后在作为reduce的输入,进行计算 hadoop 多目录输入,map到reduce如何排序

Hadoop进阶之输入路径如何正则通配?

在hadoop的编程中,如果你是手写MapReduce来处理一些数据,那么就避免不了输入输出参数路径的设定,hadoop里文件基类FileInputFormat提供了如下几种api来制定: 如上图,里面有 (1)addInputPath(),每次添加一个输入路径Path (2)addInputPaths, 将多个路径以逗号分割的字符串,作为入参,支持多个路径 (3)setInputPath ,设置一个输入路径Path,会覆盖原来的路径 (4)setInputPath , 设置多个路径,支持Had

Hadoop的数据输入的源码解析

我们知道,任何一个工程项目,最重要的是三个部分:输入,中间处理,输出.今天我们来深入的了解一下我们熟知的Hadoop系统中,输入是如何输入的? 在hadoop中,输入数据都是通过对应的InputFormat类和RecordReader类来实现的,其中InputFormat来实现将对应输入文件进行分片,RecordReader类将对应分片中的数据读取进来.具体的方式如下: (1)InputFormat类是一个接口. public interface InputFormat<K, V> {    

Fp关联规则算法计算置信度及MapReduce实现思路

说明:參考Mahout FP算法相关相关源代码. 算法project能够在FP关联规则计算置信度下载:(仅仅是单机版的实现,并没有MapReduce的代码) 使用FP关联规则算法计算置信度基于以下的思路: 1. 首先使用原始的FP树关联规则挖掘出全部的频繁项集及其支持度:这里须要注意,这里是输出全部的频繁项集,并没有把频繁项集合并,所以须要改动FP树的相关代码,在某些步骤把全部的频繁项集输出:(ps:參考Mahout的FP树单机版的实现,进行了改动,暂不确定是否已经输出了全部频繁项集) 为举例简

【转】Hadoop HDFS分布式环境搭建

原文地址  http://blog.sina.com.cn/s/blog_7060fb5a0101cson.html Hadoop HDFS分布式环境搭建 最近选择给大家介绍Hadoop HDFS系统,因此研究了一下如何在Linux 下配置一个HDFS Clust.小记一下,以备将来进一步研究和记忆. HDFS简介 全称 Hadoop Distributed File System, Hadoop分布式文件系统. 根据Google的GFS论文,由Doug Cutting使用JAVA开发的开源项目

Hadoop学习笔记(8) ——实战 做个倒排索引

Hadoop学习笔记(8) ——实战 做个倒排索引 倒排索引是文档检索系统中最常用数据结构.根据单词反过来查在文档中出现的频率,而不是根据文档来,所以称倒排索引(Inverted Index).结构如下: 这张索引表中, 每个单词都对应着一系列的出现该单词的文档,权表示该单词在该文档中出现的次数.现在我们假定输入的是以下的文件清单: T1 : hello world hello china T2 : hello hadoop T3 : bye world bye hadoop bye bye 输

hadoop快速扫盲帖,从零了解hadoop

1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker:另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracke

hadoop安装笔记

一.设置Linux的静态IP 修改桌面图标修改,或者修改配置文件修改 1.先执行ifconfig,得到网络设备的名称eth0 2.编辑/etc/sysconfig/network-scripts/ifcfg-eth0文件 设置静态ip.dns.ip地址.子网掩码.网关等信息 3.重启网卡:service network restart 最后执行ifconfig命令查看是否修改成功 该步骤需保证虚拟机中的Linux能与客户机在同一网段并且ping通 二.修改主机名 修改当前会话主机名(当前会话生效