package cn.sjq.bigdata.inverted.index;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
* 实现功能: MapReduce实现每个单词在每个文件中坐标信息统计
* 输入文件:a.txt b.txt c.txt d.txt e.txt
* 输出结果:
* 利用MapReduce实现每个单词在每个文件中出现行列坐标信息,比如hello在a.txt、b.txt、c.txt中的坐标位置
* word (file1<row,col> file2<row,col>... filen<row,col>)
* hello (a.txt<1,1><2,3><4,8>) (b.txt<2,1><3,5>) (c.txt<2,3><4,9>)
* 输出描述:
* (a.txt<1,1><2,3><4,8>)表示hello在a.txt文件中第1行第一列、第2行第3列、第4行第8列出现过
* (b.txt<2,1><3,5>) 表示hello在b.txt文件中第2行第1列、第3行第5列出现过
** 实现思路:
* 1、Mapper阶段
* 读入a.txt第1行数据:
* a.txt -->Hello word
* 计算坐标信息
* Hello:a.txt--><1,1>
* word :a.txt--><1,7>
*
* 读入c.txt第1行数据:
* c.txt -->Hello Java
* 计算坐标信息
* Hello:c.txt--><1,1>
* Java :c.txt--><1,7>
*
* 读入a.txt第3行数据
* a.txt-->Hello Python Java
* 计算坐标信息
* Hello:a.txt--><3,1>
* Python:a.txt--><3,7>
* Java:a.txt--><3,7>
*
* 通过分析发现如下规律:
* Hello:a.txt--><1,1>
* Hello:c.txt--><1,1>
* Hello:a.txt--><3,1>
*
* Java :c.txt--><1,7>
* Java :a.txt--><3,7>
*
* word :a.txt--><1,7>
*
* Python:a.txt--><3,7>
*
* 通过Mapper处理后,输出的<k2,v2>格式如下
* <k2> <v2>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* Java :c.txt <1,7>
* Java :a.txt <3,7>
* Python:a.txt <3,4>
* word :a.txt <1,7>
*
* 2、Combiner阶段,对Mapper阶段输出数据进行部分合并处理
* Mapper阶段输出数据格式为
* <k2> <v2>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* Combiner处理逻辑
* 对输入的key进行切分操作
* 比如输入:Hello:a.txt <1,1>
* 通过Hello:a.txt后可以得出Combiner阶段的输出数据格式
* <k3`> <v3`>
* Hello a.txt--><1,1><3,1>
* Hello c.txt--><1,1>
*
* 3、Reducer阶段,Reducer阶段主要处理来自Combiner阶段输出,Combiner阶段输出数据格式如下
* <k3> <v3>
* Hello a.txt--><1,1><3,1>
* Hello c.txt--><1,1>
* 通过Reducer处理后,最终输出数据格式为
*
* <k4> <v4>
* Hello (a.txt<1,1><3,1>),(c.txt<1,1>)
*
* 这样就实现了每个单词在每个文件中坐标信息统计。
*
*
* @author songjq
*
*/
public class InvertedIndexCaseTwo {
/**
* Mapper阶段
* k1:读入数据便宜量 LongWritable
* v1:读入一行数据 Text
* k2:输出到Combiner阶段key Text
* v2:输出到Combiner阶段value Text
* Mapper阶段主要实现对输入数据的分词计算单词行列坐标处理,处理后输出数据格式为:
* <k1> <v1>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* @author songjq
*
*/
static class InvertedIndexCaseTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
//定义单词所在文件行号,rownum对输入的当前文件有效,如果重新输入另外一个文件,则会自动清零
private long rownum = 0;
private Text tkey = new Text();
private Text tvalue = new Text();
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
rownum++;
//读入一行数据
String line = v1.toString();
//获取读入文件名称
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fileName = inputSplit.getPath().getName();
//分词
String[] words = line.split(" ");
//遍历每一行单词,并获取该单词行号和列号,将其传输到Combiner
for(String word:words) {
int word_col = line.indexOf(word)+1;
String outKey = word+":"+fileName;
String outValue = "<"+rownum+","+word_col+">";
tkey.set(outKey);
tvalue.set(outValue);
context.write(tkey, tvalue);
}
}
}
/**
* Combiner是一种特殊的Reducer,因此同样需要继承Reducer类,使用Combiner不能改变原有业务逻辑及传输数据类型,慎用。
* 这里使用Combiner来对Mapper阶段输入的数据进行部分合并处理
* Mapper输出的数据格式
* <k2> <v2>
* Hello:a.txt <1,1>
* Hello:a.txt <3,1>
* Hello:c.txt <1,1>
* 通过Combiner处理后,输出数据格式
* <k3> <v3>
* Hello a.txt<1,1><3,1>
* Hello c.txt<1,1>
* @author songjq
*
*/
static class InvertedIndexCaseTwoCombiner extends Reducer<Text, Text, Text, Text> {
private Text tkey = new Text();
private Text tvalue = new Text();
@Override
protected void reduce(Text k3_, Iterable<Text> v3_, Context ctx) throws IOException, InterruptedException {
//定义存放Combiner输出的数据
StringBuffer v3str_ = new StringBuffer();
Iterator<Text> iterator = v3_.iterator();
//输出tvalue处理
while(iterator.hasNext()) {
Text row_col = iterator.next();
v3str_.append(row_col.toString());
}
String key3 = k3_.toString();
String[] split = key3.split(":");
String fileName = split[1];
tvalue.set(fileName+v3str_.toString());
//tkey处理
String word = split[0];
tkey.set(word);
//通过ctx将tkey tvalue传输到Reducer端
ctx.write(tkey, tvalue);
}
}
/**
* Reducer端
* Reducer端主要对Combiner端输出数据进行处理
* Combiner端输出数据格式为:
* <k3> <v3>
* Hello a.txt--><1,1><3,1>
* Hello c.txt--><1,1>
* 通过Reducer处理后,输出数据格式为
* <k4> <v4>
* hello a.txt--><1,1><3,1>,c.txt--><1,1>
* @author songjq
*
*/
static class InvertedIndexCaseTwoReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text k3, Iterable<Text> v3, Context ctx) throws IOException, InterruptedException {
StringBuffer result = new StringBuffer();
for(Text value:v3) {
result.append(value.toString()).append(" ");
}
ctx.write(new Text(formatStr(k3.toString(), 10)), new Text(result.toString()));
}
//输出文件头信息
@Override
protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
context.write(new Text(formatStr("单词", 10)), new Text("文件行列信息(file<row,col>"));
}
/**
* 字符串填充空格
* @param str
* @param length
* @return
*/
public static String formatStr(String str, int length) {
if (str == null) {
str = "";
}
int strLen = str.getBytes().length;
if (strLen == length) {
return str;
} else if (strLen < length) {
int temp = length - strLen;
String tem = "";
for (int i = 0; i < temp; i++) {
tem = tem + " ";
}
return str + tem;
} else {
return str.substring(0, length);
}
}
}
/**
* 提交Job到hadoop集群执行
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
@Test
public void InvertedIndexCaseTwoJob() throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(InvertedIndexCaseTwo.class);
job.setMapperClass(InvertedIndexCaseTwoMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置combiner类
job.setCombinerClass(InvertedIndexCaseTwoCombiner.class);
job.setReducerClass(InvertedIndexCaseTwoReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("D:\\test\\InvertedIndex\\srcdata"));
FileOutputFormat.setOutputPath(job, new Path("D:\\test\\InvertedIndex\\output5"));
job.waitForCompletion(true);
}
}
执行结果:
单词 文件行列信息(file<row,col>
Are d.txt<1,1>
China b.txt<2,1>
Do e.txt<2,1>
Hello a.txt<3,1><1,1> c.txt<1,1>
I e.txt<1,1> b.txt<1,1>
Java c.txt<2,1><1,7> a.txt<3,14>
Python a.txt<3,7>
We a.txt<2,1>
You d.txt<2,1>
a c.txt<2,2>
are d.txt<2,5> a.txt<2,4>
boys d.txt<2,14>
china e.txt<2,16><1,13>
come e.txt<1,3>
country b.txt<2,23><1,11>
friend a.txt<2,13>
from e.txt<1,8>
good a.txt<2,8> c.txt<2,11> d.txt<2,9>
greatest b.txt<2,14>
in b.txt<2,3>
is b.txt<2,7> c.txt<2,6>
language c.txt<2,16>
love b.txt<1,3>
my b.txt<1,8>
ok d.txt<1,9>
the b.txt<2,10><2,10>
to e.txt<2,13>
want e.txt<2,8>
word a.txt<1,7>
world b.txt<2,38>
you e.txt<2,4> d.txt<1,5>
原文地址:http://blog.51cto.com/2951890/2154968