1、项目名称:
2、程序代码:
版本一(详细版):
package com.mtjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Reduce-side join of a "factory" table with an "address" table (detailed version).
 *
 * <p>Both tables are read from the same input path. A factory line has the form
 * {@code <factory name> <addressID>} and an address line has the form
 * {@code <addressID> <address name>}. The mapper keys every record by its address ID
 * and tags the value with its table of origin; the reducer then emits one
 * {@code (factory name, address name)} pair for every factory/address combination
 * that shares an ID.
 *
 * <p>This version locates the ID and the name by scanning the line character by
 * character.
 */
public class MTjoin {

    /**
     * No longer read or written by this class; the "header already written" state
     * is now kept per {@link Reduce} instance.
     *
     * @deprecated retained only so that code referring to {@code MTjoin.time} still compiles.
     */
    @Deprecated
    public static int time = 0;

    /** Prefix put in front of values that come from the factory table. */
    private static final String FACTORY_TAG = "1+";

    /** Prefix put in front of values that come from the address table. */
    private static final String ADDRESS_TAG = "2+";

    /** Counter group / name used to report input lines that could not be parsed. */
    private static final String COUNTER_GROUP = "MTjoin";
    private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

    /**
     * Emits {@code (addressID, tag + name)} for every data line of either table.
     *
     * <p>A line whose first character is a digit is treated as an address record,
     * any other line as a factory record. Consequently a factory whose name starts
     * with a digit would be classified as an address record.
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        /**
         * Parses one input line and writes the tagged record keyed by address ID.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of the factory or the address table
         * @param context sink for the {@code (addressID, tagged name)} pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("mapper........................");
            String line = value.toString().trim();

            // Skip blank lines and the header line of either table.
            if (line.isEmpty() || line.contains("factoryname") || line.contains("addressID")) {
                return;
            }

            int len = line.length();
            if (!Character.isDigit(line.charAt(0))) {
                // Factory record: the ID is the run of digits at the END of the line,
                // so digits inside the factory name cannot be mistaken for the ID.
                int idStart = len;
                while (idStart > 0 && Character.isDigit(line.charAt(idStart - 1))) {
                    idStart--;
                }
                // Walk back over the separator between the name and the ID.
                int nameEnd = idStart;
                while (nameEnd > 0 && Character.isWhitespace(line.charAt(nameEnd - 1))) {
                    nameEnd--;
                }
                // No trailing ID, no separator, or no name: count the line and skip it.
                if (idStart == len || nameEnd == idStart || nameEnd == 0) {
                    context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                    return;
                }
                String addressId = line.substring(idStart);
                String factoryName = line.substring(0, nameEnd);
                System.out.println("key:" + addressId + " value:" + factoryName);
                context.write(new Text(addressId), new Text(FACTORY_TAG + factoryName));
            } else {
                // Address record: the ID is the whole run of digits at the START of the line.
                int idEnd = 0;
                while (idEnd < len && Character.isDigit(line.charAt(idEnd))) {
                    idEnd++;
                }
                // Skip the separator so the name carries no leading whitespace.
                int nameStart = idEnd;
                while (nameStart < len && Character.isWhitespace(line.charAt(nameStart))) {
                    nameStart++;
                }
                // No separator after the ID, or nothing after the separator.
                if (nameStart == idEnd || nameStart == len) {
                    context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                    return;
                }
                String addressId = line.substring(0, idEnd);
                String addressName = line.substring(nameStart);
                System.out.println("key:" + addressId + " value:" + addressName);
                context.write(new Text(addressId), new Text(ADDRESS_TAG + addressName));
            }
        }
    }

    /**
     * Joins the factory and address records that share an address ID and writes one
     * {@code (factory name, address name)} pair per combination. A header row is
     * written before the first pair produced by this reducer instance.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /** Whether this reducer instance has already written the header row. */
        private boolean headerWritten = false;

        /**
         * Separates the tagged values of one address ID by table and emits their
         * cross product. Nothing is emitted for an ID that is missing from either table.
         *
         * @param key     the address ID shared by all {@code values}
         * @param values  tagged names ({@code "1+<factory>"} or {@code "2+<address>"})
         * @param context sink for the joined pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reducer........................");
            if (!headerWritten) {
                context.write(new Text("factoryname"), new Text("addressname"));
                headerWritten = true;
            }

            // Growable lists: a key may carry any number of records from either table.
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();

            Iterator<Text> records = values.iterator();
            while (records.hasNext()) {
                String record = records.next().toString();
                if (record.length() < FACTORY_TAG.length()) {
                    continue; // too short to carry a tag; not produced by Map
                }
                String name = record.substring(FACTORY_TAG.length());
                if (record.charAt(0) == '1') {
                    factories.add(name);
                } else {
                    addresses.add(name);
                }
            }

            // Cross product; empty when either side has no record for this key.
            for (String factory : factories) {
                for (String address : addresses) {
                    context.write(new Text(factory), new Text(address));
                    System.out.println("factoryname:" + factory + " addressname:" + address);
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args generic Hadoop options followed by the input path and the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage:MTjoin<in><out>");
            System.exit(2);
        }

        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
版本二(简化版):
package com.mtjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Reduce-side join of a "factory" table with an "address" table (simplified version).
 *
 * <p>Both tables are read from the same input path. A factory line has the form
 * {@code <factory name> <addressID>} and an address line has the form
 * {@code <addressID> <address name>}, with a space as the separator. The mapper keys
 * every record by its address ID and tags the value with its table of origin; the
 * reducer emits one {@code (factory name, address name)} pair for every
 * factory/address combination that shares an ID.
 *
 * <p>This version splits each line at a single space instead of scanning characters.
 */
public class MTjoin {

    /**
     * No longer read or written by this class; the "header already written" state
     * is now kept per {@link Reduce} instance.
     *
     * @deprecated retained only so that code referring to {@code MTjoin.time} still compiles.
     */
    @Deprecated
    public static int time = 0;

    /**
     * Emits {@code (addressID, tag + name)} for every data line of either table.
     *
     * <p>A line whose first character is an ASCII digit is treated as an address
     * record, any other line as a factory record.
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        /**
         * Splits one input line into ID and name and writes the tagged record.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of the factory or the address table
         * @param context sink for the {@code (addressID, tagged name)} pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("mapper........................");
            String line = value.toString().trim();

            // Skip blank lines and the header line of either table.
            if (line.isEmpty() || line.contains("factoryname") || line.contains("addressID")) {
                return;
            }

            char first = line.charAt(0);
            boolean isFactoryLine = first > '9' || first < '0';

            // The ID is the last field of a factory line and the first field of an
            // address line; splitting at the separator supports IDs of any length.
            int sep = isFactoryLine ? line.lastIndexOf(' ') : line.indexOf(' ');
            if (sep < 0) {
                // Only one field on the line: nothing to join on.
                context.getCounter("MTjoin", "MALFORMED_LINES").increment(1);
                return;
            }

            if (isFactoryLine) {
                String addressId = line.substring(sep + 1);
                String factoryName = line.substring(0, sep).trim();
                System.out.println("key:" + addressId + " value:" + factoryName);
                context.write(new Text(addressId), new Text("1+" + factoryName));
            } else {
                String addressId = line.substring(0, sep);
                String addressName = line.substring(sep + 1).trim();
                System.out.println("key:" + addressId + " value:" + addressName);
                context.write(new Text(addressId), new Text("2+" + addressName));
            }
        }
    }

    /**
     * Joins the factory and address records that share an address ID and writes one
     * {@code (factory name, address name)} pair per combination. A header row is
     * written before the first pair produced by this reducer instance.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /** Whether this reducer instance has already written the header row. */
        private boolean headerWritten = false;

        /**
         * Separates the tagged values of one address ID by table and emits their
         * cross product. Nothing is emitted for an ID that is missing from either table.
         *
         * @param key     the address ID shared by all {@code values}
         * @param values  tagged names ({@code "1+<factory>"} or {@code "2+<address>"})
         * @param context sink for the joined pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reducer........................");
            if (!headerWritten) {
                context.write(new Text("factoryname"), new Text("addressname"));
                headerWritten = true;
            }

            // Growable lists: a key may carry any number of records from either table.
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();

            Iterator<Text> records = values.iterator();
            while (records.hasNext()) {
                String record = records.next().toString();
                if (record.length() < 2) {
                    continue; // too short to carry the two-character tag
                }
                // Drop the two-character tag ("1+" or "2+") to recover the name.
                String name = record.substring(2);
                if (record.charAt(0) == '1') {
                    factories.add(name);
                } else {
                    addresses.add(name);
                }
            }

            // Cross product; empty when either side has no record for this key.
            for (String factory : factories) {
                for (String address : addresses) {
                    context.write(new Text(factory), new Text(address));
                    System.out.println("factoryname:" + factory + " addressname:" + address);
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args generic Hadoop options followed by the input path and the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage:MTjoin<in><out>");
            System.exit(2);
        }

        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3、测试数据:
address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
factory:
factoryname addressname
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
4、运行过程:
14/09/24 09:39:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/24 09:39:55 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/09/24 09:39:55 INFO input.FileInputFormat: Total input paths to process : 2
14/09/24 09:39:55 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/24 09:39:55 INFO mapred.JobClient: Running job: job_local_0001
14/09/24 09:39:55 INFO util.ProcessTree: setsid exited with exit code 0
14/09/24 09:39:55 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@<hash>
14/09/24 09:39:55 INFO mapred.MapTask: io.sort.mb = 100
14/09/24 09:39:55 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/24 09:39:55 INFO mapred.MapTask: record buffer = 262144/327680
mapper........................
mapper........................
key:1 value:Beijing Red Star
mapper........................
key:3 value:Shenzhen Thunder
mapper........................
key:2 value:Guangzhou Honda
mapper........................
key:1 value:Beijing Rising
mapper........................
key:2 value:Guangzhou Development Bank
mapper........................
key:3 value:Tencent
mapper........................
key:1 value:Bank of Beijing
14/09/24 09:39:55 INFO mapred.MapTask: Starting flush of map output
14/09/24 09:39:55 INFO mapred.MapTask: Finished spill 0
14/09/24 09:39:55 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/09/24 09:39:56 INFO mapred.JobClient: map 0% reduce 0%
14/09/24 09:39:58 INFO mapred.LocalJobRunner:
14/09/24 09:39:58 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/09/24 09:39:58 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@<hash>
14/09/24 09:39:58 INFO mapred.MapTask: io.sort.mb = 100
14/09/24 09:39:58 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/24 09:39:58 INFO mapred.MapTask: record buffer = 262144/327680
mapper........................
mapper........................
key:1 value:Beijing
mapper........................
key:2 value:Guangzhou
mapper........................
key:3 value:Shenzhen
mapper........................
key:4 value:Xian
14/09/24 09:39:58 INFO mapred.MapTask: Starting flush of map output
14/09/24 09:39:58 INFO mapred.MapTask: Finished spill 0
14/09/24 09:39:58 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/09/24 09:39:59 INFO mapred.JobClient: map 100% reduce 0%
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
14/09/24 09:40:01 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
14/09/24 09:40:01 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@<hash>
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
14/09/24 09:40:01 INFO mapred.Merger: Merging 2 sorted segments
14/09/24 09:40:01 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 218 bytes
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
reducer........................
factoryname:Beijing Red Star addressname:Beijing
factoryname:Beijing Rising addressname:Beijing
factoryname:Bank of Beijing addressname:Beijing
reducer........................
factoryname:Guangzhou Honda addressname:Guangzhou
factoryname:Guangzhou Development Bank addressname:Guangzhou
reducer........................
factoryname:Shenzhen Thunder addressname:Shenzhen
factoryname:Tencent addressname:Shenzhen
reducer........................
14/09/24 09:40:01 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/09/24 09:40:01 INFO mapred.LocalJobRunner:
14/09/24 09:40:01 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/09/24 09:40:01 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/mtjoin_output02
14/09/24 09:40:04 INFO mapred.LocalJobRunner: reduce > reduce
14/09/24 09:40:04 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/09/24 09:40:05 INFO mapred.JobClient: map 100% reduce 100%
14/09/24 09:40:05 INFO mapred.JobClient: Job complete: job_local_0001
14/09/24 09:40:05 INFO mapred.JobClient: Counters: 22
14/09/24 09:40:05 INFO mapred.JobClient: Map-Reduce Framework
14/09/24 09:40:05 INFO mapred.JobClient: Spilled Records=22
14/09/24 09:40:05 INFO mapred.JobClient: Map output materialized bytes=226
14/09/24 09:40:05 INFO mapred.JobClient: Reduce input records=11
14/09/24 09:40:05 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/09/24 09:40:05 INFO mapred.JobClient: Map input records=13
14/09/24 09:40:05 INFO mapred.JobClient: SPLIT_RAW_BYTES=238
14/09/24 09:40:05 INFO mapred.JobClient: Map output bytes=192
14/09/24 09:40:05 INFO mapred.JobClient: Reduce shuffle bytes=0
14/09/24 09:40:05 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/09/24 09:40:05 INFO mapred.JobClient: Reduce input groups=4
14/09/24 09:40:05 INFO mapred.JobClient: Combine output records=0
14/09/24 09:40:05 INFO mapred.JobClient: Reduce output records=8
14/09/24 09:40:05 INFO mapred.JobClient: Map output records=11
14/09/24 09:40:05 INFO mapred.JobClient: Combine input records=0
14/09/24 09:40:05 INFO mapred.JobClient: CPU time spent (ms)=0
14/09/24 09:40:05 INFO mapred.JobClient: Total committed heap usage (bytes)=813170688
14/09/24 09:40:05 INFO mapred.JobClient: File Input Format Counters
14/09/24 09:40:05 INFO mapred.JobClient: Bytes Read=216
14/09/24 09:40:05 INFO mapred.JobClient: FileSystemCounters
14/09/24 09:40:05 INFO mapred.JobClient: HDFS_BYTES_READ=586
14/09/24 09:40:05 INFO mapred.JobClient: FILE_BYTES_WRITTEN=122093
14/09/24 09:40:05 INFO mapred.JobClient: FILE_BYTES_READ=1658
14/09/24 09:40:05 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=202
14/09/24 09:40:05 INFO mapred.JobClient: File Output Format Counters
14/09/24 09:40:05 INFO mapred.JobClient: Bytes Written=202
5、运行结果:
factoryname addressname
Beijing Red Star Beijing
Beijing Rising Beijing
Bank of Beijing Beijing
Guangzhou Honda Guangzhou
Guangzhou Development Bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen