2018-08-05 期 MapReduce实现每个单词在每个文件中坐标信息统计

package cn.sjq.bigdata.inverted.index;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* 实现功能: MapReduce实现每个单词在每个文件中坐标信息统计

* 输入文件:a.txt b.txt c.txt d.txt e.txt

* 输出结果:

* 利用MapReduce实现每个单词在每个文件中出现行列坐标信息,比如hello在a.txt、b.txt、c.txt中的坐标位置

* word (file1<row,col> file2<row,col>... filen<row,col>)

* hello (a.txt<1,1><2,3><4,8>) (b.txt<2,1><3,5>) (c.txt<2,3><4,9>)

* 输出描述:

* (a.txt<1,1><2,3><4,8>)表示hello在a.txt文件中第1行第一列、第2行第3列、第4行第8列出现过

* (b.txt<2,1><3,5>) 表示hello在b.txt文件中第2行第1列、第3行第5列出现过

** 实现思路:

* 1、Mapper阶段

* 读入a.txt第1行数据:

* a.txt -->Hello word

* 计算坐标信息

* Hello:a.txt--><1,1>

* word :a.txt--><1,7>

*

* 读入c.txt第1行数据:

* c.txt -->Hello Java

* 计算坐标信息

* Hello:c.txt--><1,1>

* Java :c.txt--><1,7>

*

* 读入a.txt第3行数据

* a.txt-->Hello Python Java

* 计算坐标信息

* Hello:a.txt--><3,1>

* Python:a.txt--><3,7>

* Java:a.txt--><3,7>

*

* 通过分析发现如下规律:

* Hello:a.txt--><1,1>

* Hello:c.txt--><1,1>

* Hello:a.txt--><3,1>

*

* Java :c.txt--><1,7>

* Java :a.txt--><3,7>

*

* word :a.txt--><1,7>

*

* Python:a.txt--><3,7>

*

* 通过Mapper处理后,输出的<k2,v2>格式如下

* <k2> <v2>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* Java :c.txt <1,7>

* Java :a.txt <3,7>

* Python:a.txt <3,4>

* word :a.txt <1,7>

*

* 2、Combiner阶段,对Mapper阶段输出数据进行部分合并处理

* Mapper阶段输出数据格式为

* <k2> <v2>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* Combiner处理逻辑

* 对输入的key进行切分操作

* 比如输入:Hello:a.txt <1,1>

* 通过Hello:a.txt后可以得出Combiner阶段的输出数据格式

* <k3`> <v3`>

* Hello a.txt--><1,1><3,1>

* Hello c.txt--><1,1>

*

* 3、Reducer阶段,Reducer阶段主要处理来自Combiner阶段输出,Combiner阶段输出数据格式如下

* <k3> <v3>

* Hello a.txt--><1,1><3,1>

* Hello c.txt--><1,1>

* 通过Reducer处理后,最终输出数据格式为

*

* <k4> <v4>

* Hello (a.txt<1,1><3,1>),(c.txt<1,1>)

*

* 这样就实现了每个单词在每个文件中坐标信息统计。

*

*

* @author songjq

*

*/

public class InvertedIndexCaseTwo {

/**

* Mapper阶段

* k1:读入数据便宜量 LongWritable

* v1:读入一行数据 Text

* k2:输出到Combiner阶段key Text

* v2:输出到Combiner阶段value Text

* Mapper阶段主要实现对输入数据的分词计算单词行列坐标处理,处理后输出数据格式为:

* <k1> <v1>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* @author songjq

*

*/

static class InvertedIndexCaseTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

//定义单词所在文件行号,rownum对输入的当前文件有效,如果重新输入另外一个文件,则会自动清零

private long rownum = 0;

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

rownum++;

//读入一行数据

String line = v1.toString();

//获取读入文件名称

FileSplit inputSplit = (FileSplit) context.getInputSplit();

String fileName = inputSplit.getPath().getName();

//分词

String[] words = line.split(" ");

//遍历每一行单词,并获取该单词行号和列号,将其传输到Combiner

for(String word:words) {

int word_col = line.indexOf(word)+1;

String outKey = word+":"+fileName;

String outValue = "<"+rownum+","+word_col+">";

tkey.set(outKey);

tvalue.set(outValue);

context.write(tkey, tvalue);

}

}

}

/**

* Combiner是一种特殊的Reducer,因此同样需要继承Reducer类,使用Combiner不能改变原有业务逻辑及传输数据类型,慎用。

* 这里使用Combiner来对Mapper阶段输入的数据进行部分合并处理

* Mapper输出的数据格式

* <k2> <v2>

* Hello:a.txt <1,1>

* Hello:a.txt <3,1>

* Hello:c.txt <1,1>

* 通过Combiner处理后,输出数据格式

* <k3> <v3>

* Hello a.txt<1,1><3,1>

* Hello c.txt<1,1>

* @author songjq

*

*/

static class InvertedIndexCaseTwoCombiner extends Reducer<Text, Text, Text, Text> {

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void reduce(Text k3_, Iterable<Text> v3_, Context ctx) throws IOException, InterruptedException {

//定义存放Combiner输出的数据

StringBuffer v3str_ = new StringBuffer();

Iterator<Text> iterator = v3_.iterator();

//输出tvalue处理

while(iterator.hasNext()) {

Text row_col = iterator.next();

v3str_.append(row_col.toString());

}

String key3 = k3_.toString();

String[] split = key3.split(":");

String fileName = split[1];

tvalue.set(fileName+v3str_.toString());

//tkey处理

String word = split[0];

tkey.set(word);

//通过ctx将tkey tvalue传输到Reducer端

ctx.write(tkey, tvalue);

}

}

/**

* Reducer端

* Reducer端主要对Combiner端输出数据进行处理

* Combiner端输出数据格式为:

* <k3> <v3>

* Hello a.txt--><1,1><3,1>

* Hello c.txt--><1,1>

* 通过Reducer处理后,输出数据格式为

* <k4> <v4>

* hello a.txt--><1,1><3,1>,c.txt--><1,1>

* @author songjq

*

*/

static class InvertedIndexCaseTwoReducer extends Reducer<Text, Text, Text, Text> {

@Override

protected void reduce(Text k3, Iterable<Text> v3, Context ctx) throws IOException, InterruptedException {

StringBuffer result = new StringBuffer();

for(Text value:v3) {

result.append(value.toString()).append(" ");

}

ctx.write(new Text(formatStr(k3.toString(), 10)), new Text(result.toString()));

}

//输出文件头信息

@Override

protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {

context.write(new Text(formatStr("单词", 10)), new Text("文件行列信息(file<row,col>"));

}

/**

* 字符串填充空格

* @param str

* @param length

* @return

*/

public static String formatStr(String str, int length) {

if (str == null) {

str = "";

}

int strLen = str.getBytes().length;

if (strLen == length) {

return str;

} else if (strLen < length) {

int temp = length - strLen;

String tem = "";

for (int i = 0; i < temp; i++) {

tem = tem + " ";

}

return str + tem;

} else {

return str.substring(0, length);

}

}

}

/**

* 提交Job到hadoop集群执行

* @throws IOException

* @throws ClassNotFoundException

* @throws InterruptedException

*/

@Test

public void InvertedIndexCaseTwoJob() throws IOException, ClassNotFoundException, InterruptedException {

Job job = Job.getInstance(new Configuration());

job.setJarByClass(InvertedIndexCaseTwo.class);

job.setMapperClass(InvertedIndexCaseTwoMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

//设置combiner类

job.setCombinerClass(InvertedIndexCaseTwoCombiner.class);

job.setReducerClass(InvertedIndexCaseTwoReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, new Path("D:\\test\\InvertedIndex\\srcdata"));

FileOutputFormat.setOutputPath(job, new Path("D:\\test\\InvertedIndex\\output5"));

job.waitForCompletion(true);

}

}

执行结果:

单词       文件行列信息(file<row,col>

Are       d.txt<1,1>

China     b.txt<2,1>

Do         e.txt<2,1>

Hello     a.txt<3,1><1,1> c.txt<1,1>

I         e.txt<1,1> b.txt<1,1>

Java       c.txt<2,1><1,7> a.txt<3,14>

Python     a.txt<3,7>

We         a.txt<2,1>

You       d.txt<2,1>

a         c.txt<2,2>

are       d.txt<2,5> a.txt<2,4>

boys       d.txt<2,14>

china     e.txt<2,16><1,13>

come       e.txt<1,3>

country   b.txt<2,23><1,11>

friend     a.txt<2,13>

from       e.txt<1,8>

good       a.txt<2,8> c.txt<2,11> d.txt<2,9>

greatest   b.txt<2,14>

in         b.txt<2,3>

is         b.txt<2,7> c.txt<2,6>

language   c.txt<2,16>

love       b.txt<1,3>

my         b.txt<1,8>

ok         d.txt<1,9>

the       b.txt<2,10><2,10>

to         e.txt<2,13>

want       e.txt<2,8>

word       a.txt<1,7>

world     b.txt<2,38>

you       e.txt<2,4> d.txt<1,5>

原文地址:http://blog.51cto.com/2951890/2154968

时间: 2024-10-12 07:42:20

2018-08-05 期 MapReduce实现每个单词在每个文件中坐标信息统计的相关文章

2018-08-03 期 MapReduce倒排索引编程案例1(Combiner方式)

package cn.sjq.bigdata.inverted.index; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.map

2018-08-04 期 MapReduce倒排索引编程案例2(jobControll方式)

1.第一阶段MapReduce任务程序 package cn.itcast.bigdata.index; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import or

Java的实验程序之输出单个文件中的前 N 个最常出现的英语单词

日期:2018.10.11 星期四 博客期:016 题目:输出单个文件中的前 N 个最常出现的英语单词,并输出到文本文件中 在程序运行之前,我试着先写了字符的字母的总结,加载代码如下: 1 //如下是第一个程序的 CharBasic文件 2 package src; 3 4 public final class CharBasic { 5 //检测字母是否为字母 6 public static boolean isAtoZ(char c){ 7 return c<='z'&&c>

2014/08/05 – Backbonejs

[来自: Backbone.js 开发秘笈 第2章] Model API: (function ($) { //define Model Class ------------------- var ModelClass = Backbone.Model.extend({ defaults: {},//Backbone 支持在模型初始化时动态进行定义 [支持多行表达式设置默认值,即值为函数] initialize: function () { //模型对象被创建后即被调用 /* 注:如定义了默认属

【Android UI设计与开发】第05期:引导界面(五)实现应用程序只启动一次引导界面

[Android UI设计与开发]第05期:引导界面(五)实现应用程序只启动一次引导界面 jingqing 发表于 2013-7-11 14:42:02 浏览(229501) 这篇文章算是对整个引导界面开发专题的一个终结了吧,个人觉得大部分的引导界面基本上都是千篇一律的,只要熟练掌握了一个,基本上也就没什么好说的了,要是在今后的开发中遇到了更好玩,更有趣的引导界面,博主也会在这里及时的跟大家分享,今天的内容主要是教大家的应用程序只有在第一次启动的时候显示引导界面,以后在启动程序的时候就不再显示了

饥饿疗法是目前唯一确信能够延缓衰老的办法:4星|《三联生活周刊》2018年3期

三联生活周刊·人类到底能活多久:抗衰老科学指南(2018年3期) 本期主题是抗衰老,科学记者袁越走访了全球抗衰老研究的顶级机构,把这个领域最前沿的进展深入浅出地展现出来,非常有价值.这一类报道也是国内比较稀缺的. 总体评价4星. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码,[]中是我根据上下文补充的信息: 1:2016年世界人均一次性能源消费量为1.87吨油当量,中国为2.25吨,相比十几年前还不足1吨有了飞跃,但只相当于经合组织(OECD)4.5吨的一半.OECD目前有34

沈阳当年对学校承认了他和高岩的性关系:3星|《三联生活周刊》2018年16期

三联生活周刊·教授的权力:高校内的不平等关系(2018年16期) 本期主题是高校教师性侵学生的调查与思考. 总体评价3星,有参考价值. 以下是书中一些内容的摘抄,#号后面是kindle电子版中的页码: 1:自第二次世界大战以来,以色列制造的暗杀事件比任何西方国家都多.该国领导人甚至认为通过杀戮指定的目标保护其国家安全,危害无辜平民的生命是合情合理的.#52 2:旅游业难以聚集大量的财富,给其从业者带来的回报也有限,这就是为什么海南成了高消费的代名词,可当地人收入却普遍不高的原因.这也可以说是资源

新手C#string类常用函数的学习2018.08.04

ToLower()用于将字符串变为小写,注意字符串的不可变特性,需要重新赋值给另一个字符串变量. s = s.ToLower();//字符串具有不可变性,转换后需要重新赋值,不可仅有s.ToLower(); 这可以使用户的输入不区分大小写,例如验证码. ToUpper()用于将字符串全部变为大写,与上面类似. Trim()可以用于去掉两边的空格. string s1 = " a b c "; s1 = s1.Trim();//用于去除字符串两边的空格 Console.WriteLine

Intel Digital Innovation Industry Summit(2018.08.17)

时间:2018.08.17地点:北京金隅喜来登大酒店 原文地址:https://www.cnblogs.com/xuefeng1982/p/10331638.html