单表:
package org.bigdata.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.bigdata.util.WordCountMapReduce.TextDescComparator;
import org.bigdata.util.WordCountMapReduce.WordCountCombiner;
import org.bigdata.util.WordCountMapReduce.WordCountMapper;
import org.bigdata.util.WordCountMapReduce.WordCountReducer;
/**
* 单表关联
*
* @author wwhhf
*
*/
public class SingleJoinMapReduce {
/**
* a->b b->c a->c
*
* @author wwhhf
*
*/
public static class SingleJoinMapper extends
Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String terms[] = value.toString().split(" ");
// 正常顺序
context.write(new Text(terms[0]), new Text(terms[1] + ":1"));
// 颠倒顺序
context.write(new Text(terms[1]), new Text(terms[0] + ":2"));
}
}
public static class SingleJoinReducer extends
Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<String> lefts = new ArrayList<>();
List<String> rights = new ArrayList<>();
for (Text value : values) {
String terms[] = value.toString().split(":");
if ("1".equals(terms[1])) {
lefts.add(terms[0]);
} else {
rights.add(terms[0]);
}
}
for (String left : lefts) {
for (String right : rights) {
context.write(new Text(left), new Text(right));
}
}
}
}
public static void main(String[] args) {
try {
Configuration cfg = HadoopCfg.getConfiguration();
Job job = Job.getInstance(cfg);
job.setJobName("SingleJoin");
job.setJarByClass(SingleJoinMapReduce.class);
// mapper
job.setMapperClass(SingleJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// reducer
job.setReducerClass(SingleJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(TextDescComparator.class);
FileInputFormat.addInputPath(job, new Path("/single"));
FileOutputFormat.setOutputPath(job, new Path("/single_out/"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IllegalStateException | IllegalArgumentException
| ClassNotFoundException | IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
多表:
package org.bigdata.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 多表关联
*
* @author wwhhf
*
*/
public class MultiJoinMapReduce {
public static class MultiJoinMapper extends
Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
String record = value.toString();
if (fileName.startsWith("b")) {
// table 2
// 1 Beijing
Pattern pattern = Pattern
.compile("((\\d+)\\s([\\w\\W\\s\\S]+))");
Matcher matcher = pattern.matcher(record);
String ckey = null;
String cvalue = null;
while (matcher.find()) {
ckey = matcher.group(2);
cvalue = matcher.group(3);
}
context.write(new Text(ckey), new Text(cvalue + ":2"));
} else {
// table 1
// 1 Beijing
Pattern pattern = Pattern
.compile("(([\\w\\W\\s\\S]+)\\s(\\d+))");
Matcher matcher = pattern.matcher(record);
String ckey = null;
String cvalue = null;
while (matcher.find()) {
cvalue = matcher.group(2);
ckey = matcher.group(3);
}
context.write(new Text(ckey), new Text(cvalue + ":1"));
}
}
}
public static class MultiJoinReducer extends
Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<String> lefts = new ArrayList<>();
List<String> rights = new ArrayList<>();
for (Text value : values) {
String terms[] = value.toString().split(":");
if ("1".equals(terms[1])) {
lefts.add(terms[0]);
} else {
rights.add(terms[0]);
}
}
for (String left : lefts) {
for (String right : rights) {
context.write(new Text(left), new Text(right));
}
}
}
}
public static void main(String[] args) {
try {
Configuration cfg = HadoopCfg.getConfiguration();
Job job = Job.getInstance(cfg);
job.setJobName("MultiJoin");
job.setJarByClass(MultiJoinMapReduce.class);
// mapper
job.setMapperClass(MultiJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// reducer
job.setReducerClass(MultiJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("/multi"));
FileOutputFormat.setOutputPath(job, new Path("/multi_out/"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IllegalStateException | IllegalArgumentException
| ClassNotFoundException | IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
时间: 2024-10-12 05:47:15