手动实现一个单词统计MapReduce程序与过程原理分析

[toc]


手动实现一个单词统计MapReduce程序与过程原理分析

前言

我们知道,在搭建好hadoop环境后,可以运行wordcount程序来体验一下hadoop的功能,该程序在hadoop目录下的share/hadoop/mapreduce目录中,通过下面的命令:

yarn jar $HADOOP_HOME/share/hadoop/mapreducehadoop-mapreduce-examples-2.6.4.jar wordcount inputPath outPath

即可对输入文件执行单词统计的计算。

那么下面就通过手动写一个wordcount的例子来加深对MapReduce的基本理解。

案例场景

假如有下面一个文本文件需要进行单词统计:

$ cat hello
hello you
hello he
hello me

Note:该hello文件为李老师的经典文本文件。

下面就来演示MapReduce程序如何来对该文本文件进行计算,最后再依据此写一个wordcount程序。

MapReduce计算分析

我们先来简单分析一下MapReduce是如何处理上面的文本文件,然后才写一个程序。

对于上面的一个文本文件,MapReduce程序分三个步骤进行处理:Map阶段、Shuffle阶段和Reduce阶段。(三个阶段的分析在代码的注释中也是非常详细的解释)

Map阶段

上面的文本文件经过Map处理后会得到类似下面的结果:

<hello, 1>
<heelo, 1>
<hello, 1>
<you, 1>
<he, 1>
<me, 1>

shuffle阶段

对Map阶段的结果进行处理,会得到如下的结果:

<hello, [1, 1, 1]>
<you, [1]>
<he, [1]>
<me, [1]>

Reduce阶段

经过reducer处理之后,结果如下:

<hello, 3>
<you, 1>
<he, 1>
<me, 1>

关于上面的过程分析,可以参考下面的几张图示以帮助理解:

图示1:

图示2:

图示3:

程序思路分析

 * 整个的解题思路,使用map函数进行单词的拆分,使用reduce函数进行汇总,中间进行shuffle
 * 要想让我们的map函数和reduce函数进行接替运行,需要一个驱动程序
 * 代码的思路:
 * 1、编写一个类继承Mapper,成为自定义的Mapper,主要业务逻辑就是复写其中的map函数
 *  map
 *  首先要确定清楚Mapper类或者map函数的数据类型/类型参数--->泛型
 *  Mapper<K1, V1, K2, V2>
 * 2、编写一个类继承Reducer,成为自定义的Reducer,主要业务逻辑就是复写其中的reduce函数
 *  reduce
 *  首先要确定清楚Reducer类或者reduce函数它的数据类型/类型参数--->泛型
 *  Reducer<K2, V2s, K3, V3>
 *
 * 需要我们用户自定义的类型就是K2, V2, K3, V3
 * K1和V1一般情况下是固定的,只要数据格式确定,其类型就确定
 * 比如我们操作的是普通的文本文件,那么K1=LongWritable,V1=Text
 * K1--->代表的是这一行记录在整个文本中的偏移量,V1就是这一行文本的内容
 * (也就是说,K1和V1取决于我们要处理的是什么文件)
 * 注意:与Hadoop的程序需要使用Hadoop提供的数据类型,而不能使用java中提供的数据类型

wordcount程序

程序代码中有非常详细的注释,可以参考来进行理解。

WordCount.java

 package com.uplooking.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 统计hdfs://uplooking01:9000/input/mr/hello的单词出现次数
 *
 * 整个的解题思路,使用map函数进行单词的拆分,使用reduce函数进行汇总,中间进行shuffle
 * 要想让我们的map函数和reduce函数进行接替运行,需要一个驱动程序
 * 代码的思路:
 * 1、编写一个类继承Mapper,成为自定义的Mapper,主要业务逻辑就是复写其中的map函数
 *  map
 *  首先要确定清楚Mapper类或者map函数的数据类型/类型参数--->泛型
 *  Mapper<K1, V1, K2, V2>
 * 2、编写一个类继承Reducer,成为自定义的Reducer,主要业务逻辑就是复写其中的reduce函数
 *  reduce
 *  首先要确定清楚Reducer类或者reduce函数它的数据类型/类型参数--->泛型
 *  Reducer<K2, V2s, K3, V3>
 *
 * 需要我们用户自定义的类型就是K2, V2, K3, V3
 * K1和V1一般情况下是固定的,只要数据格式确定,其类型就确定
 * 比如我们操作的是普通的文本文件,那么K1=LongWritable,V1=Text
 * K1--->代表的是这一行记录在整个文本中的偏移量,V1就是这一行文本的内容
 * (也就是说,K1和V1取决于我们要处理的是什么文件)
 * 注意:与Hadoop的程序需要使用Hadoop提供的数据类型,而不能使用java中提供的数据类型
 */
public class WordCount {

    /**
     * 这里的main函数就是用来组织map函数和reduce函数的
     * 最终mr的运行会转变成一个个的Job
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // 构建Job所需的配置文件和jobName
        Configuration configuration = new Configuration();
        String jobName = "wordcount";
        // 1.创建一个job
        Job job = Job.getInstance(configuration, jobName);

        // 添加mr要运行的主函数所在的类,就是WordCount这个类
        job.setJarByClass(WordCount.class);

        // 2.设置mr的输入参数
        // 设置计算的文件
        Path inputPath = new Path("hdfs://uplooking01:9000/input/mr/hello");
        FileInputFormat.setInputPaths(job, inputPath);
        // 指定解析数据源的Format类,即将输入的数据解析为<K1, V1>的形式,然后再交由mapper函数处理
        job.setInputFormatClass(TextInputFormat.class);
        // 指定使用哪个mapper来进行计算
        job.setMapperClass(WordCountMapper.class);
        // 指定mapper结果的key的数据类型(即K2的数据类型),注意要与我们写的Mapper中定义的一致
        job.setMapOutputKeyClass(Text.class);
        // 指定mapper结果的value的数据类型(即V2的数据类型),注意要与我们写的Mapper中定义的一致
        job.setMapOutputValueClass(IntWritable.class);

        // 3.设置mr的输出参数
        // 设置输出的目录
        Path outputPath = new Path("hdfs://uplooking01:9000/output/mr/wc");
        // 如果outputPath目录存在,会抛出目录存在异常,这里先删除,保证该目录不存在
        outputPath.getFileSystem(configuration).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);
        // 指定格式化数据结果的Format类
        job.setOutputFormatClass(TextOutputFormat.class);
        // 指定使用哪个reducer来进行汇总
        job.setReducerClass(WordCountReducer.class);
        // 指定reduce结果的key的数据类型(即K3的数据类型),注意要与我们写的Reducer中定义的一致
        job.setOutputKeyClass(Text.class);
        // 指定reduce结果的value的数据类型(即V3的数据类型),注意要与我们写的Reducer中定义的一致
        job.setOutputValueClass(IntWritable.class);

        // 设置有几个reducer来执行mr程序,默认为1个
        job.setNumReduceTasks(1);
        // 提交mapreduce job
        job.waitForCompletion(true);
    }
}

WordCountMapper.java

 package com.uplooking.bigdata.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 1、编写一个类继承Mapper,成为自定义的Mapper,主要业务逻辑就是复写其中的map函数
 *  map
 *  首先要确定清楚Mapper类或者map函数的数据类型/类型参数--->泛型
 *  Mapper<K1, V1, K2, V2>
 *  K1:行的偏移量,如第998行
 *  V1:行的内容,如 hello you
 *  K2:输出的数据的key值,如hello
 *  V2:输出的数据的value值,如1
 *  注意,为了减少在网络中传输的数据,map之后得到的结果还需要进行shuffle,将相同key的value汇总起来:
 *  如:
 *  map后的结果有:<hello, 1>, <hello, 1>, <hello, 1>, <you, 1>, <he, 1>, <me, 1>
 *  shuffle后的结果为:<hello, [1, 1, 1]>, <you, [1]>, <he, [1]>, <me, [1]>
 *  这样相比原来map的结果,数据的量就少了许多
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // 先将每一行转换为java的String类型
        String line = v1.toString();
        // 将行中的单词以空格作为分隔符分离出来得到一个字符串数组
        String[] words = line.split(" ");
        // 定义输出数据的变量k2和v2,类型分别为Text和IntWritable
        Text k2 = null;
        IntWritable v2 = null;
        // 统计单词并写入到上下文变量context中
        for (String word : words) {
            k2 = new Text(word);
            v2 = new IntWritable(1);
            context.write(k2, v2);
        }
    }
}

WordCountReducer.java

 package com.uplooking.bigdata.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 2、编写一个类继承Reducer,成为自定义的Reducer,主要业务逻辑就是复写其中的reduce函数
 *  reduce
 *  首先要确定清楚Reducer类或者reduce函数它的数据类型/类型参数--->泛型
 *  Reducer<K2, V2s, K3, V3>
 *  K2:map输出中的key值
 *  V2s:map输出中根据本周key值shuffle后得到的可迭代列表
 *  如:<hello, [1, 1, 1]>, <you, [1]>, <he, [1]>, <me, [1]>
 *  K3:reduce输出中的key值
 *  V3:reduce输出中的value值
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context) throws IOException, InterruptedException {
        // 定义某个key值k2出现次数的变量
        int sum = 0;
        // 统计k2孤个数
        for (IntWritable item : v2s) {
            sum += item.get();
        }
        // 构建reduce输出的k3和v3,类型分别为Text和IntWritable
        Text k3 = k2;
        IntWritable v3 = new IntWritable(sum);
        // 结果reduce结果写入到上下文变量context中
        context.write(k2, v3);
    }
}

测试

将上面的程序打包成jar包,然后上传到我们的hadoop服务器上,执行下面的命令:

 yarn jar wordcount.jar com.uplooking.bigdata.mr.wc.WordCount

这样就可以使用在hadoop中使用我们自己写的wodcount程序来进行MapReduce的计算。

任务执行结束后,通过下面的命令查看结果:

 $ hdfs dfs -cat /output/mr/wc/part-r-00000
18/03/03 13:59:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
he  1
hello   3
me  1
you 1

这样就完成了从编写MR程序到测试的完整过程。

原文地址:http://blog.51cto.com/xpleaf/2082642

时间: 2024-07-31 21:53:57

手动实现一个单词统计MapReduce程序与过程原理分析的相关文章

Hadoop:统计文本中单词熟练MapReduce程序

这是搭建hadoop环境后的第一个MapReduce程序: 基于python的脚本: 1 map.py文件,把文本的内容划分成单词: #!/bin/pythonimport sys for line in sys.stdin:    data_list = line.strip().split()    for i in range(0, len(data_list)):        print data_list[i]         2 reduce文件,把统计单词出现的次数: #!/bi

一个单词统计的实例,怎样通过MapReduce完成排序?

假设有一批海量的数据,每个数据都是由26个字母组成的字符串,原始的数据集合是完全无序的,怎样通过MapReduce完成排序工作,使其有序(字典序)呢? 对原始的数据进行分割(Split),得到N个不同的数据分块: 实例分析:WordCount 这个类实现Mapper接口中的map 方法,输入参数中的value 是文本文件中的一行,利用StringTokenizer将这个字符串拆成单词,然后将输出结果<单词,1> 写入到org.apache.hadoop.mapred.OutputCollect

Android应用程序启动过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6689748 前文简要介绍了Android应用程序的Activity的启动过程.在Android系统中,应用程序是由Activity组成的,因此,应用程 序的启动过程实际上就是应用程序中的默认Activity的启动过程,本文将详细分析应用程序框架层的源代码,了解Android应用程序的启动过程. 在上一篇文章Android应用程序的Activit

Android应用程序安装过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6766010 Android系统在启动的过程中,会启动一个应用程序管理服务PackageManagerService,这个服务 负责扫描系统中特定的目录,找到里面的应用程序文件,即以Apk为后缀的文件,然后对这些文件进解析,得到应用程序的相关信息,完成应用程序的安装过程, 本文将详细分析这个过程. 应用程序管理服务PackageManagerSe

hadoop学习之----------IntelliJ IDEA上实现MapReduce中最简单的单词统计的程序(本地 和 hadoop 两种实现方式)

idea上的maven中的pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.a

使用Python实现Hadoop MapReduce程序

转自:使用Python实现Hadoop MapReduce程序 英文原文:Writing an Hadoop MapReduce Program in Python 根据上面两篇文章,下面是我在自己的ubuntu上的运行过程.文字基本采用博文使用Python实现Hadoop MapReduce程序,  打字很浪费时间滴. 在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapReduce程序. 尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C+

Hadoop之MapReduce程序分析

摘要:Hadoop之MapReduce程序包括三个部分:Mapper,Reducer和作业执行.本文介绍和分析MapReduce程序三部分结构. 关键词:MapReduce   Mapper  Reducer   作业执行 MapReduce程序包括三个部分,分别是Mapper,Reducer和作业执行. Mapper 一个类要充当Mapper需要继承MapReduceBase并实现Mapper接口. Mapper接口负责数据处理阶段.它采用形式为Mapper<K1,V1,K2,V2>的Jav

Ant编译MapReduce程序

本文记录Ant编译MapReduce程序的过程. 程序使用<Hadoop MapReduce Cookbook>中的示例代码. 1.安装Ant Ant项目主页:http://ant.apache.org/ 下载二进制安装包,例如apache-ant-1.9.4-bin.tar.gz,上传到Hadoop集群环境上,解压即完成安装. [huser@master apache-ant-1.9.4]$ pwd /home/huser/hadoop/apache-ant-1.9.4 2.下载示例程序 下

在本地文件系统上测试MapReduce程序

在开发MapReduce程序的过程中,可以首先在本地文件系统上对程序进行测试,而不是一开始就在HDFS上,这样调试起来更加方便. 以<Hadoop权威指南>上的MaxTemperature程序为例,整个项目中包括如下3个源文件,依次为Mapper程序.Reducer程序和job启动程序: MaxTemperatureMapper.java,MaxTemperatureReducer.java,MaxTemperatureDriver.java MaxTemperatureMapper.java