1、第一阶段MapReduce任务程序
package cn.itcast.bigdata.index;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:
* hello (a.txt 2,b.txt 1,c.txt 4)
* tom (a.txt 5,b.txt 3)
* 实现方法:采用倒排索引算法并结合jobControll实现
* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现
* @author songjq
*
*/
public class IndexStepOne {
/**
* 第一阶段Mapper处理后输出数据格式为
* <k2> <v2>
* <hello:a.txt> <1>
* <hello:a.txt> <1>
* <hello:b.txt> <1>
* @author songjq
*
*/
static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 格式:<hello-->a.txt,1><helle-->b.txt,1>
*/
private Text tkey = new Text();
private IntWritable tvalue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fileName = inputSplit.getPath().getName();
String line = value.toString();
String[] split = line.split(" ");
for (String val : split) {
tkey.set(val + "-->" + fileName);
context.write(tkey, tvalue);
}
}
}
/**
* 第一阶段Mapper输出数据格式为
* <k2> <v2>
* <hello:a.txt> <1>
* <hello:a.txt> <1>
* <hello:b.txt> <1>
* 第一阶段Reducer处理后输出到HDFS数据格式为
* <k3> <v3>
* <hello> <a.txt-->2>
* <hello> <b.txt-->1>
* @author songjq
*
*/
static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
private LongWritable tvalue = new LongWritable(0);
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
throws IOException, InterruptedException {
long count = 0;
for(IntWritable value:values) {
count++;
}
tvalue.set(count);
ctx.write(key, tvalue);
}
}
}
2、第二阶段MapReduce任务程序
package cn.itcast.bigdata.index;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:
* hello (a.txt 2,b.txt 1,c.txt 4)
* tom (a.txt 5,b.txt 3)
* 实现方法:采用倒排索引算法并结合jobControll实现
* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现
* @author songjq
*
*/
public class IndexStepTwo {
/**
* 第二阶段Mapper
* 第二阶段Mapper输入数据为第一阶段Reducer输出到HDFS的数据,格式为
* hello a.txt-->2
* hello b.txt-->1
* 通过第二阶段Mapper处理,输出数据格式为
* <k2> <v2>
* <hello> <a.txt-->2,b.txt-->1>
* @author songjq
*
*/
static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
private Text tkey = new Text();
private Text tvalue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\t");
if(split.length>1) {
String[] split2 = split[0].split("-->");
tkey.set(split2[0]);
if(split2.length>1) {
tvalue.set(split2[1]+"-->"+split[1]);
context.write(tkey, tvalue);
}
}
}
}
/**
* 第二阶段Reducer
* 通过第二阶段Reducer处理后,为最终输出结果,输出格式为
* <k4> <v4>
* <hello> <(a.txt 2,b.txt 1)>
* @author songjq
*
*/
static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{
private Text tval = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context ctx)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for(Text value:values) {
sb.append(value+" ");
}
tval.set(sb.toString());
ctx.write(key, tval);
}
}
}
3、利用jobControll来实现依赖任务的提交
package cn.itcast.bigdata.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneMapper;
import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneReducer;
import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoMapper;
import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoReducer;
/**
* 简单的job串联可以使用jobControll来实现 更复杂的job的调度可以考虑用shell脚本来写,或者干脆用现成的任务调度工具oozie来做
* 这里使用简单的jobControll来实现两个阶段MapReduce任务依赖提交处理
* 由于第二阶段的Mapper输入需要依赖第一阶段Reducer的输出,因此可以利用jobControll来实现第二阶段Mapper的等待,直到
* 第一阶段Reducer输出后,第二阶段的job才开始提交处理
* 核心方法:
* controlledJob2.addDependingJob(controlledJob1);
* @author songjq
*
*/
public class OnceSubmitClient {
public static void main(String[] args) throws Exception {
// 构造第一阶段的基本job对象job1
Configuration conf1 = new Configuration();
Job job1 = Job.getInstance(conf1, "inexStepOne");
job1.setJarByClass(OnceSubmitClient.class);
job1.setMapperClass(IndexStepOneMapper.class);
job1.setReducerClass(IndexStepOneReducer.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(IntWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
// 构造第二阶段的基本job对象job2
Configuration conf2 = new Configuration();
Job job2 = Job.getInstance(conf2, "inexStepTwo");
job2.setJarByClass(OnceSubmitClient.class);
job2.setMapperClass(IndexStepTwoMapper.class);
job2.setReducerClass(IndexStepTwoReducer.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
// 第二个job的输出是第一个job的输入
FileInputFormat.setInputPaths(job2, new Path(args[1]));
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
// ControlledJob是基本的job的封装
ControlledJob controlledJob1 = new ControlledJob(conf1);
// 将job1封装到controlledJob1中去
controlledJob1.setJob(job1);
ControlledJob controlledJob2 = new ControlledJob(conf2);
// 将job2封装到controlledJob2中去
controlledJob2.setJob(job2);
// 先构造一个job控制器
JobControl jobControl = new JobControl("index");
// 指定两个job之间的依赖关系
controlledJob2.addDependingJob(controlledJob1);
// 向job控制器中添加job
jobControl.addJob(controlledJob1);
jobControl.addJob(controlledJob2);
// 创建一个线程去启动jobControl
Thread thread = new Thread(jobControl);
thread.start();
// 如果job没有运行完,主线程就等等
while (!jobControl.allFinished()) {
thread.sleep(500);
}
int succeedSize = jobControl.getSuccessfulJobList().size();
//0正常退出 1异常退出
System.exit(succeedSize == 2 ? 0 : 1);
}
}
原文地址:http://blog.51cto.com/2951890/2154751