Hadoop MapReduce Programming API Primer Series: join (Part 25) (unfinished)

  Not much preamble; straight to the code. Two versions of the same station/weather-record join follow. The shared helper classes (TextPair, JoinStationMapper, JoinReducer, JoinRecordMapper) are listed first; version 1's driver actually ignores them and instead joins by loading station.txt from the distributed cache inside its own reducer, while version 2's driver wires them together as a classic reduce-side join with MultipleInputs, a custom partitioner, and a grouping comparator.
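For orientation, both versions assume two whitespace-delimited inputs whose first field is the station id. The sample lines below are illustrative only (made-up ids, names, and readings, not the author's actual data files):

station.txt  (station id, station name)
    011990-99999    SIHCCAJAVRI
    012650-99999    TYNSET-HANSMOEN

records.txt  (station id, weather record)
    011990-99999    1950051210  22
    012650-99999    1949032418  -11

The goal of the join is one output line per weather record, keyed by station id and carrying the station name alongside the record.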

 Code version 1: the station table is shipped as a distributed cache file and looked up in the reducer.

package zhouls.bigdata.myMapReduce.Join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair>
{
    private Text first;  // first field of the composite key (the station id)
    private Text second; // second field, used as a tag: "0" = station row, "1" = weather record

    public TextPair() // no-arg constructor, required by Hadoop for deserialization
    {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) // constructor taking Strings
    {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) // constructor taking Texts
    {
        set(first, second);
    }

    public void set(Text first, Text second)
    {
        this.first = first;
        this.second = second;
    }

    public Text getFirst()
    {
        return first;
    }

    public Text getSecond()
    {
        return second;
    }

    // Serialize this object to the output stream.
    public void write(DataOutput out) throws IOException
    {
        first.write(out);
        second.write(out);
    }

    // Deserialize this object from the input stream.
    public void readFields(DataInput in) throws IOException
    {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() // used by the default partitioner to pick a reduce partition
    {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) // content-based equality
    {
        if (o instanceof TextPair)
        {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString()
    {
        return first + "\t" + second;
    }

    public int compareTo(TextPair o) // sort by the first field, then by the second (the tag)
    {
        if (!first.equals(o.first))
        {
            return first.compareTo(o.first);
        }
        else if (!second.equals(o.second))
        {
            return second.compareTo(o.second);
        }
        return 0;
    }
}
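A quick way to sanity-check TextPair is a small local sketch. This is not part of the original post: the class name TextPairDemo and the station id are made up for illustration. It round-trips a key through write()/readFields() and checks content equality:

package zhouls.bigdata.myMapReduce.Join;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TextPairDemo
{
    public static void main(String[] args) throws IOException
    {
        TextPair stationKey = new TextPair("011990-99999", "0"); // a "0"-tagged station key (made-up id)

        // Serialize the key into a byte buffer...
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        stationKey.write(new DataOutputStream(bos));

        // ...and read it back into a fresh instance.
        TextPair copy = new TextPair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy);                     // 011990-99999    0
        System.out.println(copy.equals(stationKey));  // true
    }
}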

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text>
{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        String[] arr = line.split("\\s+"); // parse a station line: station id, station name
        if (arr.length == 2)
        {
            // key = (station id, tag "0"), value = station name
            System.out.println("station=" + arr[0] + "0"); // debug output: station id plus the "0" tag
            context.write(new TextPair(arr[0], "0"), new Text(arr[1]));
        }
    }
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<TextPair, Text, Text, Text>
{
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        Iterator<Text> iter = values.iterator();
        // Because the station row is tagged "0" and weather rows are tagged "1",
        // the sort order guarantees the station name is the first value in the group.
        Text stationName = new Text(iter.next());
        while (iter.hasNext())
        {
            Text record = iter.next(); // one weather record
            Text outValue = new Text(stationName.toString() + "\t" + record.toString());
            context.write(key.getFirst(), outValue);
        }
    }
}
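To make the reduce step concrete, here is an illustrative reduce group; the station id, station name, and readings are made up, not taken from the author's data. After the sort and grouping configured in version 2's driver, the reducer for one station sees the station name first, followed by that station's weather records:

    group key : 011990-99999
    values    : SIHCCAJAVRI            <- from JoinStationMapper (tag "0"), arrives first
                1950051210  22         <- from JoinRecordMapper (tag "1", shown next)
                1949032418  -11        <- from JoinRecordMapper (tag "1")

    output    : 011990-99999    SIHCCAJAVRI    1950051210  22
                011990-99999    SIHCCAJAVRI    1949032418  -11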

package zhouls.bigdata.myMapReduce.Join;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text>
{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        String[] arr = line.split("\\s+", 2); // parse a weather line: station id, then the rest of the record
        if (arr.length == 2)
        {
            // key = (station id, tag "1"), value = the weather record
            context.write(new TextPair(arr[0], "1"), new Text(arr[1]));
        }
    }
}

package zhouls.bigdata.myMapReduce.Join;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool
{
    // Version 1 defines its own mapper and reducer; the weather records are the
    // only map input, and station.txt is read from the distributed cache.
    public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, Text>
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] arr = value.toString().split("\t", 2); // station id, then the weather record
            if (arr.length == 2)
            {
                context.write(new Text(arr[0]), value); // key = station id, value = the whole line
            }
        }
    }

    public static class TemperatureReducer extends Reducer<Text, Text, Text, Text>
    {
        // Side table: station id -> station name, loaded from the distributed cache.
        private Hashtable<String, String> table = new Hashtable<String, String>();

        /**
         * Load the cached station file before any reduce() call.
         */
        protected void setup(Context context) throws IOException, InterruptedException
        {
            Path[] localPaths = (Path[]) context.getLocalCacheFiles(); // local copies of the cache files
            if (localPaths.length == 0)
            {
                throw new FileNotFoundException("Distributed cache file not found.");
            }
            FileSystem fs = FileSystem.getLocal(context.getConfiguration()); // local FileSystem instance
            FSDataInputStream in = fs.open(new Path(localPaths[0].toString()));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String infoAddr = null;
            while (null != (infoAddr = br.readLine()))
            {
                // Each station line: station id \t station name
                String[] records = infoAddr.split("\t");
                table.put(records[0], records[1]);
            }
            br.close();
        }

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            String stationName = table.get(key.toString()); // look up the station name by id (null if missing from station.txt)
            for (Text value : values)
            {
                context.write(new Text(stationName), value);
            }
        }
    }

    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // On a cluster, delete the output directory first if it already exists:
        // FileSystem hdfs = FileSystem.get(new URI("hdfs://HadoopMaster:9000"), conf);
        // Path out = new Path(args[1]);
        // if (hdfs.isDirectory(out))
        // {
        //     hdfs.delete(out, true);
        // }

        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinRecordWithStationName.class);

        // The map input must be the weather records (the original post mistakenly
        // hard-coded station.txt here); station.txt is supplied via the cache below.
        FileInputFormat.addInputPath(job, new Path(args[0]));   // weather records
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory

        // Add station.txt as a distributed cache file.
        // On a cluster: job.addCacheFile(new URI("hdfs://HadoopMaster:9000/join/station.txt"));
        job.addCacheFile(new URI("./data/join/station.txt"));

        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);

        job.setOutputKeyClass(Text.class);   // output key type
        job.setOutputValueClass(Text.class); // output value type

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        // On a cluster:
        // String[] arg = { "hdfs://HadoopMaster:9000/join/records.txt",
        //                  "hdfs://HadoopMaster:9000/join/out/" };
        String[] arg = { "./data/join/records.txt", "./out/join/" };

        int ec = ToolRunner.run(new Configuration(), new JoinRecordWithStationName(), arg);
        System.exit(ec);
    }
}

package zhouls.bigdata.myMapReduce.Join;

// Placeholder class; the post is marked as unfinished and this driver was never filled in.
public class JoinRecordAndStationName
{
    public static void main(String[] args)
    {
        // intentionally left empty
    }
}

 Code version 2: a reduce-side join with MultipleInputs and the tagged TextPair key. TextPair, JoinStationMapper, JoinReducer, and JoinRecordMapper are the same classes listed under version 1 and are not repeated here; only the driver differs. Note that it shares the class name JoinRecordWithStationName with version 1's driver, so only one of the two can live in the package at a time.

package zhouls.bigdata.myMapReduce.Join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinRecordWithStationName extends Configured implements Tool
{
    // Partition by the station id (the first field) only, so the station row and
    // all of its weather records land in the same reduce partition.
    public static class KeyPartitioner extends Partitioner<TextPair, Text>
    {
        public int getPartition(TextPair key, Text value, int numPartitions)
        {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group by the station id only, so the station row and its weather records
    // arrive in one reduce() call, while the full TextPair sort order still puts
    // the "0"-tagged station row first within the group.
    public static class GroupingComparator extends WritableComparator
    {
        protected GroupingComparator()
        {
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2)
        {
            TextPair ip1 = (TextPair) w1;
            TextPair ip2 = (TextPair) w2;
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }

    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration(); // read the configuration

        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) // delete the output directory if it already exists
        {
            hdfs.delete(mypath, true);
        }

        Job job = Job.getInstance(conf, "join");
        job.setJarByClass(JoinRecordWithStationName.class);

        // With two inputs and one output: args[0] and args[1] are the inputs, args[2] is the output.
        Path recordInputPath = new Path(args[0]);  // weather records
        Path stationInputPath = new Path(args[1]); // station table
        Path outputPath = new Path(args[2]);       // output directory

        MultipleInputs.addInputPath(job, recordInputPath, TextInputFormat.class, JoinRecordMapper.class);   // weather-record mapper
        MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class); // station mapper
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setReducerClass(JoinReducer.class);
        job.setNumReduceTasks(2);

        job.setPartitionerClass(KeyPartitioner.class);             // custom partitioner
        job.setGroupingComparatorClass(GroupingComparator.class);  // custom grouping comparator

        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        String[] args0 = { "hdfs://HadoopMaster:9000/join/records.txt",
                           "hdfs://HadoopMaster:9000/join/station.txt",
                           "hdfs://HadoopMaster:9000/join/out" };
        int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args0);
        System.exit(exitCode);
    }
}
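A small local sketch makes the division of labour between sorting and grouping explicit. This is not from the original post: the class name JoinOrderingDemo and the station id are illustrative only. It shows that TextPair.compareTo sorts the "0"-tagged station key ahead of the "1"-tagged record key, while GroupingComparator treats the two keys as equal so they share one reduce group:

package zhouls.bigdata.myMapReduce.Join;

import org.apache.hadoop.io.WritableComparator;

public class JoinOrderingDemo
{
    public static void main(String[] args)
    {
        TextPair stationKey = new TextPair("011990-99999", "0"); // as emitted by JoinStationMapper
        TextPair recordKey  = new TextPair("011990-99999", "1"); // as emitted by JoinRecordMapper

        // Sort order within a partition: the station key comes first.
        System.out.println(stationKey.compareTo(recordKey) < 0); // true

        // Grouping: the keys compare as equal on the first field alone,
        // so the station name and its records arrive in the same reduce() call.
        WritableComparator gc = new JoinRecordWithStationName.GroupingComparator();
        System.out.println(gc.compare(stationKey, recordKey) == 0); // true
    }
}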

