Version:
CDH 5.0.0 (HDFS 2.3, MapReduce 2.3, YARN 2.3)
When a Hadoop job has to read input files of different formats, you can generally use the MultipleInputs class to assign each input path its own input format (and its own mapper).
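In outline, the driver-side calls look like the sketch below. The paths, formats, and mapper classes are placeholders that only show the API shape; the full working driver for the example in this post follows further down.

// sketch of the MultipleInputs API: each input path is bound to its own
// InputFormat and its own Mapper (paths and classes here are placeholders)
MultipleInputs.addInputPath(job, new Path("/data/text"),
        TextInputFormat.class, TextSourceMapper.class);
MultipleInputs.addInputPath(job, new Path("/data/seq"),
        SequenceFileInputFormat.class, SeqSourceMapper.class);
FileOutputFormat.setOutputPath(job, new Path("/data/out"));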
For example, suppose there is the following requirement.

There are two datasets:

phone:

123,good number
124,common number
125,bad number

user:

zhangsan,123
lisi,124
wangwu,125

We now need to join user and phone on the phone number to produce the following result:

zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number

This is exactly what MultipleInputs is for. First, upload user and phone into HDFS, as /multiple/user/user and /multiple/phone/phone respectively.
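The upload itself can be done with the usual hadoop fs -mkdir -p / hadoop fs -put commands, or programmatically. A minimal sketch using the FileSystem API is shown below; the local file names user and phone are assumed, and the cluster configuration is assumed to be on the classpath so that FileSystem.get(conf) points at HDFS.

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// sketch only: copy the two local data files into the HDFS paths used in this post
public class UploadData {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("user"), new Path("/multiple/user/user"));
        fs.copyFromLocalFile(new Path("phone"), new Path("/multiple/phone/phone"));
        fs.close();
    }
}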
The MultipleDriver is designed as follows:
package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * input1(/multiple/user/user):
 *   username,user_phone
 *
 * input2(/multiple/phone/phone):
 *   user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 */
public class MultipleDriver extends Configured implements Tool {
//  private Logger log = LoggerFactory.getLogger(MultipleDriver.class);

    private String input1 = null;
    private String input2 = null;
    private String output = null;
    private String delimiter = null;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
//      conf.set("fs.defaultFS", "hdfs://node33:8020");
//      conf.set("mapreduce.framework.name", "yarn");
//      conf.set("yarn.resourcemanager.address", "node33:8032");

        ToolRunner.run(conf, new MultipleDriver(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        configureArgs(arg0);
        checkArgs();

        Configuration conf = getConf();
        conf.set("delimiter", delimiter);
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver.class);

        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);

        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
        MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
    }

    /**
     * check the args
     */
    private void checkArgs() {
        if (input1 == null || "".equals(input1)) {
            System.out.println("no user input...");
            printUsage();
            System.exit(-1);
        }
        if (input2 == null || "".equals(input2)) {
            System.out.println("no phone input...");
            printUsage();
            System.exit(-1);
        }
        if (output == null || "".equals(output)) {
            System.out.println("no output...");
            printUsage();
            System.exit(-1);
        }
        if (delimiter == null || "".equals(delimiter)) {
            System.out.println("no delimiter...");
            printUsage();
            System.exit(-1);
        }
    }

    /**
     * configure the args
     * @param args
     */
    private void configureArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if ("-i1".equals(args[i])) {
                input1 = args[++i];
            }
            if ("-i2".equals(args[i])) {
                input2 = args[++i];
            }
            if ("-o".equals(args[i])) {
                output = args[++i];
            }
            if ("-delimiter".equals(args[i])) {
                delimiter = args[++i];
            }
        }
    }

    public static void printUsage() {
        System.err.println("Usage:");
        System.err.println("-i1 input \t user data path.");
        System.err.println("-i2 input \t phone data path.");
        System.err.println("-o output \t output data path.");
        System.err.println("-delimiter data delimiter , default is comma .");
    }
}
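Once the mappers, the reducer, and the custom Writable shown below are compiled and packaged into a jar together with this driver, the job can be launched with something along the lines of hadoop jar multiple-input.jar multiple.input.MultipleDriver -i1 /multiple/user/user -i2 /multiple/phone/phone -o /multiple/output -delimiter , (the jar name and the output path are only examples here).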
The job specifies two mappers and one reducer; the two mappers process the user data and the phone data respectively, as follows:
mapper1 (processes the user data):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * input :
 *   username,phone
 *
 * output:
 *   <key,value> --> <[phone],[0,username]>
 *
 * @author fansy
 */
public class Multiple1Mapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
    private Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
    private String delimiter = null; // default is comma

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple1Mapper");
    }

    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        // use value.toString() rather than new String(value.getBytes(), "UTF-8"):
        // Text.getBytes() returns the backing array, which may contain stale bytes beyond getLength()
        String info = value.toString();
        String[] values = info.split(delimiter);
        if (values.length != 2) {
            return;
        }
        log.info("key-->" + values[1] + "=========value-->" + "[0," + values[0] + "]");
        cxt.write(new Text(values[1]), new FlagStringDataType(0, values[0]));
    }
}
mapper2 (processes the phone data):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * input :
 *   phone,description
 *
 * output:
 *   <key,value> --> <[phone],[1,description]>
 *
 * @author fansy
 */
public class Multiple2Mapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
    private Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
    private String delimiter = null; // default is comma

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple2Mapper");
    }

    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        String[] values = value.toString().split(delimiter);
        if (values.length != 2) {
            return;
        }
        log.info("key-->" + values[0] + "=========value-->" + "[1," + values[1] + "]");
        cxt.write(new Text(values[0]), new FlagStringDataType(1, values[1]));
    }
}
The FlagStringDataType used here is a custom Writable:
package multiple.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
    private Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
    private String value;
    private int flag;

    public FlagStringDataType() {
    }

    public FlagStringDataType(int flag, String value) {
        this.value = value;
        this.flag = flag;
    }

    public String get() {
        return value;
    }

    public void set(String value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object other) {
        if (other == null || !getClass().equals(other.getClass())) {
            return false;
        }
        FlagStringDataType o = (FlagStringDataType) other;
        // compare string contents with equals(), not ==
        return flag == o.getFlag() && value.equals(o.get());
    }

    @Override
    public int hashCode() {
        return Ints.hashCode(flag) + value.hashCode();
    }

    @Override
    public int compareTo(FlagStringDataType other) {
        // order first by flag, then by value
        if (flag > other.flag) {
            return 1;
        }
        if (flag < other.flag) {
            return -1;
        }
        return value.compareTo(other.value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        log.info("in write()::" + "flag:" + flag + ",value:" + value);
        out.writeInt(flag);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        flag = in.readInt();
        value = in.readUTF();
        log.info("in readFields()::" + "flag:" + flag + ",value:" + value);
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public String toString() {
        return flag + ":" + value;
    }
}
This custom type carries a flag that records which dataset a value came from, while value holds the data itself. The benefit is that on the reduce side the flag tells you where each value belongs in the output. For phone 124, for instance, the reducer receives key 124 with the values 0:lisi and 1:common number, and can therefore emit lisi,124,common number. This kind of design helps a great deal when integrating several inputs, and the same pattern can be seen in Mahout as well.
reducer (merges and writes the output):
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleReducer extends Reducer<Text, FlagStringDataType, Text, NullWritable> {
    private Logger log = LoggerFactory.getLogger(MultipleReducer.class);
    private String delimiter = null; // default is comma

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
    }

    @Override
    public void reduce(Text key, Iterable<FlagStringDataType> values, Context cxt)
            throws IOException, InterruptedException {
        log.info("================");
        log.info(" =======");
        log.info(" ==");
        // value[0] holds the username (flag 0), value[1] the description (flag 1),
        // value[2] the phone number (the key)
        String[] value = new String[3];
        value[2] = key.toString();
        for (FlagStringDataType v : values) {
            int index = v.getFlag();
            log.info("index:" + index + "-->value:" + v.get());
            value[index] = v.get();
        }
        log.info(" ==");
        log.info(" =======");
        log.info("================");
        // emit username,phone,description, matching the output format described above
        cxt.write(new Text(value[0] + delimiter + value[2] + delimiter + value[1]), NullWritable.get());
    }
}
The advantage of this design is that each input can get its own processing logic, and the inputs do not even have to share a format; one of them could, for instance, be a SequenceFile.
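For example, if the phone data were stored as a SequenceFile instead of plain text, only the corresponding addInputPath call in MultipleDriver would need to change. A sketch of that one difference follows; Multiple2SeqMapper is hypothetical here, standing for a mapper whose input key/value types match those of the SequenceFile.

// sketch: mixing a plain-text input with a SequenceFile input in the same job
// (requires org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
//  Multiple2SeqMapper is a hypothetical mapper matching the SequenceFile's types)
MultipleInputs.addInputPath(job, new Path(input1),
        TextInputFormat.class, Multiple1Mapper.class);
MultipleInputs.addInputPath(job, new Path(input2),
        SequenceFileInputFormat.class, Multiple2SeqMapper.class);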
Below is a second approach. Compared with the one above it falls slightly short, but it is still worth knowing.
First, the driver:
package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * input1(/multiple/user/user):
 *   username,user_phone
 *
 * input2(/multiple/phone/phone):
 *   user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 */
public class MultipleDriver2 extends Configured implements Tool {
//  private Logger log = LoggerFactory.getLogger(MultipleDriver.class);

    private String input1 = null;
    private String input2 = null;
    private String output = null;
    private String delimiter = null;

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
//      conf.set("fs.defaultFS", "hdfs://node33:8020");
//      conf.set("mapreduce.framework.name", "yarn");
//      conf.set("yarn.resourcemanager.address", "node33:8032");

        ToolRunner.run(conf, new MultipleDriver2(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        configureArgs(arg0);
        checkArgs();

        Configuration conf = getConf();
        conf.set("delimiter", delimiter);
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver2.class);
        job.setMapperClass(MultipleMapper.class);
        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(input1));
        FileInputFormat.addInputPath(job, new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
    }

    /**
     * check the args
     */
    private void checkArgs() {
        if (input1 == null || "".equals(input1)) {
            System.out.println("no user input...");
            printUsage();
            System.exit(-1);
        }
        if (input2 == null || "".equals(input2)) {
            System.out.println("no phone input...");
            printUsage();
            System.exit(-1);
        }
        if (output == null || "".equals(output)) {
            System.out.println("no output...");
            printUsage();
            System.exit(-1);
        }
        if (delimiter == null || "".equals(delimiter)) {
            System.out.println("no delimiter...");
            printUsage();
            System.exit(-1);
        }
    }

    /**
     * configure the args
     * @param args
     */
    private void configureArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if ("-i1".equals(args[i])) {
                input1 = args[++i];
            }
            if ("-i2".equals(args[i])) {
                input2 = args[++i];
            }
            if ("-o".equals(args[i])) {
                output = args[++i];
            }
            if ("-delimiter".equals(args[i])) {
                delimiter = args[++i];
            }
        }
    }

    public static void printUsage() {
        System.err.println("Usage:");
        System.err.println("-i1 input \t user data path.");
        System.err.println("-i2 input \t phone data path.");
        System.err.println("-o output \t output data path.");
        System.err.println("-delimiter data delimiter , default is comma .");
    }
}
Here the input paths are added directly with FileInputFormat, so every split goes through the same mapper. That single mapper therefore has to work out which dataset the current split belongs to, by inspecting its path, before applying the corresponding logic:
package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * input1 :
 *   username,phone
 *
 * input2 :
 *   phone,description
 *
 * output:
 *   <key,value> --> <[phone],[0,username]>
 *   <key,value> --> <[phone],[1,description]>
 *
 * @author fansy
 */
public class MultipleMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
    private String delimiter = null; // default is comma
    private boolean flag = false;

    @Override
    public void setup(Context cxt) {
        delimiter = cxt.getConfiguration().get("delimiter", ",");
        // the parent directory of the current split tells us which dataset this mapper is reading
        InputSplit input = cxt.getInputSplit();
        String filename = ((FileSplit) input).getPath().getParent().getName();
        if ("user".equals(filename)) {
            flag = true;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
        String[] values = value.toString().split(delimiter);
        if (values.length != 2) {
            return;
        }
        if (flag) {
            cxt.write(new Text(values[1]), new FlagStringDataType(0, values[0]));
        } else {
            cxt.write(new Text(values[0]), new FlagStringDataType(1, values[1]));
        }
    }
}
Overall, this second approach is not as good as the first: every call to map() has to branch on the dataset, which adds work compared with the first approach, and it cannot handle inputs whose key/value types differ, such as SequenceFiles with different key or value classes. So for multi-format input, the first approach is still the better choice.
Sharing, growth, happiness
When reposting, please cite the original blog: http://blog.csdn.net/fansy1990