Hadoop(二):MapReduce程序(Java)

Java版本程序开发过程主要包含三个步骤,一是map、reduce程序开发;第二是将程序编译成JAR包;第三使用Hadoop jar命令进行任务提交。

下面拿一个具体的例子进行说明,一个简单的词频统计,输入数据是一个单词文本,输出每个单词的出现个数。

一、MapReduce程序

  标准的MapReduce程序包含一个Mapper函数、一个Reducer函数和一个main函数

  

1、主程序

 1 package hadoop;
 2 import org.apache.hadoop.conf.Configuration;  // 读写和保存各种配置资源
 3 import org.apache.hadoop.fs.Path;  // 保存文件或者目录的路径
 4 import org.apache.hadoop.io.IntWritable;  // hadoop自身定义的整形类
 5 import org.apache.hadoop.io.Text;  // hadoop自身定义的存储字符串的类
 6 import org.apache.hadoop.mapreduce.Job;  // 每个hadoop任务是一个Job
 7 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  // 读取输入
 8 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  // 将结果存到输出文件
 9 import org.apache.hadoop.util.GenericOptionsParser;  // 解析hadoop的命里行参数
10
11 public class WordCount {
12     public static void main(String[] args) throws Exception {
13         Configuration conf = new Configuration();  // 从hadoop配置文件里读取参数
14         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  // 从hadoop命令行读取参数
15         if (otherArgs.length != 2) {  // 从命令行读取的参数正常是两个,分别是输入文件和输出文件的目录
16             System.err.println("Usage: wordcount <in> <out>");
17             System.exit(2);
18         }
19         Job job = new Job(conf, "wordcount");  // 定义一个新的Job,第一个参数是hadoop配置信息,第二个参数是Job的名字
20         job.setJarByClass(WordCount.class);  // 根据WordCount类的位置设置Jar文件
21         job.setMapperClass(WordCountMapper.class);  // 设置mapper文件  
22         job.setReducerClass(WordCountReducer.class);  // 设置reducer文件
23         job.setOutputKeyClass(Text.class);  // 设定输出键的类型
24         job.setOutputValueClass(IntWritable.class);  // 设定输出值的类型
25         FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 设定输入文件
26         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 设定输出文件
27         System.exit(job.waitForCompletion(true) ? 0 : 1); // 开始执行Job
28     }
29 }

2、mapper程序

 1 package hadoop;
 2
 3 import java.io.IOException;
 4 import java.util.StringTokenizer;  // java提供的字符串分隔函数
 5
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Mapper;  // hadoop提供的mapper基类,用户在此基础上进行自己的mapper程序开发
 9
10 public class WordCountMapper extends Mapper<Object,Text,Text,IntWritable>{  // ①
11     IntWritable one = new IntWritable(1);
12     Text word = new Text();
13
14     public void map(Object key,Text value,Context context) throws IOException,InterruptedException{  // ②
15         StringTokenizer itr = new StringTokenizer(value.toString());  // 将字符串根据空格进行分割(value是Text类型的,所以需要将其转化成String类型进行处理)
16         while(itr.hasMoreTokens()){
17             word.set(itr.nextToken());
18             context.write(word,one);
19         }
20     }
21 }

  ① Mapper类包含四个参数,分别用来表示输入数据的key类型、value类型、输出数据的key类型和value类型。在本案例中,输入数据只有一个value没有key,所以将key类型设置为了object,值的类型是Text;对于输出数据,key类型是Text,value的类型是IntWritable。

  ② map方法包含三个参数,分别是输入数据的key类型、value类型和输出数据类型(包含了key和value)

 1 package hadoop;
 2
 3 import java.io.IOException;
 4
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Reducer; // Reducer基类  
 8
 9 public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {  // ①
10     IntWritable result = new IntWritable();
11     public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException,InterruptedException{   // ②
12         int sum = 0;
13         for(IntWritable val:values){
14             sum += val.get();
15         }
16         result.set(sum);
17         context.write(key,result);
18     }
19
20 }

  ① 和Mapper类一致,Reducer类同样包含四个参数,分别用来表示输入数据的key类型、value类型、输出数据的key类型和value类型。在本案例中,reducer的输入数据key类型为Text,值的类型是一个IntWritable的list;对于输出数据,key类型是Text,value的类型是IntWritable。

  ② reduce方法包含三个参数,分别是输入数据的key类型、value类型和输出数据类型(包含了key和value)

mapper阶段的输入 hello world hello hadoop

mapper阶段的输出 <hello 1> <world 1><hello 1> <hadoop 1>

reducer阶段的输入 <hello <1,1>> <world 1><hadoop 1>

reducer阶段的输出 <hello 2> <world 1><hadoop 1>

二、编译打包

