1. map任务处理
1.3 对输出的key、value进行分区。
分区的目的指的是把相同分类的<k,v>交给同一个reducer任务处理。
public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{ static HashMap<String,Integer> map = null; static{ map = new HashMap<String,Integer>(); map.put("gz1", 0); map.put("gz2", 0); map.put("sz1", 1); map.put("sz2", 1); } /** * 这里是对mapper任务输出的<k2,v2>进行操作 * getPartition函数返回多少的值,就会有多少个reducer任务 * * “gz1”与“gz2”的返回的都是0,所以与分发到同一个reducer任务上,但是k2的值不一样 * 所以分组就是 * <gz1,123> * <gz2,234> * 然后出现在不同reduce函数上 */ @Override public int getPartition(Text key, LongWritable value, int numPartitions) { return (Integer)map.get(key.toString()).intValue(); } } //设置分区 wcjob.setPartitionerClass(MyPartitioner.class);
自定义排序,排序是根据k2来进行排序的,k2就需要自己进行自定义类型
private static class MyNewKey implements WritableComparable<MyNewKey> { long firstNum; long secondNum; public MyNewKey() { } public MyNewKey(long first, long second) { firstNum = first; secondNum = second; } @Override public void write(DataOutput out) throws IOException { out.writeLong(firstNum); out.writeLong(secondNum); } @Override public void readFields(DataInput in) throws IOException { firstNum = in.readLong(); secondNum = in.readLong(); } /* * 当key进行排序时会调用以下这个compreTo方法 */ @Override public int compareTo(MyNewKey anotherKey) { long min = firstNum - anotherKey.firstNum; if (min != 0) { // 说明第一列不相等,则返回两数之间小的数 return (int) min; } else { return (int) (secondNum - anotherKey.secondNum); } } }
自定义分组
为了针对新的key类型作分组,我们也需要自定义一下分组规则:
(1)编写一个新的分组比较类型用于我们的分组:
private static class MyGroupingComparator implements RawComparator<MyNewKey> { /* * 基本分组规则:按第一列firstNum进行分组 */ @Override public int compare(MyNewKey key1, MyNewKey key2) { return (int) (key1.firstNum - key2.firstNum); } /* * @param b1 表示第一个参与比较的字节数组 * * @param s1 表示第一个参与比较的字节数组的起始位置 * * @param l1 表示第一个参与比较的字节数组的偏移量 * * @param b2 表示第二个参与比较的字节数组 * * @param s2 表示第二个参与比较的字节数组的起始位置 * * @param l2 表示第二个参与比较的字节数组的偏移量 */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); } }
从代码中我们可以知道,我们自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,而RawComparator接口又实现了Comparator接口,下面看看这两个接口的定义:
首先是RawComparator接口的定义:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
其次是Comparator接口的定义:
public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); }
在MyGroupingComparator中分别对这两个接口中的定义进行了实现,RawComparator中的compare()方法是基于字节的比较,Comparator中的compare()方法是基于对象的比较。
在基于字节的比较方法中,有六个参数,一下子眼花了:
Params:
* @param arg0 表示第一个参与比较的字节数组
* @param arg1 表示第一个参与比较的字节数组的起始位置
* @param arg2 表示第一个参与比较的字节数组的偏移量
*
* @param arg3 表示第二个参与比较的字节数组
* @param arg4 表示第二个参与比较的字节数组的起始位置
* @param arg5 表示第二个参与比较的字节数组的偏移量
由于在MyNewKey中有两个long类型,每个long类型又占8个字节。这里因为比较的是第一列数字,所以读取的偏移量为8字节。
(2)添加对分组规则的设置:
// 设置自定义分组规则 job.setGroupingComparatorClass(MyGroupingComparator.class);
时间: 2024-10-08 10:04:22