自定义分组

job.setGroupingComparatorClass(MyGroupingComparator.class); //按照第一列进行分组,然后找出每个分组中的第二列中的最小值

为什么要自定义分组?

业务要求分组是按照第一列分组,但是NewK2的比较规则决定了不能按照第一列分。只能自定义分组比较器。

  1 package group;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 import java.util.Comparator;
  8 import java.util.function.Function;
  9 import java.util.function.ToDoubleFunction;
 10 import java.util.function.ToIntFunction;
 11 import java.util.function.ToLongFunction;
 12
 13 import org.apache.hadoop.conf.Configuration;
 14 import org.apache.hadoop.fs.FileSystem;
 15 import org.apache.hadoop.fs.Path;
 16 import org.apache.hadoop.io.LongWritable;
 17 import org.apache.hadoop.io.RawComparator;
 18 import org.apache.hadoop.io.Text;
 19 import org.apache.hadoop.io.WritableComparable;
 20 import org.apache.hadoop.io.WritableComparator;
 21 import org.apache.hadoop.io.file.tfile.RawComparable;
 22 import org.apache.hadoop.mapreduce.Job;
 23 import org.apache.hadoop.mapreduce.Mapper;
 24 import org.apache.hadoop.mapreduce.Reducer;
 25 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 26 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 27 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 28 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 29 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 30
 31 public class GroupApp {
 32     static final String INPUT_PATH = "hdfs://chaoren:9000/input";
 33     static final String OUT_PATH = "hdfs://chaoren:9000/out";
 34
 35     public static void main(String[] args) throws Exception {
 36         final Configuration configuration = new Configuration();
 37
 38         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),
 39                 configuration);
 40         if (fileSystem.exists(new Path(OUT_PATH))) {
 41             fileSystem.delete(new Path(OUT_PATH), true);
 42         }
 43
 44         final Job job = new Job(configuration, GroupApp.class.getSimpleName());
 45
 46         // 1.1 指定输入文件路径
 47         FileInputFormat.setInputPaths(job, INPUT_PATH);
 48         // 指定哪个类用来格式化输入文件
 49         job.setInputFormatClass(TextInputFormat.class);
 50
 51         // 1.2指定自定义的Mapper类
 52         job.setMapperClass(MyMapper.class);
 53         // 指定输出<k2,v2>的类型
 54         job.setMapOutputKeyClass(NewK2.class);
 55         job.setMapOutputValueClass(LongWritable.class);
 56
 57         // 1.3 指定分区类
 58         job.setPartitionerClass(HashPartitioner.class);
 59         job.setNumReduceTasks(1);
 60
 61         // 1.4 TODO 排序、分区
 62         /**
 63          * 分组:按照第一列分区
 64          */
 65         job.setGroupingComparatorClass(MyGroupingComparator.class);
 66
 67         // 1.5 TODO (可选)合并
 68
 69         // 2.2 指定自定义的reduce类
 70         job.setReducerClass(MyReducer.class);
 71         // 指定输出<k3,v3>的类型
 72         job.setOutputKeyClass(LongWritable.class);
 73         job.setOutputValueClass(LongWritable.class);
 74
 75         // 2.3 指定输出到哪里
 76         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
 77         // 设定输出文件的格式化类
 78         job.setOutputFormatClass(TextOutputFormat.class);
 79
 80         // 把代码提交给JobTracker执行
 81         job.waitForCompletion(true);
 82     }
 83
 84     static class MyMapper extends
 85             Mapper<LongWritable, Text, NewK2, LongWritable> {
 86         protected void map(
 87                 LongWritable key,
 88                 Text value,
 89                 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NewK2, LongWritable>.Context context)
 90                 throws java.io.IOException, InterruptedException {
 91             final String[] splited = value.toString().split("\t");
 92             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]),
 93                     Long.parseLong(splited[1]));
 94             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
 95             context.write(k2, v2);
 96         };
 97     }
 98
 99     static class MyReducer extends
100             Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
101         protected void reduce(
102                 NewK2 k2,
103                 java.lang.Iterable<LongWritable> v2s,
104                 org.apache.hadoop.mapreduce.Reducer<NewK2, LongWritable, LongWritable, LongWritable>.Context context)
105                 throws java.io.IOException, InterruptedException {
106             long min = Long.MAX_VALUE;
107             for (LongWritable v2 : v2s) {
108                 if (v2.get() < min) {
109                     min = v2.get();
110                 }
111             }
112             context.write(new LongWritable(k2.first), new LongWritable(min));
113         };
114     }
115
116     /**
117      * 问:为什么实现该类? 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2
118      *
119      */
120     // WritableComparable:Hadoop的序列化
121     static class NewK2 implements WritableComparable<NewK2> {
122         Long first;
123         Long second;
124
125         public NewK2() {
126         }
127
128         public NewK2(long first, long second) {
129             this.first = first;
130             this.second = second;
131         }
132
133         public void readFields(DataInput in) throws IOException {
134             this.first = in.readLong();
135             this.second = in.readLong();
136         }
137
138         public void write(DataOutput out) throws IOException {
139             out.writeLong(first);
140             out.writeLong(second);
141         }
142
143         /**
144          * 当k2进行排序时,会调用该方法. 当第一列不同时,升序;当第一列相同时,第二列升序
145          */
146         public int compareTo(NewK2 o) {
147             final long minus = this.first - o.first;
148             if (minus != 0) {
149                 return (int) minus;
150             }
151             return (int) (this.second - o.second);
152         }
153
154         @Override
155         public int hashCode() {
156             return this.first.hashCode() + this.second.hashCode();
157         }
158
159         @Override
160         public boolean equals(Object obj) {
161             if (!(obj instanceof NewK2)) {
162                 return false;
163             }
164             NewK2 oK2 = (NewK2) obj;
165             return (this.first == oK2.first) && (this.second == oK2.second);
166         }
167     }
168
169     static class MyGroupingComparator implements RawComparator<NewK2> {
170
171         public int compare(NewK2 o1, NewK2 o2) {
172             return (int) (o1.first - o2.first);
173         }
174
175         /**
176          * @param arg0
177          *            表示第一个参与分组的字节数组
178          * @param arg1
179          *            表示第一个参与分组的字节数组的起始位置
180          * @param arg2
181          *            表示第一个参与分组的字节数组的偏移量
182          *
183          * @param arg0
184          *            表示第二个参与分组的字节数组
185          * @param arg1
186          *            表示第二个参与分组的字节数组的起始位置
187          * @param arg2
188          *            表示第二个参与分组的字节数组的偏移量
189          */
190         public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
191                 int arg4, int arg5) {
192             return WritableComparator
193                     .compareBytes(arg0, arg1, 8, arg3, arg4, 8);
194         }
195
196     }
197
198 }

时间: 2024-11-08 01:28:18

自定义分组的相关文章

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

gridcontrol自定义分组表头显示

gridcontrol分组后可以显示分组列名.值 以及汇总项,但是想再显示其他列的值就需要使用自定义分组头了,如下所示: private void bandedGridView2_CustomDrawGroupRow(object sender, DevExpress.XtraGrid.Views.Base.RowObjectCustomDrawEventArgs e) { GridGroupRowInfo GridGroupRowInfo = e.Info as GridGroupRowInf

MapReduce自定义分组Group

一:背景 在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求. 二:技术实现 我们先来看看需求 #当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值 [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 输出结果应该是: [java] v

Hadoop之--&gt;自定义分组 RawComparator

data: 3 33 23 22 22 11 1 --------------------- 需求: 1 12 23 3 当第一列相同时候要第二列的最小值 package group; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org

关于MapReduce中自定义分组类(三)

Job类  /**    * Define the comparator that controls which keys are grouped together    * for a single call to    * {@link Reducer#reduce(Object, Iterable,    *                       org.apache.hadoop.mapreduce.Reducer.Context)}    * @param cls the raw

Hadoop之——自定义分组比较器实现分组功能

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46287985 不多说,直接上代码,大家都懂得 1.Mapper类的实现 /** * Mapper类的实现 * @author liuyazhuang * */ static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ protected void map(LongWritable

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

1:首先搞好实体类对象: write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法 1 package com.areapartition; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 im

hadoop的自定义分组实现 (Partition机制)

hadoop开发中我们会遇到类似这样的问题,比如 如何将不同省份的手机号分别输出到不同的文件中,本片文章将对hadoop内置的Partition类进行重写以解决这个问题. MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R).用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程.Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所

【Hadoop】Hadoop MR 自定义分组 Partition机制

1.概念 2.Hadoop默认分组机制--所有的Key分到一个组,一个Reduce任务处理 3.代码示例 FlowBean package com.ares.hadoop.mr.flowgroup; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean