入门程序 ==> WordCount

package com.rabbit.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.omg.CORBA.ARG_IN;

/**
* MapReduce
* Word counter
*/

public class WordCountMapReduce extends Configured implements Tool{

// 1. Map Class
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text mapOutPutKey = new Text();
private final static IntWritable mapOutPutValue = new IntWritable(1);

@Override
public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {

// read one line <0,hadoop yarn> 偏移量,值
String lineValue = value.toString(); // 获取值

// words split
StringTokenizer stringTokenizer = new StringTokenizer(lineValue);

//iterator
while (stringTokenizer.hasMoreTokens()) {
String word = stringTokenizer.nextToken();

mapOutPutKey.set(word);

context.write(mapOutPutKey, mapOutPutValue);

}
}

}

// 2. Reduce Class
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outputValue = new IntWritable();

@Override
protected void reduce(Text keys, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

// <hadoop,1> <yarn,1> <hadoop,1> <hive,1> <hadoop,1>...... -> 分组 <hadoop,list(1,1,1)>
// list中的元素相加,求和,就是"hadoop"这个单词的出现次数。
System.out.println("input key: "+keys.toString());
System.out.println("::::::::::::::::::::::::::::::::::::::");

int sum = 0;
for (IntWritable value : values) {
System.out.println("recursively print key: "+keys.toString()+" value is: "+value.get());
sum += value.get();
}

outputValue.set(sum);

context.write(keys, outputValue);
}

}

// 3. Driver
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 1. get configuration

Configuration configuration = getConf();

// 2. create job : parameters: configuration, jobName
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());

// 3.run jars
job.setJarByClass(this.getClass());

// 4. set job:
// input -> map -> reduce -> output
// input
Path inPath = new Path(args[0]);

FileInputFormat.addInputPath(job, inPath);

// set mapper class,output key, output value
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// set reducer class,output key, output value
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// output
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);

// 5. submit job
boolean isSuccess = job.waitForCompletion(true);

return isSuccess? 0 : 1;

}

// run program
public static void main(String[] args) throws Exception {

Configuration configuration = new Configuration();
// int status = new WordCountMapReduce().run(args);
// configuration.set("mapreduce.framework.name", "yarn");  表示提交到yarn上面执行
args = new String[] {"D:\\input\\1.txt","D:\\output2"};

int status = ToolRunner.run(configuration, new WordCountMapReduce(), args);

System.exit(status);
}

}

======================================================================================================

MapReduce的写法比较固定,主要就是重写map()和 reduce()两个方法,在方法里面实现自己的逻辑。

整个mr程序执行的过程可以归结为5步: input -> map -> shuffle -> reduce -> output

shuflle阶段可以说是带reduce的mr程序的核心了。之所以说是带reduce的mr的核心,是因为mr可以没有reduce方法,但一定有map方法。

shuffle的过程中包含了排序、溢写、分区、合并、分组这些操作。

mr在读数据的时候是按键值对的形式读取的,key即为每一行开头在整个文件中的偏移量,value即为这一行的文本内容。

比如 :

Hadoop  Hadoop Hive

Spark Hadoop Hive

读到mr中的格式为 <0,"Hadoop  Hadoop Hive"> , <19,"Spark Hadoop Hive">  。回车符也是一个字符,所以第二行的起始位置是19

我们要统计每个单词出现的次数,首先需要将单个的单词按照空格切割出来。将单个单词以键值对的形式写出,得到这种形式:<Hadoop,1> <Hadoop,1> <Hadoop,1> <Hive,1> <Hive,1><Spark,1> 。这个逻辑在map()中实现,可以理解为对于读到的每一行都做相同的处理。

每个map会溢写出一个或者多个小文件,在溢写之前会在内存中将key按字典序排序。如果指定了多个reduce,那么还会做分区。分区编号可以简单理解为 key的hashcode 对reduce的个数取模。

假如有两个分区,那么每个key的分区编号就为 key.hashcode() % 2 ,每个数字对2取模只有0和1两个值,就代表了两个分区。

接下来,每个map溢写出的小文件会合并成大文件。合并的过程中还是会按照key的字典序排序,并且保留分区。

经过这些过程,最终,每个map会有一个合并后的大文件。

接下来,reduce会主动去到这些大文件中拉取属于自己分区的数据,并且将拿到的分散的数据再做合并,形成一个文件。

接着,分组操作来了。我们目前的数据形式还是 <Hadoop,1> <Hadoop,1> <Hadoop,1> <Hive,1> <Hive,1><Spark,1> 。 经过分组操作之后,就会变成这种形式:<Hadoop,[1,1,1]> <Hive,[1,1]> <Spark,[1]>

所以reduce方法的第二个参数是 Iterable<>,它是一个迭代器。实现累加就可以计算出key出现的次数了。

