不多说,直接上代码。
代码版本1(驱动类使用分布式缓存,在 Reducer 中按气象站 id 查找气象站名称)
package zhouls.bigdata.myMapReduce.Join;
import java.util.Set;
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * A pair of {@link Text} values that can be serialized by Hadoop and used as
 * a sortable key.
 *
 * <p>Ordering compares the first component and falls back to the second
 * component only when the first components are equal.
 */
public class TextPair implements WritableComparable<TextPair> {
    private Text first;  // first component of the pair
    private Text second; // second component of the pair

    /** Creates a pair holding two empty {@code Text} instances. */
    public TextPair() {
        set(new Text(), new Text());
    }

    /** Creates a pair from two strings, wrapping each in a new {@code Text}. */
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    /** Creates a pair that stores the given {@code Text} references as-is. */
    public TextPair(Text first, Text second) {
        set(first, second);
    }

    /** Replaces both components with the given references (no copy is made). */
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    /** @return the first component */
    public Text getFirst() {
        return first;
    }

    /** @return the second component */
    public Text getSecond() {
        return second;
    }

    /** Serializes the first component followed by the second to {@code out}. */
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    /** Reads the first and then the second component from {@code in}, in place. */
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    /** Combines the hash codes of both components. */
    @Override
    public int hashCode() {
        return 163 * first.hashCode() + second.hashCode();
    }

    /** Two pairs are equal when both of their components are equal. */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextPair)) {
            return false;
        }
        TextPair other = (TextPair) o;
        return first.equals(other.first) && second.equals(other.second);
    }

    /** Renders the pair as the two components separated by a tab. */
    @Override
    public String toString() {
        return new StringBuilder().append(first).append("\t").append(second).toString();
    }

    /** Orders by the first component, then by the second. */
    public int compareTo(TextPair o) {
        int byFirst = first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return second.compareTo(o.second);
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import zhouls.bigdata.myMapReduce.Join.TextPair;
/**
 * Mapper for the station file: emits {@code (stationId, "0") -> stationName}.
 *
 * <p>Only lines that split on whitespace into exactly two fields are emitted;
 * every other line is ignored.
 */
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        if (fields.length != 2) {
            return; // not a two-field station line
        }
        String stationId = fields[0];
        System.out.println("station=" + stationId + "0");
        // Tag "0" is the second key component for station rows.
        context.write(new TextPair(stationId, "0"), new Text(fields[1]));
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer of the reduce-side join: prefixes every weather record of a station
 * with the station's name.
 *
 * <p>Station rows are emitted by {@code JoinStationMapper} with tag {@code "0"}
 * and weather records by {@code JoinRecordMapper} with tag {@code "1"}, so in a
 * group that contains a station row that row is expected to arrive first.
 * The first value is only treated as the station name when its key really
 * carries the station tag; otherwise the group has no station row and its
 * records are counted and skipped instead of being joined against a weather
 * record that was mistaken for a name.
 */
public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    /** Tag that JoinStationMapper stores in the second key component. */
    private static final String STATION_TAG = "0";

    /**
     * Joins one group of values.
     *
     * @param key     composite key; its first component is the station id
     * @param values  station name (if present) followed by weather records
     * @param context task context used for output and counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    protected void reduce(TextPair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        if (!iter.hasNext()) {
            return; // nothing to join
        }

        Text firstValue = iter.next();
        // NOTE(review): relies on Hadoop updating the key object as the value
        // iterator advances, so getSecond() is the tag of the value just read.
        if (!STATION_TAG.equals(key.getSecond().toString())) {
            long unmatched = 1;
            while (iter.hasNext()) {
                iter.next();
                unmatched++;
            }
            context.getCounter("Join", "RecordsWithoutStation").increment(unmatched);
            return;
        }

        // Copy the name to a String before advancing the iterator again.
        String stationName = firstValue.toString();
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outValue = new Text(stationName + "\t" + record.toString());
            context.write(key.getFirst(), outValue);
        }
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the weather-record file: emits
 * {@code (stationId, "1") -> rest of the line}.
 *
 * <p>The line is split at the first run of whitespace only; lines that do not
 * yield two parts are ignored.
 */
public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\\s+", 2);
        if (parts.length != 2) {
            return; // no payload after the station id
        }
        // Tag "1" is the second key component for weather records.
        TextPair outKey = new TextPair(parts[0], "1");
        Text outValue = new Text(parts[1]);
        context.write(outKey, outValue);
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for a join that looks up station names from a distributed-cache file.
 *
 * <p>The mapper keys every weather record by its station id; the reducer loads
 * the station file (station id, tab, station name) into memory during setup
 * and writes each record under the looked-up station name.
 *
 * <p>Arguments of {@link #run(String[])}: {@code [recordsPath] [outputPath]
 * [stationCacheFile]}. Missing arguments fall back to the local defaults below.
 */
public class JoinRecordWithStationName extends Configured implements Tool {
    private static final String DEFAULT_RECORDS_PATH = "./data/join/records.txt";
    private static final String DEFAULT_OUTPUT_PATH = "./out/join/";
    private static final String DEFAULT_STATION_PATH = "./data/join/station.txt";

    /** Emits {@code stationId -> whole input line} for tab-separated records. */
    public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split("\t", 2);
            if (arr.length == 2) {
                context.write(new Text(arr[0]), value);
            }
        }
    }

    /** Replaces the station id key with the station name from the cache file. */
    public static class TemperatureReducer extends Reducer<Text, Text, Text, Text> {
        // stationId -> stationName, filled once per task in setup()
        private Hashtable<String, String> table = new Hashtable<String, String>();

        /**
         * Loads the first distributed-cache file into {@link #table}.
         *
         * @throws FileNotFoundException if no cache file was localized
         * @throws IOException           if the cache file cannot be read
         */
        protected void setup(Context context) throws IOException, InterruptedException {
            Path[] localPaths = context.getLocalCacheFiles();
            // A null array is treated the same as an empty one.
            if (localPaths == null || localPaths.length == 0) {
                throw new FileNotFoundException("Distributed cache file not found.");
            }
            FileSystem fs = FileSystem.getLocal(context.getConfiguration());
            FSDataInputStream in = fs.open(new Path(localPaths[0].toString()));
            try {
                // Decode explicitly as UTF-8 rather than the platform charset.
                BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] records = line.split("\t");
                    if (records.length >= 2) { // skip blank or malformed lines
                        table.put(records[0], records[1]);
                    }
                }
            } finally {
                in.close(); // always release the stream, even on read errors
            }
        }

        /**
         * Writes every record of the station under the station's name.
         * Records whose station id is not in the cache file are skipped and
         * counted in the {@code Join/RecordsWithoutStation} counter.
         */
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String stationName = table.get(key.toString());
            if (stationName == null) {
                long unmatched = 0;
                for (Text ignored : values) {
                    unmatched++;
                }
                context.getCounter("Join", "RecordsWithoutStation").increment(unmatched);
                return;
            }
            Text outKey = new Text(stationName);
            for (Text value : values) {
                context.write(outKey, value);
            }
        }
    }

    /** Returns {@code args[index]} when present, otherwise {@code fallback}. */
    private static String argOrDefault(String[] args, int index, String fallback) {
        return (args != null && args.length > index) ? args[index] : fallback;
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code recordsPath, outputPath, stationCacheFile}
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if job setup or execution fails
     */
    public int run(String[] args) throws Exception {
        String recordsPath = argOrDefault(args, 0, DEFAULT_RECORDS_PATH);
        String outputPath = argOrDefault(args, 1, DEFAULT_OUTPUT_PATH);
        String stationPath = argOrDefault(args, 2, DEFAULT_STATION_PATH);

        // Use the configuration injected through Configured when there is one.
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinRecordWithStationName.class);

        // The job input is the weather records; the station file is only cached.
        FileInputFormat.addInputPath(job, new Path(recordsPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.addCacheFile(new URI(stationPath));

        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Runs the job against the local sample data and exits with its status. */
    public static void main(String[] args) throws Exception {
        String[] arg = {
            "./data/join/records.txt",
            "./out/join/" };
        int ec = ToolRunner.run(new Configuration(), new JoinRecordWithStationName(), arg);
        System.exit(ec);
    }
}
package zhouls.bigdata.myMapReduce.Join;
/**
 * Placeholder entry point; it currently performs no work.
 */
public class JoinRecordAndStationName {
    /**
     * Does nothing.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Intentionally empty: no job is wired up here yet.
    }
}
代码版本2(驱动类使用 MultipleInputs、自定义分区和分组比较器实现 reduce 端连接)
package zhouls.bigdata.myMapReduce.Join;
import java.util.Set;
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * A pair of {@link Text} values that can be serialized by Hadoop and used as
 * a sortable key.
 *
 * <p>Ordering compares the first component and falls back to the second
 * component only when the first components are equal.
 */
public class TextPair implements WritableComparable<TextPair> {
    private Text first;  // first component of the pair
    private Text second; // second component of the pair

    /** Creates a pair holding two empty {@code Text} instances. */
    public TextPair() {
        set(new Text(), new Text());
    }

    /** Creates a pair from two strings, wrapping each in a new {@code Text}. */
    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    /** Creates a pair that stores the given {@code Text} references as-is. */
    public TextPair(Text first, Text second) {
        set(first, second);
    }

    /** Replaces both components with the given references (no copy is made). */
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    /** @return the first component */
    public Text getFirst() {
        return first;
    }

    /** @return the second component */
    public Text getSecond() {
        return second;
    }

    /** Serializes the first component followed by the second to {@code out}. */
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    /** Reads the first and then the second component from {@code in}, in place. */
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    /** Combines the hash codes of both components. */
    @Override
    public int hashCode() {
        return 163 * first.hashCode() + second.hashCode();
    }

    /** Two pairs are equal when both of their components are equal. */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextPair)) {
            return false;
        }
        TextPair other = (TextPair) o;
        return first.equals(other.first) && second.equals(other.second);
    }

    /** Renders the pair as the two components separated by a tab. */
    @Override
    public String toString() {
        return new StringBuilder().append(first).append("\t").append(second).toString();
    }

    /** Orders by the first component, then by the second. */
    public int compareTo(TextPair o) {
        int byFirst = first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return second.compareTo(o.second);
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import zhouls.bigdata.myMapReduce.Join.TextPair;
/**
 * Mapper for the station file: emits {@code (stationId, "0") -> stationName}.
 *
 * <p>Only lines that split on whitespace into exactly two fields are emitted;
 * every other line is ignored.
 */
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        if (fields.length != 2) {
            return; // not a two-field station line
        }
        String stationId = fields[0];
        System.out.println("station=" + stationId + "0");
        // Tag "0" is the second key component for station rows.
        context.write(new TextPair(stationId, "0"), new Text(fields[1]));
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer of the reduce-side join: prefixes every weather record of a station
 * with the station's name.
 *
 * <p>Station rows are emitted by {@code JoinStationMapper} with tag {@code "0"}
 * and weather records by {@code JoinRecordMapper} with tag {@code "1"}, so in a
 * group that contains a station row that row is expected to arrive first.
 * The first value is only treated as the station name when its key really
 * carries the station tag; otherwise the group has no station row and its
 * records are counted and skipped instead of being joined against a weather
 * record that was mistaken for a name.
 */
public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    /** Tag that JoinStationMapper stores in the second key component. */
    private static final String STATION_TAG = "0";

    /**
     * Joins one group of values.
     *
     * @param key     composite key; its first component is the station id
     * @param values  station name (if present) followed by weather records
     * @param context task context used for output and counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    protected void reduce(TextPair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        if (!iter.hasNext()) {
            return; // nothing to join
        }

        Text firstValue = iter.next();
        // NOTE(review): relies on Hadoop updating the key object as the value
        // iterator advances, so getSecond() is the tag of the value just read.
        if (!STATION_TAG.equals(key.getSecond().toString())) {
            long unmatched = 1;
            while (iter.hasNext()) {
                iter.next();
                unmatched++;
            }
            context.getCounter("Join", "RecordsWithoutStation").increment(unmatched);
            return;
        }

        // Copy the name to a String before advancing the iterator again.
        String stationName = firstValue.toString();
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outValue = new Text(stationName + "\t" + record.toString());
            context.write(key.getFirst(), outValue);
        }
    }
}
package zhouls.bigdata.myMapReduce.Join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the weather-record file: emits
 * {@code (stationId, "1") -> rest of the line}.
 *
 * <p>The line is split at the first run of whitespace only; lines that do not
 * yield two parts are ignored.
 */
public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\\s+", 2);
        if (parts.length != 2) {
            return; // no payload after the station id
        }
        // Tag "1" is the second key component for weather records.
        TextPair outKey = new TextPair(parts[0], "1");
        Text outValue = new Text(parts[1]);
        context.write(outKey, outValue);
    }
}
//版本2
package zhouls.bigdata.myMapReduce.Join;
import java.io.InputStream;
import org.apache.hadoop.util.Tool;
import java.io.OutputStream;
import java.util.Set;
import javax.lang.model.SourceVersion;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the reduce-side join of weather records with station names.
 *
 * <p>Both inputs are mapped to {@code TextPair} keys of (station id, tag).
 * Partitioning and grouping use only the station id, so all rows of one
 * station reach the same {@code reduce} call.
 *
 * <p>Arguments of {@link #run(String[])}: {@code <recordsPath> <stationPath>
 * <outputPath>}. With no arguments the built-in HDFS defaults are used.
 */
public class JoinRecordWithStationName extends Configured implements Tool {
    /** Paths used when no arguments are supplied. */
    private static final String[] DEFAULT_ARGS = {
        "hdfs://HadoopMaster:9000/join/records.txt",
        "hdfs://HadoopMaster:9000/join/station.txt",
        "hdfs://HadoopMaster:9000/join/out"
    };

    /** Partitions by station id only, ignoring the tag in the second component. */
    public static class KeyPartitioner extends Partitioner<TextPair, Text> {
        public int getPartition(TextPair key, Text value, int numPartitions) {
            // Mask the sign bit so the modulo result is never negative.
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /** Groups reducer input by station id only, ignoring the tag. */
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(TextPair.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes") // raw types kept to match the inherited compare signature
        public int compare(WritableComparable w1, WritableComparable w2) {
            Text left = ((TextPair) w1).getFirst();
            Text right = ((TextPair) w2).getFirst();
            return left.compareTo(right);
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args {@code recordsPath, stationPath, outputPath}; an empty or
     *             null array selects the built-in defaults
     * @return 0 on success, 1 if the job failed, 2 on a usage error
     * @throws Exception if job setup or execution fails
     */
    public int run(String[] args) throws Exception {
        String[] jobArgs = (args == null || args.length == 0) ? DEFAULT_ARGS : args;
        if (jobArgs.length < 3) {
            System.err.println(
                "Usage: JoinRecordWithStationName <recordsPath> <stationPath> <outputPath>");
            return 2;
        }

        // Use the configuration injected through Configured when there is one.
        Configuration conf = getConf() != null ? getConf() : new Configuration();

        Path recordInputPath = new Path(jobArgs[0]);  // weather records
        Path stationInputPath = new Path(jobArgs[1]); // station id -> name
        Path outputPath = new Path(jobArgs[2]);

        // Remove any previous output so the job does not fail on an existing path.
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "join");
        job.setJarByClass(JoinRecordWithStationName.class);

        // Each input gets its own mapper, tagging keys with "1" / "0".
        MultipleInputs.addInputPath(job, recordInputPath, TextInputFormat.class, JoinRecordMapper.class);
        MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setReducerClass(JoinReducer.class);
        job.setNumReduceTasks(2);
        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(GroupingComparator.class);

        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Runs the job through {@code ToolRunner}; command-line paths, if given,
     * override the built-in defaults.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args);
        System.exit(exitCode);
    }
}
package zhouls.bigdata.myMapReduce.Join;
/**
 * Placeholder entry point; it currently performs no work.
 */
public class JoinRecordAndStationName {
    /**
     * Does nothing.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Intentionally empty: no job is wired up here yet.
    }
}