A Brief Analysis of the SecondarySort Sample

Sample input file:

100 99
100 98
100 56
100 78
20 100
30 100
20 50
30 50
30 60
20 80

Requirement: group the records by the first number, and sort each group by the second number.

Solution:

First, all records that share the same first number must be handled by the same reducer, which calls for a custom Partitioner. The default HashPartitioner assigns records to reduce tasks by hashing the whole key, but here the key is our custom IntPair type, so we need to partition on the first value only (see FirstPartitioner in the full listing below).

Sorting is done on the key by default, and IntPair's comparator already orders pairs by the first number and then the second, so no extra sort comparator is required.

Next, how do we implement the grouping? We want every second number that shares a first number to arrive in a single reduce call's value iterator. By default, values are grouped together only when their keys compare as equal; the trick here is to make values group together even when the keys differ, as long as they agree on the first number. For example, the keys (20,50), (20,80), and (20,100) should produce one reduce call with the values [50, 80, 100].

The key line of code is:

job.setGroupingComparatorClass(FirstGroupingComparator.class);

This method sets the comparator used for grouping; digging into the source, it delegates to:

conf.setOutputValueGroupingComparator(cls);

The relevant Javadoc explains:

 * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
 * in a single call to the reduce function if K1 and K2 compare as equal.</p>
 *
 * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
 * how keys are sorted, this can be used in conjunction to simulate
 * <i>secondary sort on values</i>.</p>

Result:

------------------------------------------------
20    50
20    80
20    100
------------------------------------------------
30    50
30    60
30    100
------------------------------------------------
100    56
100    78
100    98
100    99

This example exercises two of Hadoop's most fundamental mechanisms: Writable and RawComparator. The former is how Hadoop serializes data; the latter is the comparison machinery Hadoop uses when sorting.
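
Why can raw bytes be compared directly? IntPair serializes itself in a byte-comparable format: write() subtracts Integer.MIN_VALUE from each int (flipping the sign bit), so unsigned lexicographic comparison of the serialized bytes agrees with signed integer order. Below is a minimal, self-contained sketch of that effect; the class and method names here are mine, not part of the sample:

import java.nio.ByteBuffer;

public class ByteComparableDemo {
  // Encode as IntPair.write() does: the subtraction flips the sign bit, so
  // Integer.MIN_VALUE maps to 0x00000000 and Integer.MAX_VALUE to 0xFFFFFFFF.
  static byte[] encode(int v) {
    return ByteBuffer.allocate(4).putInt(v - Integer.MIN_VALUE).array();
  }

  // Unsigned lexicographic comparison, as WritableComparator.compareBytes does.
  static int compareBytes(byte[] a, byte[] b) {
    for (int i = 0; i < 4; i++) {
      int x = a[i] & 0xff;
      int y = b[i] & 0xff;
      if (x != y) {
        return x < y ? -1 : 1;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    // Without the sign-bit flip, -5 would serialize with a leading 0xFF byte
    // and compare greater than 3; with it, byte order matches numeric order.
    System.out.println(compareBytes(encode(-5), encode(3)));   // prints -1
    System.out.println(compareBytes(encode(3), encode(100)));  // prints -1
  }
}

Because byte order matches numeric order, the shuffle can sort serialized records without deserializing them at all, which is exactly why the sample registers a raw Comparator for IntPair. The full example source follows: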

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files that must contain two integers per line.
 * The output is sorted by the first and second number and grouped on the
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
 *            <i>in-dir</i> <i>out-dir</i>
 */
public class SecondarySort {

  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers.
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      // 157 is an odd prime; multiplying the first value by it spreads pairs
      // with the same component sum across different hash buckets.
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                        // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }

  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value,
                            int numPartitions) {
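      // Caveat: if key.getFirst() * 127 overflows to exactly
      // Integer.MIN_VALUE, Math.abs() still returns a negative number
      // and the partition index becomes invalid.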
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator
                implements RawComparator<IntPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      // Compare only the first int (Integer.SIZE/8 = 4 bytes) of each
      // serialized pair, so keys that share a first number group together.
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
                                             b2, s2, Integer.SIZE/8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }

  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass
         extends Mapper<LongWritable, Text, IntPair, IntWritable> {

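    // Reuse one key and one value object across map() calls; the framework
    // serializes them at context.write() time, so fresh allocations per
    // record are unnecessary.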
    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable inKey, Text inValue,
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }

  /**
   * A reducer class that writes a separator line before each group and then
   * emits the first number paired with each of its sorted second numbers.
   */
  public static class Reduce
         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR =
      new Text("------------------------------------------------");
    private final Text first = new Text();

    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
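      // Writing a null value makes TextOutputFormat print only the key,
      // producing a bare separator line before each group.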
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for(IntWritable value: values) {
        context.write(first, value);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Hard-coded arguments for local debugging; remove this line to take the
    // input and output paths from the command line instead.
    args = "-Dio.sort.mb=10 hdfs://namenode:9000/user/hadoop/test/intpair.txt hdfs://namenode:9000/user/hadoop/secsortout".split(" ");

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }

    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
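    // DeleteFolder is the author's own helper (not shown here); it
    // presumably removes the output directory if it already exists so
    // the job can be rerun.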
    myUtils.myUtils.DeleteFolder(conf, otherArgs[1]);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}