Hadoop阅读笔记(二)——利用MapReduce求平均数和去重

前言:圣诞节来了,我怎么能虚度光阴呢?!依稀记得,那一年,大家互赠贺卡,短短几行字,字字融化在心里;那一年,大家在水果市场,寻找那些最能代表自己心意的苹果香蕉梨,摸着冰冷的水果外皮,内心早已滚烫。这一年……我在博客园-_-#,希望用dt的代码燃烧脑细胞,温暖小心窝。

上篇《Hadoop阅读笔记(一)——强大的MapReduce》主要介绍了MapReduce的在大数据集上处理的优势以及运行机制,通过专利数据编写Demo加深了对于MapReduce中输入输出数据结构的细节理解。有了理论上的指导,仍需手捧我的hadoop圣经——Hadoop实战2,继续走完未走过的路(有种怪怪的感觉,我一直在原地踏步)。

正文:实践是检验真理的唯一标准,不知道这是谁说的,但是作为码农,倒也是实用受用的座右铭。除却大牛们能够在阅读高深理论时new一个并发线程,眼睛所到之处,已然可以理清program的精髓所在,好似一台计算机扫描了所看到的代码一般(有点夸张^_^)。作为普罗大众的搬砖者来说,还是通过一些实例来加深对于理论的认识。

今天主要是通过以下两个例子:求平均成绩、去重来加深对MapReduce的理解。

  1.如何用MapReduce求平均成绩——WordCount的加强版

  在谈平均成绩之前我们回顾下属性的Hadoop HelloWorld程序——WordCount,其主要是统计数据集中各个单词出现的次数。因为次数没有多少之分,如果将这里的次数换成分数就将字数统计问题转化成了求每个个体的总成绩的问题,再加上一步(总成绩/学科数)运算就是这里要讨论的求平均数的问题了。在笔者看来,MapReduce是一种编程思维,它引导码农们如何将一个亟待解决的问题转换为一个MapReduce过程:map阶段输入什么、map过程执行什么、map阶段输出什么、reduce阶段输入什么、执行什么、输出什么。能够将以上几个点弄清楚整明白,一个MapReduce程序就会跃然纸上。这里:

  Map:      指定格式的数据集(如"张三  60")——输入数据

         执行每条记录的分割操作以key-value写入上下文context中——执行功能

             得到指定键值对类型的输出(如"(new Text(张三),new IntWritable(60))")——输出结果

  Reduce:   map的输出——输入数据

       求出单个个体的总成绩后再除以该个体课程数目——执行功能

         得到指定键值对类型的输入——输出结果

  鉴于上面的map和reduce过程,我们可以得到如下的代码:

 1 public class Test1214 {
 2
 3     public static class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {
 4         public void map(LongWritable key, Text value, Context context){
 5             String line = value.toString();
 6             System.out.println("该行数据为:" + line);
 7             StringTokenizer token = new StringTokenizer(line,"\t");
 8             String nameT = token.nextToken();
 9             int score = Integer.parseInt(token.nextToken());
10             Text name = new Text(nameT);
11             try {
12                 context.write(name, new IntWritable(score));
13             } catch (IOException e) {
14                 e.printStackTrace();
15             } catch (InterruptedException e) {
16                 e.printStackTrace();
17             }
18         }
19     }
20
21     public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable>{
22         public void reduce(Text key, Iterable<IntWritable> value, Context context){
23             int sum = 0;
24             int count =0;
25             for(IntWritable score : value){
26                 sum += score.get();
27                 count++;
28                 System.out.println("第" + count + "个数值为:" + score.get());
29             }
30             IntWritable avg = new IntWritable(sum/count);
31             try {
32                 context.write(key, avg);
33             } catch (IOException e) {
34                 e.printStackTrace();
35             } catch (InterruptedException e) {
36                 e.printStackTrace();
37             }
38         }
39     }
40     /**
41      * @param args
42      * @throws IOException
43      * @throws ClassNotFoundException
44      * @throws InterruptedException
45      */
46     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
47
48         Configuration conf = new Configuration();
49         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
50         if (otherArgs.length != 2) {
51           System.err.println("Usage: wordcount <in> <out>");
52           System.exit(2);
53         }
54         Job job = new Job(conf, "Test1214");
55
56         job.setJarByClass(Test1214.class);
57         job.setMapperClass(MapperClass.class);
58         job.setCombinerClass(ReducerClass.class);
59         job.setReducerClass(ReducerClass.class);
60         job.setOutputKeyClass(Text.class);
61         job.setOutputValueClass(IntWritable.class);
62
63         org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
64         org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
65         System.exit(job.waitForCompletion(true) ? 0 : 1);
66         System.out.println("end");
67     }
68
69 }

  数据集:这里的数据是码农我自己手工创建的,主要是想看看mapreduce的运行过程,所以就创建了两个文件,当然这里面的成绩也就没有什么是否符合正态分布的考虑了……

数据中设定有A-K共11个学生,共16门课程,具体数据如下:

  NameScore1.txt:

A	55
B	65
C	44
D	87
E	66
F	90
G	70
H	59
I	61
J	58
K	40
A	45
B	62
C	64
D	77
E	36
F	50
G	80
H	69
I	71
J	70
K	49
A	51
B	64
C	74
D	37
E	76
F	80
G	50
H	51
I	81
J	68
K	80
A	85
B	55
C	49
D	67
E	69
F	50
G	80
H	79
I	81
J	68
K	80
A	35
B	55
C	40
D	47
E	60
F	72
G	76
H	79
I	68
J	78
K	50
A	65
B	45
C	74
D	57
E	56
F	50
G	60
H	59
I	61
J	58
K	60
A	85
B	45
C	74
D	67
E	86
F	70
G	50
H	79
I	81
J	78
K	60
A	50
B	69
C	40
D	89
E	69
F	95
G	75
H	59
I	60
J	59
K	45

  NameScore2.txt:

A	65
B	75
C	64
D	67
E	86
F	70
G	90
H	79
I	81
J	78
K	60
A	65
B	82
C	84
D	97
E	66
F	70
G	80
H	89
I	91
J	90
K	69
A	71
B	84
C	94
D	67
E	96
F	80
G	70
H	71
I	81
J	98
K	80
A	85
B	75
C	69
D	87
E	89
F	80
G	70
H	99
I	81
J	88
K	60
A	65
B	75
C	60
D	67
E	80
F	92
G	76
H	79
I	68
J	78
K	70
A	85
B	85
C	74
D	87
E	76
F	60
G	60
H	79
I	81
J	78
K	80
A	85
B	65
C	74
D	67
E	86
F	70
G	70
H	79
I	81
J	78
K	60
A	70
B	69
C	60
D	89
E	69
F	95
G	75
H	59
I	60
J	79
K	65

  其执行过程中控制台打印的信息为:

14/12/14 17:05:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/14 17:05:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.JobClient: Running job: job_local_0001
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680
该行数据为:A 55
该行数据为:B 65
该行数据为:C 44
该行数据为:D 87
该行数据为:E 66
该行数据为:F 90
该行数据为:G 70
该行数据为:H 59
该行数据为:I 61
该行数据为:J 58
该行数据为:K 40
该行数据为:A 45
该行数据为:B 62
该行数据为:C 64
该行数据为:D 77
该行数据为:E 36
该行数据为:F 50
该行数据为:G 80
该行数据为:H 69
该行数据为:I 71
该行数据为:J 70
该行数据为:K 49
该行数据为:A 51
该行数据为:B 64
该行数据为:C 74
该行数据为:D 37
该行数据为:E 76
该行数据为:F 80
该行数据为:G 50
该行数据为:H 51
该行数据为:I 81
该行数据为:J 68
该行数据为:K 80
该行数据为:A 85
该行数据为:B 55
该行数据为:C 49
该行数据为:D 67
该行数据为:E 69
该行数据为:F 50
该行数据为:G 80
该行数据为:H 79
该行数据为:I 81
该行数据为:J 68
该行数据为:K 80
该行数据为:A 35
该行数据为:B 55
该行数据为:C 40
该行数据为:D 47
该行数据为:E 60
该行数据为:F 72
该行数据为:G 76
该行数据为:H 79
该行数据为:I 68
该行数据为:J 78
该行数据为:K 50
该行数据为:A 65
该行数据为:B 45
该行数据为:C 74
该行数据为:D 57
该行数据为:E 56
该行数据为:F 50
该行数据为:G 60
该行数据为:H 59
该行数据为:I 61
该行数据为:J 58
该行数据为:K 60
该行数据为:A 85
该行数据为:B 45
该行数据为:C 74
该行数据为:D 67
该行数据为:E 86
该行数据为:F 70
该行数据为:G 50
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 60
该行数据为:A 50
该行数据为:B 69
该行数据为:C 40
该行数据为:D 89
该行数据为:E 69
该行数据为:F 95
该行数据为:G 75
该行数据为:H 59
该行数据为:I 60
该行数据为:J 59
该行数据为:K 45
14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output
第1个数值为:55
第2个数值为:45
第3个数值为:51
第4个数值为:85
第5个数值为:35
第6个数值为:65
第7个数值为:85
第8个数值为:50
第1个数值为:45
第2个数值为:64
第3个数值为:65
第4个数值为:45
第5个数值为:55
第6个数值为:69
第7个数值为:62
第8个数值为:55
第1个数值为:64
第2个数值为:49
第3个数值为:44
第4个数值为:74
第5个数值为:74
第6个数值为:40
第7个数值为:40
第8个数值为:74
第1个数值为:67
第2个数值为:67
第3个数值为:77
第4个数值为:37
第5个数值为:87
第6个数值为:57
第7个数值为:89
第8个数值为:47
第1个数值为:36
第2个数值为:66
第3个数值为:76
第4个数值为:86
第5个数值为:69
第6个数值为:69
第7个数值为:60
第8个数值为:56
第1个数值为:90
第2个数值为:95
第3个数值为:70
第4个数值为:50
第5个数值为:80
第6个数值为:50
第7个数值为:50
第8个数值为:72
第1个数值为:60
第2个数值为:76
第3个数值为:50
第4个数值为:50
第5个数值为:80
第6个数值为:70
第7个数值为:75
第8个数值为:80
第1个数值为:59
第2个数值为:69
第3个数值为:51
第4个数值为:79
第5个数值为:59
第6个数值为:79
第7个数值为:59
第8个数值为:79
第1个数值为:60
第2个数值为:61
第3个数值为:81
第4个数值为:81
第5个数值为:61
第6个数值为:71
第7个数值为:68
第8个数值为:81
第1个数值为:58
第2个数值为:59
第3个数值为:78
第4个数值为:68
第5个数值为:78
第6个数值为:68
第7个数值为:70
第8个数值为:58
第1个数值为:40
第2个数值为:50
第3个数值为:49
第4个数值为:60
第5个数值为:60
第6个数值为:45
第7个数值为:80
第8个数值为:80
14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0‘ done.
14/12/14 17:05:28 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680
该行数据为:A 65
该行数据为:B 75
该行数据为:C 64
该行数据为:D 67
该行数据为:E 86
该行数据为:F 70
该行数据为:G 90
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 60
该行数据为:A 65
该行数据为:B 82
该行数据为:C 84
该行数据为:D 97
该行数据为:E 66
该行数据为:F 70
该行数据为:G 80
该行数据为:H 89
该行数据为:I 91
该行数据为:J 90
该行数据为:K 69
该行数据为:A 71
该行数据为:B 84
该行数据为:C 94
该行数据为:D 67
该行数据为:E 96
该行数据为:F 80
该行数据为:G 70
该行数据为:H 71
该行数据为:I 81
该行数据为:J 98
该行数据为:K 80
该行数据为:A 85
该行数据为:B 75
该行数据为:C 69
该行数据为:D 87
该行数据为:E 89
该行数据为:F 80
该行数据为:G 70
该行数据为:H 99
该行数据为:I 81
该行数据为:J 88
该行数据为:K 60
该行数据为:A 65
该行数据为:B 75
该行数据为:C 60
该行数据为:D 67
该行数据为:E 80
该行数据为:F 92
该行数据为:G 76
该行数据为:H 79
该行数据为:I 68
该行数据为:J 78
该行数据为:K 70
该行数据为:A 85
该行数据为:B 85
该行数据为:C 74
该行数据为:D 87
该行数据为:E 76
该行数据为:F 60
该行数据为:G 60
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 80
该行数据为:A 85
该行数据为:B 65
该行数据为:C 74
该行数据为:D 67
该行数据为:E 86
该行数据为:F 70
该行数据为:G 70
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 60
该行数据为:A 70
该行数据为:B 69
该行数据为:C 60
该行数据为:D 89
该行数据为:E 69
该行数据为:F 95
该行数据为:G 75
该行数据为:H 59
该行数据为:I 60
该行数据为:J 79
该行数据为:K 65
14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output
第1个数值为:65
第2个数值为:65
第3个数值为:71
第4个数值为:85
第5个数值为:65
第6个数值为:85
第7个数值为:85
第8个数值为:70
第1个数值为:65
第2个数值为:84
第3个数值为:75
第4个数值为:85
第5个数值为:75
第6个数值为:69
第7个数值为:82
第8个数值为:75
第1个数值为:84
第2个数值为:69
第3个数值为:64
第4个数值为:74
第5个数值为:94
第6个数值为:60
第7个数值为:60
第8个数值为:74
第1个数值为:67
第2个数值为:87
第3个数值为:97
第4个数值为:67
第5个数值为:67
第6个数值为:87
第7个数值为:89
第8个数值为:67
第1个数值为:66
第2个数值为:86
第3个数值为:96
第4个数值为:86
第5个数值为:89
第6个数值为:69
第7个数值为:80
第8个数值为:76
第1个数值为:70
第2个数值为:95
第3个数值为:70
第4个数值为:70
第5个数值为:80
第6个数值为:60
第7个数值为:80
第8个数值为:92
第1个数值为:60
第2个数值为:76
第3个数值为:70
第4个数值为:70
第5个数值为:80
第6个数值为:90
第7个数值为:75
第8个数值为:70
第1个数值为:79
第2个数值为:89
第3个数值为:71
第4个数值为:99
第5个数值为:59
第6个数值为:79
第7个数值为:79
第8个数值为:79
第1个数值为:60
第2个数值为:81
第3个数值为:81
第4个数值为:81
第5个数值为:81
第6个数值为:91
第7个数值为:68
第8个数值为:81
第1个数值为:78
第2个数值为:79
第3个数值为:78
第4个数值为:88
第5个数值为:78
第6个数值为:98
第7个数值为:90
第8个数值为:78
第1个数值为:60
第2个数值为:70
第3个数值为:69
第4个数值为:60
第5个数值为:80
第6个数值为:65
第7个数值为:60
第8个数值为:80
14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000001_0‘ done.
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.Merger: Merging 2 sorted segments
14/12/14 17:05:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
第1个数值为:58
第2个数值为:73
第1个数值为:76
第2个数值为:57
第1个数值为:57
第2个数值为:72
第1个数值为:78
第2个数值为:66
第1个数值为:64
第2个数值为:81
第1个数值为:77
第2个数值为:69
第1个数值为:67
第2个数值为:73
第1个数值为:79
第2个数值为:66
第1个数值为:70
第2个数值为:78
第1个数值为:83
第2个数值为:67
第1个数值为:58
第2个数值为:68
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/14 17:05:28 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://hadoop:9000/user/hadoop/output4
14/12/14 17:05:28 INFO mapred.LocalJobRunner: reduce > reduce
14/12/14 17:05:28 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0‘ done.
14/12/14 17:05:28 INFO mapred.JobClient: map 100% reduce 100%
14/12/14 17:05:28 INFO mapred.JobClient: Job complete: job_local_0001
14/12/14 17:05:28 INFO mapred.JobClient: Counters: 14
14/12/14 17:05:28 INFO mapred.JobClient: FileSystemCounters
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_READ=50573
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_READ=2630
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_WRITTEN=103046
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=55
14/12/14 17:05:28 INFO mapred.JobClient: Map-Reduce Framework
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input groups=11
14/12/14 17:05:28 INFO mapred.JobClient: Combine output records=22
14/12/14 17:05:28 INFO mapred.JobClient: Map input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/14 17:05:28 INFO mapred.JobClient: Reduce output records=11
14/12/14 17:05:28 INFO mapred.JobClient: Spilled Records=44
14/12/14 17:05:28 INFO mapred.JobClient: Map output bytes=1056
14/12/14 17:05:28 INFO mapred.JobClient: Combine input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Map output records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input records=22

最终结果为:
A 65
B 66
C 64
D 72
E 72
F 73
G 70
H 72
I 74
J 75
K 63

  

  为了更清晰从将要执行的控制台中看到map和reduce过程的执行都进行了那些操作,我们在其中打印了相关信息,这里有自己的两点疑惑需要拿出来闹闹:

  (1).这里我写的程序和书中不一样,没有增加StringTokenizer token = new StringTokenizer(line,"line")这行,事实上我加上去后会出现错误,我的理解是,因为默认格式是TextInputFormat即已经将文件中的文本按照行标示进行分割,即输入给map方法的已经是以一行为单位的记录,所以这里不需要以“\n”进行分割了。书中的做法应该是假定将整个文件拿过来,统一处理,但事实上这里默认的TextInputFormat已经完成了前期工作。(如果执迷不悟这样处理的话,距离来说NameScore1.txt中第一行是“A     55”整个以“\n”分割后就是一个整体了,再以“\t”就无法分割了。)

  (2).从执行过程打印的信息,起初让我有些疑惑,因为从信息来看,似乎是:NameScore1.txt被分割并以每行记录进入map过程,当执行到该文件的最后一行记录时,从打印信息我们似乎看到的是紧接着就去执行reduce过程了,后面的NameScore2.txt也是如此,当两个文件都分别执行了map和reduce时似乎又执行了一次reduce操作。那么事实是不是如此,如果真是这样,这与先前所看到的理论中提到当map执行完后再执行reduce是否有冲突。

  通过查看代码我们发现

  job.setMapperClass(MapperClass.class);
  job.setCombinerClass(ReducerClass.class);
  job.setReducerClass(ReducerClass.class);

  是的,没错,在这里我们发现了端倪,其真正执行过程是:先执行map,这就是过程信息中打印了每条成绩记录,后面执行的reduce其实是介于map和reduce之间的combiner操作,那么这个Combiner类又为何物,通过神奇的API我们可以发现Combine其实就是一次reduce过程,而且这里明确写了combiner和reduce过程都是使用ReducerClass类,从而就出现了这里为什么对于单个文件都会先执行map然后在reduce,再后来是两个map执行后,才执行的真正的reduce。

  2.去重——阉割版的WordCount

  相比于前面的求平均值例子需要添加一些逻辑代码来说,这里的去重更像是阉割版的WordCount。

  如果你还是用传统的思维在考量一个去重的程序需要多少次的判断,如果你还不了解什么是真正的map和reduce。Hadoop中的去重问题被你整复杂了。要知道,当一个map执行完后会对执行的数据进行一个排序,比如按照字母先后顺序;后面会进入combine阶段,这阶段主要是针对key-value中有相同的key就合并;再到reduce阶段,通过迭代器遍历前一阶段合并的各个元素,得到最终的输出结果。

  对于去重来说,我们不在乎一个元素到底出现了几次,只要知道这个元素确实出现了,并能够再最后显示出来就行了,通过map和combiner,我们最终得到的key-value对中的key都是不一样的,也就是说在完成合并的同时就是我们所需要的去重操作(是不是有点绕)。最终reduce输出的就是具有唯一性的去重的元素集合。我们还是按照理清map和reduce的思路来看待这个去重问题:

  map:  数据中的一行记录如"(安徽  jack)"——输入数据

       直接以key-value的方式写入上下文对象context(这里的value并不是我们关心的对象,可以为空)——执行功能

               得到指定键值对类型的输出如"(new Text(安徽),new Text(""))"——输出结果

  reduce: map的输出——输入数据

                直接以key-value的方式写入上下文对象context(同样,value并不是我们关心的对象)——执行功能

           得到指定键值对类型的输入——输出结果 

  鉴于以上对于map和reduce的理解,代码如下:

 1 package org.apache.mapreduce;
 2
 3 import java.io.IOException;
 4 import java.util.Collection;
 5 import java.util.Iterator;
 6 import java.util.StringTokenizer;
 7
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.IntWritable;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapred.TextInputFormat;
14 import org.apache.hadoop.mapreduce.Job;
15 import org.apache.hadoop.mapreduce.Mapper;
16 import org.apache.hadoop.mapreduce.Reducer;
17 import org.apache.hadoop.util.GenericOptionsParser;
18 import org.apache.mapreduce.Test1123.MapperClass;
19 import org.apache.mapreduce.Test1123.ReducerClass;
20
21 public class Test1215 {
22
23     public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
24         public void map(LongWritable key, Text value, Context context){
25
26             try {
27                 context.write(value, new Text(""));
28                 System.out.println("value:" + value);
29             } catch (IOException e) {
30                 e.printStackTrace();
31             } catch (InterruptedException e) {
32                 e.printStackTrace();
33             }
34         }
35     }
36
37     public static class ReducerClass extends Reducer<Text, Text, Text, Text>{
38         public void reduce(Text key, Iterable<Text> value, Context context){
39
40             try {
41                 context.write(key, new Text(""));
42                 System.out.println("key:"+key);
43             } catch (IOException e) {
44                 e.printStackTrace();
45             } catch (InterruptedException e) {
46                 e.printStackTrace();
47             }
48         }
49     }
50     /**
51      * @param args
52      * @throws IOException
53      * @throws ClassNotFoundException
54      * @throws InterruptedException
55      */
56     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
57
58         Configuration conf = new Configuration();
59         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
60         if (otherArgs.length != 2) {
61           System.err.println("Usage: wordcount <in> <out>");
62           System.exit(2);
63         }
64         Job job = new Job(conf, "Test1214");
65
66         job.setJarByClass(Test1215.class);
67         job.setMapperClass(MapperClass.class);
68         job.setCombinerClass(ReducerClass.class);
69         job.setReducerClass(ReducerClass.class);
70         job.setOutputKeyClass(Text.class);
71         job.setOutputValueClass(Text.class);
72
73         org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
74         org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
75         System.exit(job.waitForCompletion(true) ? 0 : 1);
76         System.out.println("end");
77     }
78
79 }

  数据集:手动创建两个文件,每个文件内都有重复元素,两个文件内也有重复元素,具体如下:

  repeat1.txt:

安徽 jack
江苏 jim
江西 lucy
广东 david
上海 smith
安徽 jack
江苏 jim
北京 yemener

  repeat2.txt

江西 lucy
安徽 jack
上海 hanmei
北京 yemener
新疆 afanti
黑龙江 lily
福建 tom
安徽 jack

  通过运行,我们发现控制台打印的信息如下:

14/12/15 21:57:07 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/15 21:57:07 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 2
14/12/15 21:57:07 INFO mapred.JobClient: Running job: job_local_0001
14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 2
14/12/15 21:57:07 INFO mapred.MapTask: io.sort.mb = 100
14/12/15 21:57:07 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/15 21:57:07 INFO mapred.MapTask: record buffer = 262144/327680
value:安徽 jack
value:江苏 jim
value:江西 lucy
value:广东 david
value:上海 smith
value:安徽 jack
value:江苏 jim
value:北京 yemener
14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map output
key:上海 smith
key:北京 yemener
key:安徽 jack
key:广东 david
key:江苏 jim
key:江西 lucy
14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 0
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0‘ done.
14/12/15 21:57:08 INFO mapred.MapTask: io.sort.mb = 100
14/12/15 21:57:08 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/15 21:57:08 INFO mapred.MapTask: record buffer = 262144/327680
value:江西 lucy
value:安徽 jack
value:上海 hanmei
value:北京 yemener
value:新疆 afanti
value:黑龙江 lily
value:福建 tom
value:安徽 jack
14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map output
key:上海 hanmei
key:北京 yemener
key:安徽 jack
key:新疆 afanti
key:江西 lucy
key:福建 tom
key:黑龙江 lily
14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 0
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000001_0‘ done.
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.Merger: Merging 2 sorted segments
14/12/15 21:57:08 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 212 bytes
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
key:上海 hanmei
key:上海 smith
key:北京 yemener
key:安徽 jack
key:广东 david
key:新疆 afanti
key:江苏 jim
key:江西 lucy
key:福建 tom
key:黑龙江 lily
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/15 21:57:08 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://hadoop:9000/user/hadoop/output5
14/12/15 21:57:08 INFO mapred.LocalJobRunner: reduce > reduce
14/12/15 21:57:08 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0‘ done.
14/12/15 21:57:08 INFO mapred.JobClient: map 100% reduce 100%
14/12/15 21:57:08 INFO mapred.JobClient: Job complete: job_local_0001
14/12/15 21:57:08 INFO mapred.JobClient: Counters: 14
14/12/15 21:57:08 INFO mapred.JobClient: FileSystemCounters
14/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_READ=50584
14/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_READ=507
14/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_WRITTEN=102997
14/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=140
14/12/15 21:57:08 INFO mapred.JobClient: Map-Reduce Framework
14/12/15 21:57:08 INFO mapred.JobClient: Reduce input groups=10
14/12/15 21:57:08 INFO mapred.JobClient: Combine output records=13
14/12/15 21:57:08 INFO mapred.JobClient: Map input records=16
14/12/15 21:57:08 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/15 21:57:08 INFO mapred.JobClient: Reduce output records=10
14/12/15 21:57:08 INFO mapred.JobClient: Spilled Records=26
14/12/15 21:57:08 INFO mapred.JobClient: Map output bytes=220
14/12/15 21:57:08 INFO mapred.JobClient: Combine input records=16
14/12/15 21:57:08 INFO mapred.JobClient: Map output records=16
14/12/15 21:57:08 INFO mapred.JobClient: Reduce input records=13

 

  基于以上两个例子的分析,让我们了解map是怎么一回事,reduce又做了什么,在map和reduce之间还有那些猫腻,整个mapreduce是如何一次次的帮助我们完成我们想要的逻辑,当然这里为了方便用的是小数据集,事实上在大数据集上解决这样的问题更能凸显mapreduce高大上和救世主的形象。

  真心觉得Doug Cutting很牛,如何写出这样的框架,低头一想,前面的路还很长。今天就到这,觉得有用,记得点赞哦,你的到来是对我最大的鼓舞^_^

本文链接:《Hadoop阅读笔记(二)——利用MapReduce求平均数和去重》http://www.cnblogs.com/bigdataZJ/p/hadoopreading2.html

时间: 2024-12-24 22:19:03

Hadoop阅读笔记(二)——利用MapReduce求平均数和去重的相关文章

Hadoop阅读笔记(三)——深入MapReduce排序和单表连接

继上篇了解了使用MapReduce计算平均数以及去重后,我们再来一探MapReduce在排序以及单表关联上的处理方法.在MapReduce系列的第一篇就有说过,MapReduce不仅是一种分布式的计算方法,更是一种解决问题的新思维.新思路.将原先看似可以一条龙似的处理一刀切成两端,一端是Map.一端是Reduce,Map负责分,Reduce负责合. 1.MapReduce排序 问题模型: 给出多个数据文件输入如: sortfile1.txt 11 13 15 17 19 21 23 25 27

Hadoop阅读笔记(一)——强大的MapReduce

前言:来园子已经有8个月了,当初入园凭着满腔热血和一脑门子冲动,给自己起了个响亮的旗号“大数据 小世界”,顿时有了种世界都是我的,世界都在我手中的赶脚.可是......时光飞逝,岁月如梭~~~随手一翻自己的博客,可视化已经快占据了半壁江山,思来想去,还是觉得把一直挂在嘴头,放在心头的大数据拿出来说说,哦不,是拿过来学学.入园前期写了有关Nutch和Solr的自己的一些阅读体会和一些尝试,挂着大数据的旗号做着爬虫的买卖.可是,时间在流失,对于大数据的憧憬从未改变,尤其是Hadoop一直让我魂牵梦绕

Hadoop阅读笔记(四)——一幅图看透MapReduce机制

时至今日,已然看到第十章,似乎越是焦躁什么时候能翻完这本圣经的时候也让自己变得更加浮躁,想想后面还有一半的行程没走,我觉得这样“有口无心”的学习方式是不奏效的,或者是收效甚微的.如果有幸能有大牛路过,请指教如何能以效率较高的方式学习Hadoop. 我已经记不清圣经<hadoop 实战2>在我手中停留了多久,但是每一页每一章的翻过去,还是在脑壳里留下了点什么. 一段时间以来,我还是通过这本书加深以及纠正了我对于MapReduce.HDFS乃至Hadoop的新的认识.本篇主要介绍MapReduce

Hadoop阅读笔记(五)——重返Hadoop目录结构

常言道:男人是视觉动物.我觉得不完全对,我的理解是范围再扩大点,不管男人女人都是视觉动物.某些场合(比如面试.初次见面等),别人没有那么多的闲暇时间听你诉说过往以塑立一个关于你的完整模型.所以,第一眼,先走外貌协会的路线,打量一番,再通过望闻问切等各种手段获取关于你的大量信息(如谈吐.举止等),以快速建立起对于你的认识. 待人接物如此,搞技术也不例外,起码我是这样的.把玩了一番Hadoop的MapReduce过程,单词计数.去重.单表关联等运行的时候控制台打印出各种我看懂看不懂的信息,有了这些视

Hadoop阅读笔记(七)——代理模式

关于Hadoop已经小记了六篇,<Hadoop实战>也已经翻完7章.仔细想想,这么好的一个框架,不能只是流于应用层面,跑跑数据排序.单表链接等,想得其精髓,还需深入内部. 按照<Hadoop阅读笔记(五)——重返Hadoop目录结构>中介绍的hadoop目录结构,前面已经介绍了MapReduce的内部运行机制,今天准备入手Hadoop RPC,它是hadoop一种通信机制. RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络

Hadoop阅读笔记(六)——洞悉Hadoop序列化机制Writable

酒,是个好东西,前提要适量.今天参加了公司的年会,主题就是吃.喝.吹,除了那些天生话唠外,大部分人需要加点酒来作催化剂,让一个平时沉默寡言的码农也能成为一个喷子!在大家推杯换盏之际,难免一些画面浮现脑海,有郁闷抓狂的,有出成果喜极而涕的,有不知前途在哪儿的迷茫与不安……总的来说,近一年来,不白活,不虚度,感触良多,不是一言两语能说得清道的明的,有时间可以做个总结,下面还是言归正传谈技术吧. 上篇在了解了Hadoop的目录和源码结构后,说好的要啃源码的,那就得啃.也感谢一直以来关注我.支持我的网友

《逻辑思维简易入门》(第2版) 阅读笔记二

<逻辑思维简易入门>(第2版) 阅读笔记二 本周阅读的是<逻辑思维简易入门>的第三章,也就是说,本书的第一部分就已经读完了. 第三章.信念的优点 信念和负信念是人们在接受一个事物时一种心理态度,延伸来说也就是对事物的认知态度.因为我们在研究 逻辑思维的时候,都有一个前提:“以正常情况以及说话者真诚”,所以有人如果对于一件事物不做回应,我们可以认为这是一种既不相信,也不怀疑的的态度. 信念的优缺点有很多,在书中主要介绍了下面几种: 1.准确性 好的信念实在准确的表达事实,同样真的信念

寒假阅读笔记二

大型网站技术架构-阅读笔记二 模式:每一个模式描述了一个在我们周围不断发生的问题及该问题解决方案的核心.这样你就能一次又一次地使用该方案而不必做重复工作. 分层:将系统在横向维度上切分成几个部分,每个部分负责一部分相对比较单一的职责,然后通过上层对下层的依赖和调用组成一个完整的系统.分层时必须合理规划层次边界和接口,在开发过程中,严格遵循分层架构的约束,禁止跨层次的调用(应用层直接调用数据层)及你想调用(数据层调用服务层,或者服务层调用运用层). 分割:网站越大,功能越复杂,服务和数据处理的种类

构建之法--阅读笔记二

阅读笔记二—代码规范 代码的风格的原则就是:简明,易读,无二义性.我虽然是计算机系的学生,但是我以前却没有秉着这个原则来编写代码,现在阅读了构建之法后,我明白了如何让你的代码变得简明,更容易理解. 代码在编写的过程中注意: 用Tab键缩进 要注意行宽,最多限定100字符的行宽 在复杂的条件表达式中,用括号清楚地表达逻辑优先级 要注意断行与空白的{ }行,有明确的“{”和“}”来判断程序的结构 不要把过多的语句放在同一行上 对变量命名要有实际的意义 用下划线来分隔变量名字中的作用域标注和变量的语义