1、编译(*.java —>*.class)

  首先进入代码目录,运行以下命令:

  javac -classpath /home/work/usr/hadoop/hadoop-1.2.1/hadoop-core-1.2.1.jar:/home/.../hadoop-1.2.1/lib/commons-cli-1.2.jar

      -d ./classes/ ./src/*.java

  (1)javac:JDK的命令行编译器

  (2)-classpath:设置需要用到的jar包路径,各个jar包之间用":"分隔

  (3)-d:设置编译后的文件存储路径,本案例中存储在./classes/下,即当前目录的classes子目录

  (4)最后一个参数是要被编译的java文件,本案例中是存储在./src/目录下的所有java文件,包含上面所讲的三个类

  注意:hadoop-2.*版本所需要用到的jar包和hadoop-1.*版本有所不同

2、打包

  jar -cvf wordcount.jar -C ./classes/ .

  (1)jar:JDK的打包命令行工具

  (2)-cvf:jar命令的参数

  

  (3)注意最后有一个.代表当前目录,把打包结果放在当前目录下

三、任务提交

1、将处理数据提交到HDFS上

  进入hadoop的安装目录,如上文 cd /home/work/usr/hadoop/hadoop-1.2.1

  (1)在集群上创建输入文件夹:./bin/hadoop fs -mkdir input

  (2)上传本地的数据文件到集群input目录:./bin/hadoop fs -put input/* input

  (3)删除集群上的输出目录(如果目录已经存在会报错):./bin/hadoop fs -rmr output(删除的时候小心点...)

2、运行程序

  ./bin/hadoop jar /../wordcount.jar hadoop.WordCount input output

  (1)jar:指定jar包的位置

  (2)hadoop.WordCount:用户自己定义的包名+主类

  (3)指定输入和输出路径

3、查看输出结果

  ./bin/hadoop fs -cat output/part-00000

注意:

(1)mapreduce程序最后的输出文件通常都是以part-00*这种方式命名的

(2)上述用到了很多hdfs的相关命令,对于hdfs上数据的访问,如果知道它的存储位置,也可以直接进入其目录进行一些查看、删除操作

(3)启动任务之后,命令行会返回当前任务的运行进度

时间: 2024-10-09 17:49:08

Hadoop(二):MapReduce程序(Java)的相关文章

HADOOP之MAPREDUCE程序应用二

摘要:MapReduce程序进行单词计数. 关键词:MapReduce程序  单词计数 数据源:人工构造英文文档file1.txt,file2.txt. file1.txt 内容 Hello   Hadoop I   am  studying   the   Hadoop  technology file2.txt内容 Hello  world The  world  is  very  beautiful I   love    the   Hadoop    and    world 问题描

Hadoop之MapReduce程序应用三

摘要:MapReduce程序进行数据去重. 关键词:MapReduce   数据去重 数据源:人工构造日志数据集log-file1.txt和log-file2.txt. log-file1.txt内容 2014-1-1    wangluqing 2014-1-2    root 2014-1-3   root 2014-1-4  wangluqing 2014-1-5  root 2014-1-6  wangluqing log-file2.txt内容 2014-1-1  root 2014-

用PHP编写Hadoop的MapReduce程序

用PHP写hadoop的mapreduce程序 Hadoop本身是Java写的,所以,给hadoop写mapreduce,人们会自然地想到java 但hadoop里面有个contrib叫做hadoop streaming,这是一个小工具,为hadoop提供streaming支持,使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper 或者 reducer 例如:hadoop jar hadoop-streaming.jar -input SOME_IN

Hadoop之MapReduce程序分析

摘要:Hadoop之MapReduce程序包括三个部分:Mapper,Reducer和作业执行.本文介绍和分析MapReduce程序三部分结构. 关键词:MapReduce   Mapper  Reducer   作业执行 MapReduce程序包括三个部分,分别是Mapper,Reducer和作业执行. Mapper 一个类要充当Mapper需要继承MapReduceBase并实现Mapper接口. Mapper接口负责数据处理阶段.它采用形式为Mapper<K1,V1,K2,V2>的Jav

Hadoop之MapReduce程序应用一

摘要:MapReduce程序处理专利数据集. 关键词:MapReduce程序   专利数据集 数据源:专利引用数据集cite75_99.txt.(该数据集可以从网址http://www.nber.org/patents/下载) 问题描述: 读取专利引用数据集并对它进行倒排.对于每一个专利,找到那些引用它的专利并进行合并.top5输出结果如下: 1                                3964859, 4647229 10000                      

为Hadoop的MapReduce程序编写makefile

最近需要把基于hadoop的MapReduce程序集成到一个大的用C/C++编写的框架中,需要在make的时候自动将MapReduce应用进行编译和打包.这里以简单的WordCount1为例说明具体的实现细节,注意:hadoop版本为2.4.0. 源代码包含两个文件,一个是WordCount1.java是具体的对单词计数实现的逻辑:第二个是CounterThread.java,其中简单的当前处理的行数做一个统计和打印.代码分别见附1. 编写makefile的关键是将hadoop提供的jar包的路

hadoop(二MapReduce)

hadoop(二MapReduce) 介绍 MapReduce:其实就是把数据分开处理后再将数据合在一起. Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理.可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系. Reduce负责“合”,即对map阶段的结果进行全局汇总. MapReduce运行在yarn集群 MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce, MapReduce处理的数据类型是键值对

Hadoop之MapReduce程序开发流程

摘要:MapReduce程序开发流程遵循算法思路.Mapper.Reducer.作业执行的步骤. 关键词:MapReduce 程序   开发流程 对于一个数据处理问题,若需要MapReduce,那么如何设计和实现?MapReduce程序基础模板,包含两个部分,一个是map,一个是reduce.map和reduce的设计取决解决问题的算法思路:而map和reduce的执行需要作业的调度. 因此,MapReduce程序开发可以遵循以下流程. 第一步:清楚问题是什么,确定解决问题的算法思路. 第二步:

hadoop开发MapReduce程序

准备工作: 1.设置HADOOP_HOME,指向hadoop安装目录,否则报这个错: 2.在window下,需要把hadoop/bin那个目录替换下,在网上搜一个对应版本的 3.如果还报org.apache.hadoop.io.nativeio.NativeIO$Windows.access0错,把其中的hadoop.dll复制到c:\windows\system32目录 依赖的jar 1.common hadoop-2.7.3\share\hadoop\common\hadoop-common

hadoop-初学者写map-reduce程序中容易出现的问题 3

1.写hadoop的map-reduce程序之前所必须知道的基础知识: 1)hadoop map-reduce的自带的数据类型: Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较.(如果是自定义的key,value的数据类型,必须也要写其大小比较的方法) BooleanWritable:标准布尔型数值 ByteWritable:单字节数值 DoubleWritable: