2018-08-03 期 MapReduce倒排索引编程案例1(Combiner方式)

package cn.sjq.bigdata.inverted.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.junit.Test;

/**

* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:

* hello (a.txt 2,b.txt 1,c.txt 4)

* tom (a.txt 5,b.txt 3)

* 实现方法:采用倒排索引算法并结合MapReduce Combiner实现

* 中间添加Combiner需要注意不能改变原有实现逻辑及改变Mapper到Reducer的数据类型

*

* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现

* @author songjq

*

*/

public class InvertedIndexCaseOne {

/**

* Mapper阶段

* k1:输入key LongWritable  读入数据偏移量

* v1:输入value Text   读入的一行数据

* k2:输出key Text 格式为<hello:a.txt>,<hello:b.txt>

* v2:输出value Text 格式为<1>,<1>

* @author songjq

*

*/

static class InvertedIndexCaseOneMapper extends Mapper<LongWritable, Text, Text, Text> {

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

//读入数据

String line = v1.toString();

//分词,安装空格切分

String[] words = line.split(" ");

//获取输入文件名称

FileSplit inputSplit = (FileSplit) context.getInputSplit();

String fileName = inputSplit.getPath().getName();

//将数据通过context传输到Reducer

for(String word:words) {

tkey.set(word+":"+fileName);

tvalue.set("1");

context.write(tkey, tvalue);

}

}

}

/**

* Combiner阶段

* 定义Combiner类

* 由于Combiner是一个特殊的Reducer,因此需要继承Reducer

* 其作用就是对Mapper端输入的数据进行部分求和,并发送到Reducer阶段处理

* Mapper端输入的数据格式如下:

* <k2> <v2>

* <hello:a.txt <"1","1">

* <hello:b.txt <"1">

* 通过Combiner处理后,最终输出到Reducer的数据格式如下

* <k3> <v3>

* <hello> <a.txt:"2">

* <hello> <b.txt:"1">

* @author songjq

*

*/

static class InvertedIndexCaseOneCombiner extends Reducer<Text, Text, Text, Text> {

@Override

protected void reduce(Text k31, Iterable<Text> v31, Context ctx) throws IOException, InterruptedException {

int total = 0;

for(Text val:v31) {

//单词在每个文件中出现次数统计

total+=Integer.parseInt(val.toString());

}

//k3处理,格式hello:a.txt

String[] split = k31.toString().split(":");

String word = split[0];

String fileName = split[1];

//输出 k3:<hello> v3:<a.txt:"2",b.txt:"1">

ctx.write(new Text(word), new Text(fileName+":"+total));

}

}

/**

* Reducer阶段

* Reducer阶段主要对Combiner阶段输出的数据进行处理

* Combiner阶段输出数据格式如下:

* <k3> <v3>

* <hello> <a.txt:"2",b.txt:"1">

* 通过Reducer处理后,最终输出数据格式如下:

* <k4> <v4>

* <hello> <(a.txt 2,b.txt 1)>

* @author songjq

*

*/

static class InvertedIndexCaseOneReducer extends Reducer<Text, Text, Text, Text> {

/*

* 由于setup方法只会被调用一次,因此可以在这里输出文件头

* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)

*/

@Override

protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {

context.write(new Text(formatStr("Word", 20)), new Text("Frequency statistics [eg:(a.txt 2,b.txt 1)]"));

}

@Override

protected void reduce(Text k3, Iterable<Text> v3, Context ctx) throws IOException, InterruptedException {

//定义存放输出结果的对象result

StringBuffer result = new StringBuffer();

for(Text val:v3) {

//<v3>数据<a.txt:"2">

String[] split = val.toString().split(":");

String fileName = split[0];

String count = split[1];

result.append(fileName).append(" ").append(count).append(",");

}

//将<k4,v4>写入HDFS

//最终输出到文件的数据格式 hello (a.txt 2,b.txt 1)

ctx.write(new Text(formatStr(k3.toString(), 20)), new Text(result.deleteCharAt(result.length()-1).toString()));

}

/**

* 字符串填充空格

* @param str

* @param length

* @return

*/

public static String formatStr(String str, int length) {

if (str == null) {

str = "";

}

int strLen = str.getBytes().length;

if (strLen == length) {

return str;

} else if (strLen < length) {

int temp = length - strLen;

String tem = "";

for (int i = 0; i < temp; i++) {

tem = tem + " ";

}

return str + tem;

} else {

return str.substring(0, length);

}

}

}

/**

* 提交job

* @throws IOException

* @throws InterruptedException

* @throws ClassNotFoundException

*

*/

@Test

public void InvertedIndexCaseOneJob() throws IOException, ClassNotFoundException, InterruptedException {

Job job = Job.getInstance(new Configuration());

job.setJarByClass(InvertedIndexCaseOne.class);

job.setMapperClass(InvertedIndexCaseOneMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

// 设置Combiner Class类

job.setCombinerClass(InvertedIndexCaseOneCombiner.class);

job.setReducerClass(InvertedIndexCaseOneReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, new Path("D:\\test\\InvertedIndex\\srcdata"));

FileOutputFormat.setOutputPath(job, new Path("D:\\test\\InvertedIndex\\output2"));

job.waitForCompletion(true);

}

}

直接结果

Word                 Frequency statistics [eg:(a.txt 2,b.txt 1)]

Are                 d.txt 1

China               b.txt 1

Do                   e.txt 1

Hello               c.txt 1,a.txt 1

I                   e.txt 1,b.txt 1

Java                 c.txt 2

We                   a.txt 1

You                 d.txt 1

a                   c.txt 1

are                 d.txt 1,a.txt 1

boys                 d.txt 1

china               e.txt 2

come                 e.txt 1

country             b.txt 2

friend               a.txt 1

from                 e.txt 1

good                 d.txt 1,c.txt 1,a.txt 1

greatest             b.txt 1

in                   b.txt 1

is                   b.txt 1,c.txt 1

language             c.txt 1

love                 b.txt 1

my                   b.txt 1

ok                   d.txt 1

the                 b.txt 2

to                   e.txt 1

want                 e.txt 1

word                 a.txt 1

world               b.txt 1

you                 d.txt 1,e.txt 1

原文地址:http://blog.51cto.com/2951890/2153886

时间: 2024-10-16 02:37:06

2018-08-03 期 MapReduce倒排索引编程案例1(Combiner方式)的相关文章

2018-08-04 期 MapReduce倒排索引编程案例2(jobControll方式)

1.第一阶段MapReduce任务程序 package cn.itcast.bigdata.index; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import or

MapReduce 单词统计案例编程

MapReduce 单词统计案例编程 一.在Linux环境安装Eclipse软件 1.   解压tar包 下载安装包eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz到/opt/software目录下. 解压到/opt/tools目录下: [[email protected] tools]$ tar -zxf /opt/sofeware/eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz -C /opt/tool

Hadoop集群(第9期)_MapReduce初级案例 - 虾皮 - 博客园

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

[连载]JavaScript讲义(03)--- JavaScript面向对象编程

[连载]JavaScript讲义(03)--- JavaScript面向对象编程,布布扣,bubuko.com

Hadoop集群(第9期)_MapReduce初级案例

1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进行去重.数据文件中的每行都是一个数据. 样例输入如下所示: 1)file1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7

hadoop笔记之MapReduce的应用案例(利用MapReduce进行排序)

MapReduce的应用案例(利用MapReduce进行排序) MapReduce的应用案例(利用MapReduce进行排序) 思路: Reduce之后直接进行结果合并 具体样例: 程序名:Sort.java import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; impo

面向对象编程案例02--显示地调用父类的__init__()

# -*- coding: utf-8 -*- #python 27 #xiaodeng #面向对象编程案例02--显示地调用父类的__init__() ''' 继承是面向对象的重要特征之一,继承是2个类或多个类之间的父子关系,子类继承父类的所有共有实例变量和方法. 继承实现了代码的重用,减少代码的编写量 python在类名后用圆括号来表示继承关系,括号中的类表示父类 如果父类有init方法,则子类会显示地调用其父类的init方法,不需要重写定义.如果子类需要拓展父类的init方法,则可以父类的

《游戏人工智能编程案例精粹》读书笔记&mdash;状态驱动智能体设计

一个有限状态机是一个设备,或是一个设备模型,具有有限数量的状态,它可以在任何给定的时间根据输入进行操作,使得从一个状态变换到另一个状态,或者是促使一个输出或者一种行为的发生.一个有限状态机在任何瞬间只能处于一种状态. 状态变换表 状态变换表是一个条件和那些条件导致的状态的表,这个表可以被智能体在规则的间隔内训问,使得它能基于从游戏环境中接收到刺激进行必须的状态转换. 内置的规则 每个状态模块依靠自身的逻辑来决定它是否应该运行自己变换到一个替代状态,智能体只向外部提供操作和获取自身属性的函数,状态

《游戏人工智能编程案例精粹》读书笔记—数学和物理学初探

1.1.1 笛卡尔坐标系 在二维空间中,笛卡尔坐标系被定义成两个坐标轴成直角相交并且用单位长度标出.水平轴称为x 轴,而垂直轴称为y 轴,两个轴的交点称为原点,如图1.1 所示. 如图1.1所示,每个坐标轴端点的箭头表示它们在每个方向上无限延伸.假想有一张无限大的纸,上面有x 轴和y 轴,纸就表示 xy 平面,所有二维的笛卡尔坐标系中的点都可以给制在这个平面上.在2D 空间中的一个点可以用一对坐标(x,y) 表示.x 和y 的值代表沿着各自的轴上的距离. 为了表达三维空间,需要另外一个坐标轴z铀