Hadoop日记Day17---计数器、map规约、分区学习

一、Hadoop计数器

1.1 什么是Hadoop计数器

  Haoop是处理大数据的,不适合处理小数据,有些大数据问题是小数据程序是处理不了的,他是一个高延迟的任务,有时处理一个大数据需要花费好几个小时这都是正常的。下面我们说一下Hadoop计数器,Hadoop计数器就相当于我们的日志,而日志可以让我们查看程序运行时的很多状态,而计数器也有这方面的作用。那么就研究一下Hadoop自身的计数器。计数器的程序如代码1.1所示,下面代码还是以内容为“hello you;hell0 me”的单词统计为例。

 1 package counter;
 2
 3 import java.net.URI;
 4
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
23
24     public static void main(String[] args) throws Exception {
25
26         Configuration conf = new Configuration();
27
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }
34         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
35
36         //1.1指定读取的文件位于哪里
37         FileInputFormat.setInputPaths(job, INPUT_PATH);
38         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
39
40         //1.2 指定自定义的map类
41         job.setMapperClass(MyMapper.class);
42         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。
43         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
44
45         //1.3 分区
46         job.setPartitionerClass(HashPartitioner.class);
47         job.setNumReduceTasks(1);//有一个reduce任务运行
48
49         //2.2 指定自定义reduce类
50         job.setReducerClass(MyReducer.class);
51
52         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
53         job.setOutputValueClass(LongWritable.class);
54
55         //2.3 指定写出到哪里
56         FileOutputFormat.setOutputPath(job, outPath);
57         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
58
59         job.waitForCompletion(true);//把job提交给JobTracker运行
60     }
61
62     /**
63      * KEYIN    即k1        表示行的偏移量
64      * VALUEIN    即v1        表示行文本内容
65      * KEYOUT    即k2        表示行中出现的单词
66      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
67      */
68     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
69         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
70             final String line = v1.toString();
71             final String[] splited = line.split("\t");
72             for (String word : splited) {
73                 context.write(new Text(word), new LongWritable(1));
74             }
75         };
76     }
77
78     /**
79      * KEYIN    即k2        表示行中出现的单词
80      * VALUEIN    即v2        表示行中出现的单词的次数
81      * KEYOUT    即k3        表示文本中出现的不同单词
82      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
83      *
84      */
85     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
86         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
87             long times = 0L;
88             for (LongWritable count : v2s) {
89                 times += count.get();
90             }
91             ctx.write(k2, new LongWritable(times));
92         };
93     }
94
95 }

代码 1.1

  运行结果如下图1.1所示。

Counters: 19//Counter表示计数器,19表示有19个计数器(下面一共4计数器组)
   File Output Format Counters //文件输出格式化计数器组
     Bytes Written=19       //reduce输出到hdfs的字节数,一共19个字节
   FileSystemCounters//文件系统计数器组
     FILE_BYTES_READ=481
     HDFS_BYTES_READ=38
     FILE_BYTES_WRITTEN=81316
     HDFS_BYTES_WRITTEN=19
   File Input Format Counters //文件输入格式化计数器组
     Bytes Read=19     //map从hdfs读取的字节数
   Map-Reduce Framework//MapReduce框架
     Map output materialized bytes=49
     Map input records=2       //map读入的记录行数,读取两行记录,”hello you”,”hello me”
     Reduce shuffle bytes=0//规约分区的字节数
     Spilled Records=8
     Map output bytes=35
     Total committed heap usage (bytes)=266469376
     SPLIT_RAW_BYTES=105
     Combine input records=0//合并输入的记录数
     Reduce input records=4     //reduce从map端接收的记录行数
     Reduce input groups=3     //reduce函数接收的key数量,即归并后的k2数量
     Combine output records=0//合并输出的记录数
     Reduce output records=3    //reduce输出的记录行数。<helllo,{1,1}>,<you,{1}>,<me,{1}>
     Map output records=4     //map输出的记录行数,输出4行记录

图 1.1

  通过上面我们对计数器的分析,可以知道,我们可以通过计数器来分析MapReduece程序的运行状态。

1.2 自定义计数器

  通过上面的分析,我们了解了计数器的作用,那么我们可以自定义一个计数器,来实现我们自己想要的功能。定义一个记录敏感词的计数器,记录敏感词在一行所出现的次数,如代码2.1所示。我们处理文件内容为“hello you”,“hello me”。

 1 Counters: 19//Counter表示计数器,19表示有19个计数器(下面一共4计数器组)
 2    File Output Format Counters //文件输出格式化计数器组
 3      Bytes Written=19       //reduce输出到hdfs的字节数,一共19个字节
 4    FileSystemCounters//文件系统计数器组
 5      FILE_BYTES_READ=481
 6      HDFS_BYTES_READ=38
 7      FILE_BYTES_WRITTEN=81316
 8      HDFS_BYTES_WRITTEN=19
 9    File Input Format Counters //文件输入格式化计数器组
10      Bytes Read=19     //map从hdfs读取的字节数
11    Map-Reduce Framework//MapReduce框架
12      Map output materialized bytes=49
13      Map input records=2       //map读入的记录行数,读取两行记录,”hello you”,”hello me”
14      Reduce shuffle bytes=0//规约分区的字节数
15      Spilled Records=8
16      Map output bytes=35
17      Total committed heap usage (bytes)=266469376
18      SPLIT_RAW_BYTES=105
19      Combine input records=0//合并输入的记录数
20      Reduce input records=4     //reduce从map端接收的记录行数
21      Reduce input groups=3     //reduce函数接收的key数量,即归并后的k2数量
22      Combine output records=0//合并输出的记录数
23      Reduce output records=3    //reduce输出的记录行数。<helllo,{1,1}>,<you,{1}>,<me,{1}>
24      Map output records=4     //map输出的记录行数,输出4行记录

代码2.1

运行结果如下图2.1所示。

 Counters: 20
   Sensitive Words
     hello=2
   File Output Format Counters
     Bytes Written=21
   FileSystemCounters
     FILE_BYTES_READ=359
     HDFS_BYTES_READ=42
     FILE_BYTES_WRITTEN=129080
     HDFS_BYTES_WRITTEN=21
   File Input Format Counters
     Bytes Read=21
   Map-Reduce Framework
     Map output materialized bytes=67
     Map input records=2
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=53
     Total committed heap usage (bytes)=391774208
     SPLIT_RAW_BYTES=95
     Combine input records=0
     Reduce input records=4
     Reduce input groups=3
     Combine output records=0
     Reduce output records=3
     Map output records=4

图 2.1

二、Combiners编程

2.1 什么是Combiners

  从上面程序运行的结果我们可以发现,在Map-Reduce Framework即MapReduce框架的输出中,Combine input records这个字段为零, 那么combine怎么使用呢?其实这是MapReduce程序中Mapper任务中第五步,这是可选的一步,使用方法非常简单,以上面单词统计为例,只需添加下面一行代码即可,如下: job.setCombinerClass(MyReducer.class);

  combine操作是一个可选的操作,使用时需要我们自己设定,我们用MyReducer类来设置Combiners,表示Combiners与Reduce功能相同,带有combine功能的MapRduce程序如代码3.1所示。

  1 package combine;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.jasper.tagplugins.jstl.core.If;
 18
 19 public class WordCountApp2 {
 20     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
 21     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 22
 23     public static void main(String[] args) throws Exception {
 24         Configuration conf = new Configuration();
 25         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 26         final Path outPath = new Path(OUT_PATH);
 27         if(fileSystem.exists(outPath)){
 28             fileSystem.delete(outPath, true);
 29         }
 30         final Job job = new Job(conf , WordCountApp2.class.getSimpleName());
 31         job.setJarByClass(WordCountApp2.class);
 32
 33         //1.1指定读取的文件位于哪里
 34         FileInputFormat.setInputPaths(job, INPUT_PATH);
 35         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
 36
 37         //1.2 指定自定义的map类
 38         job.setMapperClass(MyMapper.class);
 39         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。
 40         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
 41
 42         //1.3 分区
 43         job.setPartitionerClass(MyPartitioner.class);
 44         //有几个reduce任务运行
 45         job.setNumReduceTasks(2);
 46
 47         //1.4 TODO 排序、分组
 48
 49         //1.5 规约
 50         job.setCombinerClass(MyCombiner.class);
 51
 52         //2.2 指定自定义reduce类
 53         job.setReducerClass(MyReducer.class);
 54         //指定reduce的输出类型
 55         job.setOutputKeyClass(Text.class);
 56         job.setOutputValueClass(LongWritable.class);
 57
 58         //2.3 指定写出到哪里
 59         FileOutputFormat.setOutputPath(job, outPath);
 60         //指定输出文件的格式化类
 61         //job.setOutputFormatClass(TextOutputFormat.class);
 62
 63         //把job提交给JobTracker运行
 64         job.waitForCompletion(true);
 65     }
 66
 67     static class MyPartitioner extends Partitioner<Text, LongWritable>{
 68         @Override
 69         public int getPartition(Text key, LongWritable value, int numReduceTasks) {
 70             return (key.toString().equals("hello"))?0:1;
 71         }
 72     }
 73
 74     /**
 75      * KEYIN    即k1        表示行的偏移量
 76      * VALUEIN    即v1        表示行文本内容
 77      * KEYOUT    即k2        表示行中出现的单词
 78      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
 79      */
 80     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 81         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 82             final String[] splited = v1.toString().split("\t");
 83             for (String word : splited) {
 84                 context.write(new Text(word), new LongWritable(1));
 85                 System.out.println("Mapper输出<"+word+","+1+">");
 86             }
 87         };
 88     }
 89
 90     /**
 91      * KEYIN    即k2        表示行中出现的单词
 92      * VALUEIN    即v2        表示行中出现的单词的次数
 93      * KEYOUT    即k3        表示文本中出现的不同单词
 94      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
 95      *
 96      */
 97     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 98         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
 99             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
100             System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
101             long times = 0L;
102             for (LongWritable count : v2s) {
103                 times += count.get();
104                 //显示次数表示输入的k2,v2的键值对数量
105                 System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">");
106             }
107             ctx.write(k2, new LongWritable(times));
108         };
109     }
110
111
112     static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
113         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
114             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
115             System.out.println("Combiner输入分组<"+k2.toString()+",...>");
116             long times = 0L;
117             for (LongWritable count : v2s) {
118                 times += count.get();
119                 //显示次数表示输入的k2,v2的键值对数量
120                 System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">");
121             }
122
123             ctx.write(k2, new LongWritable(times));
124             //显示次数表示输出的k2,v2的键值对数量
125             System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
126         };
127     }
128 }

代码 3.1

  运行结果如下图3.1所示。

Counters: 20
   Sensitive Words
     hello=2
   File Output Format Counters
     Bytes Written=21
   FileSystemCounters
     FILE_BYTES_READ=359
     HDFS_BYTES_READ=42
     FILE_BYTES_WRITTEN=129080
     HDFS_BYTES_WRITTEN=21
   File Input Format Counters
     Bytes Read=21
   Map-Reduce Framework
     Map output materialized bytes=67
     Map input records=2
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=53
     Total committed heap usage (bytes)=391774208
     SPLIT_RAW_BYTES=95
     Combine input records=4
     Reduce input records=3
     Reduce input groups=3
     Combine output records=3
     Reduce output records=3
     Map output records=4

图 3.1

  从上面的运行结果我们可以发现,此时Combine input records=4,Combine output records=3,Reduce input records=3,因为Combine阶段在Ma pper结束与Reducer开始之间,Combiners处理的数据,就是在不设置Combiners时,Reduce所应该接受的数据,所以为4,然后再将Combiners的输出作为Re duce端的输入,所以Reduce input records这个字段由4变成了3。注意,combine操作是一个可选的操作,使用时需要我们自己设定,在本代码中我们用MyRed ucer类来设置Combiners,Combine方法的使用的是Reduce的方法,这说明归约的方法是通用的,Reducer阶段的方法也可以用到Mapper阶段。

2.1 自定义Combiners

  为了能够更加清晰的理解Combiners的工作原理,我们自定义一个Combiners类,不再使用MyReduce做为Combiners的类,如代码3.2所示。

  1 package combine;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.jasper.tagplugins.jstl.core.If;
 18
 19 /**
 20  * 问:为什么使用Combiner?
 21  * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
 22  *
 23  * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪?
 24  * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
 25  *
 26  * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪?
 27  * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
 28  *
 29  */
 30 public class WordCountApp2 {
 31     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
 32     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 33
 34     public static void main(String[] args) throws Exception {
 35         Configuration conf = new Configuration();
 36         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 37         final Path outPath = new Path(OUT_PATH);
 38         if(fileSystem.exists(outPath)){
 39             fileSystem.delete(outPath, true);
 40         }
 41         final Job job = new Job(conf , WordCountApp2.class.getSimpleName());
 42         job.setJarByClass(WordCountApp2.class);
 43
 44         //1.1指定读取的文件位于哪里
 45         FileInputFormat.setInputPaths(job, INPUT_PATH);
 46         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
 47
 48         //1.2 指定自定义的map类
 49         job.setMapperClass(MyMapper.class);
 50         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。
 51         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
 52
 53         //1.3 分区
 54         job.setPartitionerClass(MyPartitioner.class);
 55         //有几个reduce任务运行
 56         job.setNumReduceTasks(2);
 57
 58         //1.4 TODO 排序、分组
 59
 60         //1.5 规约
 61         job.setCombinerClass(MyCombiner.class);
 62
 63         //2.2 指定自定义reduce类
 64         job.setReducerClass(MyReducer.class);
 65         //指定reduce的输出类型
 66         job.setOutputKeyClass(Text.class);
 67         job.setOutputValueClass(LongWritable.class);
 68
 69         //2.3 指定写出到哪里
 70         FileOutputFormat.setOutputPath(job, outPath);
 71         //指定输出文件的格式化类
 72         //job.setOutputFormatClass(TextOutputFormat.class);
 73
 74         //把job提交给JobTracker运行
 75         job.waitForCompletion(true);
 76     }
 77
 78     static class MyPartitioner extends Partitioner<Text, LongWritable>{
 79         @Override
 80         public int getPartition(Text key, LongWritable value, int numReduceTasks) {
 81             return (key.toString().equals("hello"))?0:1;
 82         }
 83     }
 84
 85     /**
 86      * KEYIN    即k1        表示行的偏移量
 87      * VALUEIN    即v1        表示行文本内容
 88      * KEYOUT    即k2        表示行中出现的单词
 89      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
 90      */
 91     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 92         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 93             final String[] splited = v1.toString().split("\t");
 94             for (String word : splited) {
 95                 context.write(new Text(word), new LongWritable(1));
 96                 System.out.println("Mapper输出<"+word+","+1+">");
 97             }
 98         };
 99     }
100
101     /**
102      * KEYIN    即k2        表示行中出现的单词
103      * VALUEIN    即v2        表示行中出现的单词的次数
104      * KEYOUT    即k3        表示文本中出现的不同单词
105      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
106      *
107      */
108     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
109         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
110             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
111             System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
112             long times = 0L;
113             for (LongWritable count : v2s) {
114                 times += count.get();
115                 //显示次数表示输入的k2,v2的键值对数量
116                 System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">");
117             }
118             ctx.write(k2, new LongWritable(times));
119         };
120     }
121
122
123     static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
124         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
125             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
126             System.out.println("Combiner输入分组<"+k2.toString()+",...>");
127             long times = 0L;
128             for (LongWritable count : v2s) {
129                 times += count.get();
130                 //显示次数表示输入的k2,v2的键值对数量
131                 System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">");
132             }
133
134             ctx.write(k2, new LongWritable(times));
135             //显示次数表示输出的k2,v2的键值对数量
136             System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
137         };
138     }
139 }

代码 3.2

运行结果如图3.2所示。

14/10/07 18:56:32 INFO mapred.MapTask: record buffer = 262144/327680
Mapper输出<hello,1>
14/10/07 18:56:32 INFO mapred.MapTask: Starting flush of map output
Mapper输出<world,1>
Mapper输出<hello,1>
Mapper输出<me,1>
Combiner输入分组<hello,...>
Combiner输入键值对<hello,1>
Combiner输入键值对<hello,1>
Combiner输出键值对<hello,2>
Combiner输入分组<me,...>
Combiner输入键值对<me,1>
Combiner输出键值对<me,1>
Combiner输入分组<world,...>
Combiner输入键值对<world,1>
Combiner输出键值对<world,1>
14/10/07 18:56:32 INFO mapred.MapTask: Finished spill 0
14/10/07 18:56:32 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/10/07 18:56:32 INFO mapred.LocalJobRunner:
14/10/07 18:56:32 INFO mapred.Task: Task ‘attempt_local_0001_m_000000_0‘ done.
14/10/07 18:56:32 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/10/07 18:56:32 INFO mapred.LocalJobRunner:
14/10/07 18:56:32 INFO mapred.Merger: Merging 1 sorted segments
14/10/07 18:56:32 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 47 bytes
14/10/07 18:56:32 INFO mapred.LocalJobRunner:
MyReducer输入分组<hello,...>
MyReducer输入键值对<hello,2>
MyReducer输入分组<me,...>
MyReducer输入键值对<me,1>
MyReducer输入分组<world,...>
MyReducer输入键值对<world,1>
14/10/07 18:56:33 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/10/07 18:56:33 INFO mapred.LocalJobRunner:
14/10/07 18:56:33 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/10/07 18:56:33 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://hadoop:9000/output
14/10/07 18:56:33 INFO mapred.LocalJobRunner: reduce > reduce
14/10/07 18:56:33 INFO mapred.Task: Task ‘attempt_local_0001_r_000000_0‘ done.
14/10/07 18:56:33 INFO mapred.JobClient:  map 100% reduce 100%
14/10/07 18:56:33 INFO mapred.JobClient: Job complete: job_local_0001
14/10/07 18:56:33 INFO mapred.JobClient: Counters: 19
14/10/07 18:56:33 INFO mapred.JobClient:   File Output Format Counters
14/10/07 18:56:33 INFO mapred.JobClient:     Bytes Written=21
14/10/07 18:56:33 INFO mapred.JobClient:   FileSystemCounters
14/10/07 18:56:33 INFO mapred.JobClient:     FILE_BYTES_READ=343
14/10/07 18:56:33 INFO mapred.JobClient:     HDFS_BYTES_READ=42
14/10/07 18:56:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=129572
14/10/07 18:56:33 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=21
14/10/07 18:56:33 INFO mapred.JobClient:   File Input Format Counters
14/10/07 18:56:33 INFO mapred.JobClient:     Bytes Read=21
14/10/07 18:56:33 INFO mapred.JobClient:   Map-Reduce Framework
14/10/07 18:56:33 INFO mapred.JobClient:     Map output materialized bytes=51
14/10/07 18:56:33 INFO mapred.JobClient:     Map input records=2
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/10/07 18:56:33 INFO mapred.JobClient:     Spilled Records=6
14/10/07 18:56:33 INFO mapred.JobClient:     Map output bytes=53
14/10/07 18:56:33 INFO mapred.JobClient:     Total committed heap usage (bytes)=391774208
14/10/07 18:56:33 INFO mapred.JobClient:     SPLIT_RAW_BYTES=95
14/10/07 18:56:33 INFO mapred.JobClient:     Combine input records=4
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce input records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce input groups=3
14/10/07 18:56:33 INFO mapred.JobClient:     Combine output records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce output records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Map output records=4

图 3.2

  从上面的运行结果我们可以得知,combine具体作用如下:

  • 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
  • combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
  • 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

   注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那 种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

解释一下

*问:为什么使用Combiner?
   答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
* 问:为什么Combiner不作为MR运行的标配,而是可选步骤?
    答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
* 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作?
    答:combiner操作发生在map端的,智能处理一个map任务中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。

三、Partitioner编程

4.1 什么是分区

  在MapReuce程序中的Mapper任务的第三步就是分区,那么分区到底是干什么的呢?其实,把数据分区是为了更好的利用数据,根据数据的属性不同来分成不同区,再根据不同的分区完成不同的任务。MapReduce程序中他的默认分区是1个分区,我们看一下默认分区的代码,还是以单词统计为例如代码4.1所示。

  1 package counter;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Counter;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 19
 20 public class WordCountApp {
 21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
 22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
 23
 24     public static void main(String[] args) throws Exception {
 25
 26         Configuration conf = new Configuration();
 27
 28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 29         final Path outPath = new Path(OUT_PATH);
 30
 31         if(fileSystem.exists(outPath)){
 32             fileSystem.delete(outPath, true);
 33         }
 34         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
 35
 36         //1.1指定读取的文件位于哪里
 37         FileInputFormat.setInputPaths(job, INPUT_PATH);
 38         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
 39
 40         //1.2 指定自定义的map类
 41         job.setMapperClass(MyMapper.class);
 42         job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。
 43         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
 44
 45         //1.3 分区
 46         job.setPartitionerClass(HashPartitioner.class);
 47         job.setNumReduceTasks(1);//有一个reduce任务运行
 48
 49         job.setCombinerClass(MyReducer.class);
 50         //2.2 指定自定义reduce类
 51         job.setReducerClass(MyReducer.class);
 52
 53         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
 54         job.setOutputValueClass(LongWritable.class);
 55
 56         //2.3 指定写出到哪里
 57         FileOutputFormat.setOutputPath(job, outPath);
 58         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
 59
 60         job.waitForCompletion(true);//把job提交给JobTracker运行
 61     }
 62
 63     /**
 64      * KEYIN    即k1        表示行的偏移量
 65      * VALUEIN    即v1        表示行文本内容
 66      * KEYOUT    即k2        表示行中出现的单词
 67      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
 68      */
 69     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 70         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 71             final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
 72
 73             final String line = v1.toString();
 74             if(line.contains("hello")){
 75                 //记录敏感词出现在一行中
 76                 helloCounter.increment(1L);
 77             }
 78             final String[] splited = line.split("\t");
 79             for (String word : splited) {
 80                 context.write(new Text(word), new LongWritable(1));
 81             }
 82         };
 83     }
 84
 85     /**
 86      * KEYIN    即k2        表示行中出现的单词
 87      * VALUEIN    即v2        表示行中出现的单词的次数
 88      * KEYOUT    即k3        表示文本中出现的不同单词
 89      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
 90      *
 91      */
 92     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 93         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
 94             long times = 0L;
 95             for (LongWritable count : v2s) {
 96                 times += count.get();
 97             }
 98             ctx.write(k2, new LongWritable(times));
 99         };
100     }
101
102 }

代码 4.1

  在MapReduce程序中默认的分区方法为HashPartitioner,代码job.setNumReduceTasks(1)表示运行的Reduce任务数,他会将numReduceTask这个变量设为1. HashPartitioner继承自Partitioner,Partitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类。 HashPartitioner计算方法如代码4.2所示。

1 public class HashPartitioner<K, V> extends Partitioner<K, V> {
2
3   /** Use {@link Object#hashCode()} to partition. */
4   public int getPartition(K key, V value,
5                           int numReduceTasks) {
6     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
7   }
8
9 }

代码 4.2

  在上面的代码中K和V,表示k2和v2,该类中只有一个方法getPartition(),返回值如下”(key.hashCode()& Integer.MAX_VALUE)%numReduceTasks“其中key.hashCode()表示该关键是否属于该类。numReduceTasks的值在上面代码中设置为1,取模后只有一种结果那就是0。getPartition()的意义就是表示划分到不同区域的一个标记,返回0,就是表示划分到第0区,所以我们可以把它理解分区的下标,来代表不同的分区。

4.2 自定义分区

  下面我们尝试自定义一个分区,来处理一下手机的日志数据(在前面学习中用过),手机日志数据如下图4.1所示。

图 4.1

  从图中我们可以发现,在第二列上并不是所有的数据都是手机号,我们任务就是在统计手机流量时,将手机号码和非手机号输出到不同的文件中。我们的分区是按手机和非手机号码来分的,所以我们可以按该字段的长度来划分,如代码4.3所示。

  1 package partition;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.io.Writable;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 20
 21 public class KpiApp {
 22     static final String INPUT_PATH = "hdfs://hadoop:9000/wlan";
 23     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 24     public static void main(String[] args) throws Exception{
 25         final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
 26
 27         job.setJarByClass(KpiApp.class);
 28
 29         //1.1 指定输入文件路径
 30         FileInputFormat.setInputPaths(job, INPUT_PATH);
 31         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
 32
 33         //1.2指定自定义的Mapper类
 34         job.setMapperClass(MyMapper.class);
 35         job.setMapOutputKeyClass(Text.class);//指定输出<k2,v2>的类型
 36         job.setMapOutputValueClass(KpiWritable.class);
 37
 38         //1.3 指定分区类
 39         job.setPartitionerClass(KpiPartitioner.class);
 40         job.setNumReduceTasks(2);
 41
 42         //2.2 指定自定义的reduce类
 43         job.setReducerClass(MyReducer.class);
 44         job.setOutputKeyClass(Text.class);//指定输出<k3,v3>的类型
 45         job.setOutputValueClass(KpiWritable.class);
 46
 47         //2.3 指定输出到哪里
 48         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
 49         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类
 50         job.waitForCompletion(true);//把代码提交给JobTracker执行
 51     }
 52
 53     static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
 54         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
 55             final String[] splited = value.toString().split("\t");
 56             final String msisdn = splited[1];
 57             final Text k2 = new Text(msisdn);
 58             final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
 59             context.write(k2, v2);
 60         };
 61     }
 62
 63     static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{
 64         /**
 65          * @param    k2    表示整个文件中不同的手机号码
 66          * @param    v2s    表示该手机号在不同时段的流量的集合
 67          */
 68         protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
 69             long upPackNum = 0L;
 70             long downPackNum = 0L;
 71             long upPayLoad = 0L;
 72             long downPayLoad = 0L;
 73
 74             for (KpiWritable kpiWritable : v2s) {
 75                 upPackNum += kpiWritable.upPackNum;
 76                 downPackNum += kpiWritable.downPackNum;
 77                 upPayLoad += kpiWritable.upPayLoad;
 78                 downPayLoad += kpiWritable.downPayLoad;
 79             }
 80
 81             final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
 82             context.write(k2, v3);
 83         };
 84     }
 85
 86     static class KpiPartitioner extends HashPartitioner<Text, KpiWritable>{
 87         @Override
 88         public int getPartition(Text key, KpiWritable value, int numReduceTasks) {
 89             return (key.toString().length()==11)?0:1;
 90         }
 91     }
 92 }
 93
 94 class KpiWritable implements Writable{
 95     long upPackNum;
 96     long downPackNum;
 97     long upPayLoad;
 98     long downPayLoad;
 99
100     public KpiWritable(){}
101
102     public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){
103         this.upPackNum = Long.parseLong(upPackNum);
104         this.downPackNum = Long.parseLong(downPackNum);
105         this.upPayLoad = Long.parseLong(upPayLoad);
106         this.downPayLoad = Long.parseLong(downPayLoad);
107     }
108
109
110     @Override
111     public void readFields(DataInput in) throws IOException {
112         this.upPackNum = in.readLong();
113         this.downPackNum = in.readLong();
114         this.upPayLoad = in.readLong();
115         this.downPayLoad = in.readLong();
116     }
117
118     @Override
119     public void write(DataOutput out) throws IOException {
120         out.writeLong(upPackNum);
121         out.writeLong(downPackNum);
122         out.writeLong(upPayLoad);
123         out.writeLong(downPayLoad);
124     }
125
126     @Override
127     public String toString() {
128         return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
129     }
130 }

代码 4.3

  注意:分区的例子必须打成jar运行,运行结果如下图4.3,4.4所示,4.3表示手机号码流量,4.4为非手机号流量。

图 4.3

图4.4

  我们知道一个分区对应一个Reducer任务是否是这样呢,我可以通过访问50030MapReduce端口来验证,在浏览器输入”http://hadoop:50030"可以看到MapReduce界面,如图4.5,4.6所示。

图 4.5

图4.6

  从图中可以知道,该MapReduce任务有一个Mapper任务,两个Reducer任务,那么我们细看一下Reducer的两个任务到底是什么?如图4.7,4.8,4.9所示。task_201410070239_0002_r_000000表示第一个分区的输出,有20条记录,task_201410070239_0002_r_000001表示第二分区,有一条输出记录。和我们程序运行结果一样。



图 4.7

图 4.8 第一分区

图 4.9 第二分区

  综上一些列分析,分区的用处如下:
    1.根据业务需要,产生多个输出文件
    2.多个reduce任务在并发运行,提高整体job的运行效率

时间: 2024-10-18 09:45:32

Hadoop日记Day17---计数器、map规约、分区学习的相关文章

Hadoop日记系列目录

下面是Hadoop日记系列的目录,由于目前时间不是很充裕,以后的更新的速度会变慢,会按照一星期发布一期的原则进行,希望能和大家相互学习.交流. 目录安排 1>  Hadoop日记Day1---Hadoop介绍 2>  Hadoop日记Day2---虚拟机中搭建Linux 3>  Hadoop日记Day3---Hadoop的伪分布式安装 4>  Hadoop日记Day4---去除HADOOP_HOME is deprecated 5>  Hadoop日记Day5---HDFS介

Hadoop、Pig、Hive、NOSQL 学习资源收集

(一)hadoop 相关安装部署 1.hadoop在windows cygwin下的部署: http://lib.open-open.com/view/1333428291655 http://blog.csdn.net/ruby97/article/details/7423088 http://blog.csdn.net/savechina/article/details/5656937 2.hadoop 伪分布式安装: http://www.thegeekstuff.com/2012/02/

Map接口的学习

接口Map<K, V> 一.Map功能 1.添加 put(K key, V value) putAll(Map<? extends K, ? extends V>); 2.删除 clear() remove(Object key); 返回对应的值 3.判断 containsKey(Object key) containsValue(Object value) isEmpty() 4.获取 get(Object key)  :不在返回null size() values(); ---

Hadoop的HDFS和Map/Reduce

HDFS HDFS是一个具有高度容错性的分布式文件系统,适合部署在廉价的机器上,它具有以下几个特点: 1)适合存储非常大的文件 2)适合流式数据读取,即适合"只写一次,读多次"的数据处理模式 3)适合部署在廉价的机器上 但HDFS不适合以下场景(任何东西都要分两面看,只有适合自己业务的技术才是真正的好技术): 1)不适合存储大量的小文件,因为受Namenode内存大小限制 2)不适合实时数据读取,高吞吐量和实时性是相悖的,HDFS选择前者 3)不适合需要经常修改数据的场景 HDFS的架

Hadoop 2.4.1 Map/Reduce小结

看了下MapReduce的例子.再看了下Mapper和Reducer源码,理清了参数的意义,就o了. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> Map是打散过程,把输入的数据,拆分成若干的键值对.Reduce是重组的,根据前面的键值对,重组数据. 自己写Map/Reduce的话,理解了如何拆分数据.组装数据,理解了

splittability A SequenceFile can be split by Hadoop and distributed across map jobs whereas a GZIP file cannot be.

splittability CompressedStorage Skip to end of metadata Created by Confluence Administrator, last modified by Lefty Leverenz on Sep 19, 2017 Go to start of metadata Compressed Data Storage Keeping data compressed in Hive tables has, in some cases, be

Hadoop日记Day12---MapReduce学习

一.MapReduce简介 1.1 MapReduce概述 MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,其执行流程如图1.这两个函数的形参是key.value对,表示函数的输入信息. 图 1 1.2 Map和Reduce编程模型 在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce

Hadoop之——自定义计数器

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46057909 1.Mapper类的实现 /** * KEYIN 即k1 表示行的偏移量 * VALUEIN 即v1 表示行文本内容 * KEYOUT 即k2 表示行中出现的单词 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 */ static class MyMapper extends Mapper<LongWritable, Text, Text, Lon

Hadoop日记Day13---使用hadoop自定义类型处理手机上网日志

测试数据的下载地址为:http://pan.baidu.com/s/1gdgSn6r 一.文件分析 首先可以用文本编辑器打开一个HTTP_20130313143750.dat的二进制文件,这个文件的内容是我们的手机日志,文件的内容已经经过了优化,格式比较规整,便于学习研究,感兴趣的读者可以尝试一下. 我从中截取文件中的一行记录内容进行分析: 1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i