hadoop1中partition和combiner作用

---恢复内容开始---

1、解析Partiton

  把map任务的输出的中间结果按照key的范围进行划分成r份,r代表reduce任务的个数。hadoop默认有个类HashPartition实现分区,通过key对reduce的个数取模(key%r),这样可以保证一段范围内的key交由一个reduce处理。以此来实现reduce的负载均衡。不至于使有些reduce处理的任务压力过大,有些reduce空闲。

  如果我们对hadoop本身的分区算法不满意,或者我们因为我们的业务需求,我们可以自定义一个类实现Partition接口,实现里面的方法,在getPartiton()方法中实现自己的分区算法。在提交作业的main方法中通setPartitonclass()方法这个类,就可以了。

 以下为代码实例

  

  1. package org.apache.hadoop.examples;
  2. import java.io.IOException;
  3. import java.util.*;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.conf.*;
  6. import org.apache.hadoop.io.*;
  7. import org.apache.hadoop.mapred.*;
  8. import org.apache.hadoop.util.*;
  9. /**
  10. * 输入文本,以tab间隔
  11. * kaka    1       28
  12. * hua     0       26
  13. * chao    1
  14. * tao     1       22
  15. * mao     0       29      22
  16. * */
  17. //Partitioner函数的使用
  18. public class MyPartitioner {
  19. // Map函数
  20. public static class MyMap extends MapReduceBase implements
  21. Mapper<LongWritable, Text, Text, Text> {
  22. public void map(LongWritable key, Text value,
  23. OutputCollector<Text, Text> output, Reporter reporter)
  24. throws IOException {
  25. String[] arr_value = value.toString().split("\t");
  26. //测试输出
  27. //          for(int i=0;i<arr_value.length;i++)
  28. //          {
  29. //              System.out.print(arr_value[i]+"\t");
  30. //          }
  31. //          System.out.print(arr_value.length);
  32. //          System.out.println();
  33. Text word1 = new Text();
  34. Text word2 = new Text();
  35. if (arr_value.length > 3) {
  36. word1.set("long");
  37. word2.set(value);
  38. } else if (arr_value.length < 3) {
  39. word1.set("short");
  40. word2.set(value);
  41. } else {
  42. word1.set("right");
  43. word2.set(value);
  44. }
  45. output.collect(word1, word2);
  46. }
  47. }
  48. public static class MyReduce extends MapReduceBase implements
  49. Reducer<Text, Text, Text, Text> {
  50. public void reduce(Text key, Iterator<Text> values,
  51. OutputCollector<Text, Text> output, Reporter reporter)
  52. throws IOException {
  53. int sum = 0;
  54. System.out.println(key);
  55. while (values.hasNext()) {
  56. output.collect(key, new Text(values.next().getBytes()));
  57. }
  58. }
  59. }
  60. // 接口Partitioner继承JobConfigurable,所以这里有两个override方法
  61. public static class MyPartitionerPar implements Partitioner<Text, Text> {
  62. /**
  63. * getPartition()方法的
  64. * 输入参数:键/值对<key,value>与reducer数量numPartitions
  65. * 输出参数:分配的Reducer编号,这里是result
  66. * */
  67. @Override
  68. public int getPartition(Text key, Text value, int numPartitions) {
  69. // TODO Auto-generated method stub
  70. int result = 0;
  71. System.out.println("numPartitions--" + numPartitions);
  72. if (key.toString().equals("long")) {
  73. result = 0 % numPartitions;
  74. } else if (key.toString().equals("short")) {
  75. result = 1 % numPartitions;
  76. } else if (key.toString().equals("right")) {
  77. result = 2 % numPartitions;
  78. }
  79. System.out.println("result--" + result);
  80. return result;
  81. }
  82. @Override
  83. public void configure(JobConf arg0)
  84. {
  85. // TODO Auto-generated method stub
  86. }
  87. }
  88. //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner
  89. public static void main(String[] args) throws Exception {
  90. JobConf conf = new JobConf(MyPartitioner.class);
  91. conf.setJobName("MyPartitioner");
  92. //控制reducer数量,因为要分3个区,所以这里设定了3个reducer
  93. conf.setNumReduceTasks(3);
  94. conf.setMapOutputKeyClass(Text.class);
  95. conf.setMapOutputValueClass(Text.class);
  96. //设定分区类
  97. conf.setPartitionerClass(MyPartitionerPar.class);
  98. conf.setOutputKeyClass(Text.class);
  99. conf.setOutputValueClass(Text.class);
  100. //设定mapper和reducer类
  101. conf.setMapperClass(MyMap.class);
  102. conf.setReducerClass(MyReduce.class);
  103. conf.setInputFormat(TextInputFormat.class);
  104. conf.setOutputFormat(TextOutputFormat.class);
  105. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  106. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  107. JobClient.runJob(conf);
  108. }
  109. }

2、解析Combiner

  在Partiton之前,我们还可以对中间结果进行Combiner,即将中间结果中有着相同key 的(key,value)键值对进行合并成一对,Combiner的过程与reduce的过程类似,很多情况下可以直接使用reduce,但是Combiner作为Map任务的一部分,在Map输出后紧接着执行,通过Combiner的执行,减少了中间结果中的(key,value)对数目,reduce在从map复制数据时将会大大减少网络流量,每个reduce需要和原许多个map任务节点通信以此来取得落到它负责key区间内的中间结果,然后执行reduce函数,得到一个最中结果文件。有R个reduce任务,就有R个最终结果,这R个最终结果并不需要合并成一个结果,因为这R个最终结果又可以作为另一次计算的输入,开始另一次计算。

  combiner使用总结:

  combiner的使用可以在满足业务需求的情况下,大大提高job的运行速度,如果不合适,则将到最后导致结果不一致(如:求平均值)。

  以下为Combiner代码示例

  1. package com;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.conf.Configured;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.DoubleWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. import org.apache.hadoop.util.Tool;
  17. import org.apache.hadoop.util.ToolRunner;
  18. public class AveragingWithCombiner extends Configured implements Tool {
  19. public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
  20. static enum ClaimsCounters { MISSING, QUOTED };
  21. // Map Method
  22. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  23. String fields[] = value.toString().split(",", -20);
  24. String country = fields[4];
  25. String numClaims = fields[8];
  26. if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
  27. context.write(new Text(country), new Text(numClaims + ",1"));
  28. }
  29. }
  30. }
  31. public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {
  32. // Reduce Method
  33. public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  34. double sum = 0;
  35. int count = 0;
  36. for (Text value : values) {
  37. String fields[] = value.toString().split(",");
  38. sum += Double.parseDouble(fields[0]);
  39. count += Integer.parseInt(fields[1]);
  40. }
  41. context.write(key, new DoubleWritable(sum/count));
  42. }
  43. }
  44. public static class Combine extends Reducer<Text,Text,Text,Text> {
  45. // Reduce Method
  46. public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  47. double sum = 0;
  48. int count = 0;
  49. for (Text value : values) {
  50. String fields[] = value.toString().split(",");
  51. sum += Double.parseDouble(fields[0]);
  52. count += Integer.parseInt(fields[1]);
  53. }
  54. context.write(key, new Text(sum+","+count));
  55. }
  56. }
  57. // run Method
  58. public int run(String[] args) throws Exception {
  59. // Create and Run the Job
  60. Job job = new Job();
  61. job.setJarByClass(AveragingWithCombiner.class);
  62. FileInputFormat.addInputPath(job, new Path(args[0]));
  63. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  64. job.setJobName("AveragingWithCombiner");
  65. job.setMapperClass(MapClass.class);
  66. job.setCombinerClass(Combine.class);
  67. job.setReducerClass(Reduce.class);
  68. job.setInputFormatClass(TextInputFormat.class);
  69. job.setOutputFormatClass(TextOutputFormat.class);
  70. job.setOutputKeyClass(Text.class);
  71. job.setOutputValueClass(Text.class);
  72. System.exit(job.waitForCompletion(true) ? 0 : 1);
  73. return 0;
  74. }
  75. public static void main(String[] args) throws Exception {
  76. int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
  77. System.exit(res);
  78. }
  79. }

---恢复内容结束---

时间: 2024-11-07 02:08:15

hadoop1中partition和combiner作用的相关文章

24、redis中的sentinel的作用?

redis中的sentinel的作用? Redis-Sentinel是Redis官方推荐的高可用性(HA)解决方案,当用Redis做Master-slave的高可用方案时,假如master宕机了,Redis本身(包括它的很多客户端)都没有实现自动进行主备切换,而Redis-sentinel本身也是一个独立运行的进程,它能监控多个master-slave集群,发现master宕机后能进行自动切换. 它的主要功能有以下几点: 不时地监控redis是否按照预期良好地运行; 如果发现某个redis节点运

FAQ: Python中if __name__ == &#39;__main__&#39;:作用

#hello.pydef sayHello(): str="hello" print(str); if __name__ == "__main__": print ('This is main of module "hello.py"') sayHello() python作为一种脚本语言,我们用python写的各个module都可以包含以上那么一个类似c中的main函数,只不过python中的这种__main__与c中有一些区别,主要体现在:

Linux内核中的jiffies及其作用介绍及jiffies等相关函数详解

在LINUX的时钟中断中涉及至二个全局变量一个是xtime,它是timeval数据结构变量,另一个则是jiffies,首先看timeval结构struct timeval{time_t tv_sec; /***second***/susecond_t tv_usec;/***microsecond***/}到底microsecond是毫秒还是微秒?? 1秒=1000毫秒(3个零),1秒=1000 000微秒(6个零),1秒=1000 000 000纳秒(9个零),1秒=1000 000 000

iOS中关于字符 “&amp;”的作用?

如NSFileManager中关于判断是否目录的 iOS中关于字符 "&"的作用? >> ios 这个答案描述的挺清楚的:http://www.goodpm.net/postreply/ios/1010000008969096/iOS中关于字符amp的作用.html

C++中减去‘0’的作用( -&#39;0&#39; )

我们在日常研(chao)究(xi)大佬们的代码时,可能会遇到减去字符'0'的情况,比如下图,这种语法问题吧说简单它不那么简单,毕竟不好理解:但说难吧也不难,其实就是让代码更简洁更有逼格的途径而已. 说了那么多屁话那么这个减去零到底是干啥的呢? 它的作用就是减去0的ASCII值:48.可以方便的用来转换大小写或者数字和和字符.比如我们可以写这么一个函数: #include <cstdio> #include <iostream> using namespace std; int ch

浅析python 中__name__ = &#39;__main__&#39; 的作用

很多新手刚开始学习python的时候经常会看到python 中__name__ = \'__main__\' 这样的代码,可能很多新手一开始学习的时候都比较疑惑,python 中__name__ = '__main__' 的作用,到底干嘛的? 有句话经典的概括了这段代码的意义: "Make a script both importable and executable" 意思就是说让你写的脚本模块既可以导入到别的模块中用,另外该模块自己也可执行. __name__ 是当前模块名,当模块

NSPredicate用法总结(Cocoa框架中的NSPredicate用于查询,原理和用法都类似于SQL中的where,作用相当于数据库的过滤取)

简述:Cocoa框架中的NSPredicate用于查询,原理和用法都类似于SQL中的where,作用相当于数据库的过滤取. 定义(最常用到的方法): [objc] view plaincopy NSPredicate *ca = [NSPredicate predicateWithFormat:(NSString *), ...]; Format:(1)比较运算符>,<,==,>=,<=,!=可用于数值及字符串例:@"number > 100" (2)范围

谈谈python 中__name__ = &#39;__main__&#39; 的作用

position:static(静态定位) 当position属性定义为static时,可以将元素定义为静态位置,所谓静态位置就是各个元素在HTML文档流中应有的位置 podisition定位问题.所以当没有定义position属性时,并不说明该元素没有自己的位置,它会遵循默认显示为静态位置,在静态定位状态下无法通过坐标值(top,left,right,bottom)来改变它的位置. position:absolute(绝对定位) 当position属性定义为absolute时,元素会脱离文档流

C++中explicit关键字的作用

转自:http://www.cnblogs.com/winnersun/archive/2011/07/16/2108440.html explicit用来防止由构造函数定义的隐式转换. 要明白它的作用,首先要了解隐式转换:可以用单个实参来调用的构造函数定义了从形参类型到该类类型的一个隐式转换. 例如: class things{ public: things(const std::string&name =""): m_name(name),height(0),weight(