原文地址:https://www.cnblogs.com/rabbit624/p/10551557.html

时间: 2024-10-18 18:07:57

入门程序 ==> WordCount的相关文章

Hadoop入门经典:WordCount

以下程序在hadoop1.2.1上测试成功. 本例先将源代码呈现,然后详细说明执行步骤,最后对源代码及执行过程进行分析. 一.源代码 package org.jediael.hadoopdemo.wordcount; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;

(转载)Hadoop示例程序WordCount详解

最近在学习云计算,研究Haddop框架,费了一整天时间将Hadoop在Linux下完全运行起来,看到官方的map-reduce的demo程序WordCount,仔细研究了一下,算做入门了. 其实WordCount并不难,只是一下子接触到了很多的API,有一些陌生,还有就是很传统的开发相比,map-reduce确实是一种新的编程理念,为了让各位新手少走弯路,我将WordCount中的很多API都做了注释,其实这些方法搞明白了以后程序就很简单了,无非就是将一句话分词,先用map处理再用reduce处

Mahout学习之Mahout简介、安装、配置、入门程序测试

一.Mahout简介 查了Mahout的中文意思--驭象的人,再看看Mahout的logo,好吧,想和小黄象happy地玩耍,得顺便陪陪这位驭象人耍耍了... 附logo: (就是他,骑在象头上的那个Mahout) 步入正文啦: Mahout 是一个很强大的数据挖掘工具,是一个分布式机器学习算法的集合,包括:被称为Taste的分布式协同过滤的实现.分类.聚类等.Mahout最大的优点就是基于hadoop实现,把很多以前运行于单机上的算法,转化为了MapReduce模式,这样大大提升了算法可处理的

springmvc入门程序

springmvc的入门程序(和前面的mybatis使用同一个案例,商城的案例.订单,购物车等) 需求: 功能需求: 商品的列表查询 环境准备: Java环境: Jdk1.7 Myeclipes9 Springmvc版本:spring3.2 需要spring3.2所有jar,一定要包括spring-webmvc那个 开发步骤: 1.导入jar包,这个不解释了. 2.在web.xml中配置前端控制器: <servlet> <servlet-name>springmvc</ser

ant入门程序

一. ant简介 Ant是apache的一个核心项目, 它的作用是项目自动化构建, 因为它内置了Javac.Java.创建目录.复制文件等功能, 直接运行build.xml文件就可以编译我们的项目. 二. ant配置 1. 新建环境变量ANT_HOME: ant的解压目录 2. 在path中配置:%ANT_HOME%/bin; 3. 在命令行中输入ant, 如果出现 Buildfile:build.xml does not exist! Build failed 说明配置成功 三. ant入门程

python web入门程序

python2.x web入门程序 #!/usr/bin/python # -*- coding: UTF-8 -*- # 只在python2.x 有效 import os #Python的标准库中的os模块包含普遍的操作系统功能 import re #引入正则表达式对象 import urllib #用于对URL进行编解码 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler #导入HTTP处理相关的模块 #自定义处理程序,

MyBatis 介绍、简单入门程序

JDBC 编程中的问题 1. 将 SQL 语句硬编码到 Java 代码.不利于系统维护. 设想怎样解决:将SQL单独抽取出来,在配置文件(xml方式.properties文件)进行配置. 2. 数据库连接不能反复利用,对数据库资源是一中浪费. 设想怎样解决:使用数据库连接池管理数据库连接. 3. 向 Statement 设置參数时,对于參数的位置通过硬编码指定,不利于系统维护. 设想怎样解决:是否可以自己主动将 Java 对象的值设置到 Statement. 4. 遍历结果集.resultSet

struts2入门程序

1.搭建struts2环境开发的步骤 搭建struts2环境时,我们一般需要做一下几个步骤的工作: 1.  创建javaweb工程 2.  找到开发struts应用所需要使用的jar 3.  创建jsp文件 4.  创建action文件 5.  编写struts2的配置文件 6.  在web.xml中加入struts2 MVC框架启动配置 开发struts2中需要使用的基本jar包: 关于struts2的提示: 我的myeclipes默认的struts2提示只到2.1,如果需要有2.3的提示,有

.Net 转战 Android 4.4 日常笔记(2)--HelloWorld入门程序

原文:.Net 转战 Android 4.4 日常笔记(2)--HelloWorld入门程序 我不知道人们为什么那么喜欢用HelloWorld来做为自己的第一个程序入门,为什么不是hello **其他的东西或者hi. 一.打开ADT 的Eclipse开发工具新建一个Android项目 New----> Android Application Project Minimum Required SDK这个是运行hello world的最低android版本 Target SDK是现在的目标版本 Co