单词计数的mapreduce原理

Hello you

Hello  me

1.1 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数

解析成2个<k,v>,分别是<0,hello you> <10,hello me>

调用2次map函数

1.2覆盖map()函数,接受1.1的<k,v>进行处理,转换成新<k,v>输出

Public  void  map(k,v,ctx){

String[] splited = v.split(“\t”)

for(string word:splited){

//return <word,1>

ctx.write(word,1)

}

}

1.3对1.2输出<k,v>进行分区.默认一个分区

1.4对不同分区的的数据进行排序(按照k)、分组。分组是指相同的key的value放到一个集合中。

Map输出后的数据是:<hello,1><you,1><hello,1><me,1>

排序后为:<hello,1> <hello,1> <me,1> <you,1>

分组:<hello,{1,1}> <me,{1}> <you,{1}>  ----->分组的数量有3个

1.5(可选)对分组后的数据进行规约。(是指大文件转换成小文件)

2.1多个map的任务输出,安装不同的分区,通过网络拷贝到不同的reduce上

2.2对多个map的输出进行合并、排序。覆盖reduce函数,接受的是分组后的数据,实现自己的业务逻辑,处理后形成新的<k,v>输出

//reduce被调用的次数为3等于分组后的次数

Public void reduce(k,vs,ctx){

Long count = 0L;

for(Long times:vs){

count += times

}

ctx.write(k,count) ;

}

2.3对reduce的输出<k,v>写入到hdfs中

hello 2

me  1

you  1

代码为:

package mapreduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {

static final String INPUT_PATH = "hdfs://hadoop:9000/hello";

static final String OUT_PATH = "hdfs://hadoop:9000/out";

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);

final Path outPath = new Path(OUT_PATH);

if(fileSystem.exists(outPath)){

fileSystem.delete(outPath, true);

}

final Job job = new Job(conf , WordCountApp.class.getSimpleName());

//1.1指定读取的文件位于哪里

FileInputFormat.setInputPaths(job, INPUT_PATH);

//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对

//job.setInputFormatClass(TextInputFormat.class);

//1.2 指定自定义的map类

job.setMapperClass(MyMapper.class);

//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略

//job.setMapOutputKeyClass(Text.class);

//job.setMapOutputValueClass(LongWritable.class);

//1.3 分区

//job.setPartitionerClass(HashPartitioner.class);

//有一个reduce任务运行

//job.setNumReduceTasks(1);

//1.4 TODO 排序、分组

//1.5 TODO 规约

//2.2 指定自定义reduce类

job.setReducerClass(MyReducer.class);

//指定reduce的输出类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

//2.3 指定写出到哪里

FileOutputFormat.setOutputPath(job, outPath);

//指定输出文件的格式化类

//job.setOutputFormatClass(TextOutputFormat.class);

//把job提交给JobTracker运行

job.waitForCompletion(true);

}

/**

* KEYIN
即k1 表示行的偏移量

* VALUEIN
即v1 表示行文本内容

* KEYOUT
即k2 表示行中出现的单词

* VALUEOUT
即v2 表示行中出现的单词的次数,固定值1

*/

static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {

final String[] splited = v1.toString().split("\t");

for (String word : splited) {

//System.out.println(word);

context.write(new Text(word), new LongWritable(1));

}

//System.out.println("***************");

System.out.println(v1);

};

}

/**

* KEYIN
即k2 表示行中出现的单词

* VALUEIN
即v2 表示行中出现的单词的次数

* KEYOUT
即k3 表示文本中出现的不同单词

* VALUEOUT
即v3 表示文本中出现的不同单词的总次数

*

*/

static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {

long times = 0L;

for (LongWritable count : v2s) {

times += count.get();

}

ctx.write(k2, new LongWritable(times));

System.out.println(k2+"  ---->"+times);

};

}

}

时间: 2024-11-05 12:24:50

单词计数的mapreduce原理的相关文章

大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

   前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存). 一 概述 定义 MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE).这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间. 适用范围:数据量大,但是数据种类小可以放入内存. 基

MapReduce之单词计数

最近在看google那篇经典的MapReduce论文,中文版可以参考孟岩推荐的 mapreduce 中文版 中文翻译 论文中提到,MapReduce的编程模型就是: 计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce. 用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduc

大数据学习之MapReduce编程案例一单词计数 10

一:单词计数 1:单词计数总流程图 2:代码实现 1:Map阶段 package it.dawn.YARNPra.wc_hdfs; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapp

通过简单的Word Count讲解MapReduce原理以及Java实现

MapReduce原理: MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker:另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的.一个Hadoop集群中

大数据运算模型 MapReduce 原理

大数据运算模型 MapReduce 原理 2016-01-24 杜亦舒 MapReduce 是一个大数据集合的并行运算模型,由google提出,现在流行的hadoop中也使用了MapReduce作为计算模型 MapReduce 通俗解释 图书馆要清点图书数量,有10个书架,管理员为了加快统计速度,找来了10个同学,每个同学负责统计一个书架的图书数量 张同学 统计 书架1王同学 统计 书架2刘同学 统计 书架3...... 过了一会儿,10个同学陆续到管理员这汇报自己的统计数字,管理员把各个数字加

Storm实现单词计数

package com.mengyao.storm; import java.io.File;import java.io.IOException;import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry; import org.apache.commons.io.FileUtils; import backt

【Hadoop基础教程】5、Hadoop之单词计数

单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的src/example目录下找到.单词计数主要完成的功能:统计一系列文本文件中每个单词出现的次数,如下图所示.本blog将通过分析WordCount源码来帮助大家摸清MapReduce程序的基本结构和运行机制. 开发环境 硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点) 软件环境:Jav

Storm实验 -- 单词计数4

在上一次单词计数的基础上做如下改动: 使用 自定义  分组策略,将首字母相同的单词发送给同一个task计数 自定义 CustomStreamGrouping package com.zhch.v4; import backtype.storm.generated.GlobalStreamId; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.task.WorkerTopologyContext;

Storm实验 -- 单词计数3

在上一次单词计数的基础上做如下改动: 使用 Direct Grouping 分组策略,将首字母相同的单词发送给同一个task计数 数据源spout package com.zhch.v3; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import b