第八篇:经典案例 - 排序

前言

在计算机领域,排序的重要性不用多说。而排序的算法,效率分析等也一直是研究的热点。

本文将给出使用Hadoop分布式方案进行排序的例子,这能极大提高排序的速度,是需要重点掌握的一个案例。

需求

对输入文件中的数据进行排序。

输入文件中的每行内容都是一个数字,要求在输出文件中每行有两个数字,第一个数字代表位次,第二个数字为原始数据。

比如文件1包含以下数据:

1

3

5

2

4

6

文件2包含以下数据:

2

4

6

3

1

5

那么输出文件应当为:

1  1

2  1

3  2

4  2

...

方案制定

表面上看,这是一个非常简单的例子 - Hadoop中存放的键值对本身就是有序的,直接将输入存放进来然后再取出来就完成排序了。

但事实上,直接这样做行不通。为何?因为默认的排序过程是在单个的节点上完成的。也就是说,每个reduce节点收到键值对是在该节点局部有序,而不是在所有reduce节点里全局有序。

解决之道是重写Partition方法,请仔细阅读以下内容:

在shuffle阶段之后(或者说是shuffle最后),将根据map中间输出键值对中的key值来决定将此键值对划分给哪个Partition区间,或者说哪个reduce节点。

可以根据数据的最大最小值将数据划分为多个区间,这样,每个reduce节点就能获取到某个数据段的完整的数据,而且根据hadoop特性,这些数据在单个的reduce节点之内都是有序存放的。

因此每个reduce节点的任务很简单,输出结果就可以了。

至于说位次,只需要在reduce类中声明一个static变量,让这个static变量在不同的reduce调用之间共享就可以了。

要说明的是这里统计的只是数据在每个reduce节点之内的位次,如果要获得全局位次,则需要再遍历一次所有reduce输出文件。时间复杂度仅为O(n)。

代码实现

  1 package org.apache.hadoop.examples;
  2
  3 import java.io.IOException;
  4
  5 //导入各种Hadoop包
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.util.GenericOptionsParser;
 17
 18 // 主类
 19 public class Sort {
 20
 21     // Mapper类
 22     public static class Map extends Mapper<Object, Text, IntWritable, IntWritable>{
 23
 24         // new一个值为1的IntWritable对象
 25         private static IntWritable data = new IntWritable(1);
 26
 27         // 实现map函数
 28         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 29
 30             // 将切分后的value作为中间输出的key,然后value值为1。
 31             String line = value.toString();
 32             data.set(Integer.parseInt(line));
 33             context.write(data, new IntWritable(1));
 34         }
 35     }
 36
 37     // Reducer类
 38     public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
 39
 40         // new一个值为空的IntWritable对象
 41         private static IntWritable linenum = new IntWritable();
 42
 43         // 实现reduce函数
 44         public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
 45
 46             // 写入结果键值对
 47             for (IntWritable val : values) {
 48                 context.write(linenum, key);
 49                 linenum = new IntWritable(linenum.get()+1);
 50             }
 51         }
 52     }
 53
 54     // 重写Partitioner类
 55     public static class Partition extends Partitioner <IntWritable, IntWritable> {
 56
 57         // 重载getPartition方法。下面的三个参数分别为map中间输出的键,值,以及分割区间的个数。
 58         public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
 59
 60             // 依次将键值对分配到各个分割区间
 61             int MaxNumber = 65223;
 62             int bound = MaxNumber/numPartitions + 1;
 63             int keynumber = key.get();
 64
 65             for (int i=0; i<numPartitions; i++) {
 66                 if (keynumber < bound * (i+1) && keynumber >= bound*i) {
 67
 68                     // 返回的 i 就是分配到的区间号
 69                     return i;
 70                 }
 71             }
 72
 73             return -1;
 74         }
 75     }
 76
 77     // 主函数
 78     public static void main(String[] args) throws Exception {
 79
 80         // 获取配置参数
 81         Configuration conf = new Configuration();
 82         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 83
 84         // 检查命令语法
 85         if (otherArgs.length != 2) {
 86             System.err.println("Usage: Dedup <in> <out>");
 87             System.exit(2);
 88         }
 89
 90         // 定义作业对象
 91         Job job = new Job(conf, "Sort");
 92         // 注册分布式类
 93         job.setJarByClass(Sort.class);
 94         // 注册Mapper类
 95         job.setMapperClass(Map.class);
 96         // 注册Reducer类
 97         job.setReducerClass(Reduce.class);
 98         // 注册Partition类
 99         job.setPartitionerClass(Partition.class);
100         // 注册输出格式类
101         job.setOutputKeyClass(IntWritable.class);
102         job.setOutputValueClass(IntWritable.class);
103         // 设置输入输出路径
104         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
105         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
106
107         // 运行程序
108         System.exit(job.waitForCompletion(true) ? 0 : 1);
109     }
110 }

运行结果

输入文件1,2分别为:

小结

1. 掌握Partitioner方法的重写技巧,这是本程序最核心的部分。

2. 熟悉hadoop的key默认有序的性质。

3. 本文采取的是伪分布式,故只有1个reduce节点,体现不出hadoop的优越性。当对海量数据进行排序的时候,它的速度价值才能真正体现出来。

时间: 2024-11-03 05:39:52

第八篇:经典案例 - 排序的相关文章

秒杀多线程第八篇 经典线程同步 信号量Semaphore

版权声明:本文为博主原创文章,未经博主允许不得转载. 阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇一个经典的多线程同步问题> <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event> <秒杀多线程第七篇经典线程同步互斥量Mutex> 前面介绍了关键段CS.事件Event.互斥量Mutex在经典线程同步问题中的使用.本篇介绍用信号量Semaphore来解决这个问题. 首先也来看看如何使用信号量,信号量Semaphore

转---秒杀多线程第八篇 经典线程同步 信号量Semaphore

阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇一个经典的多线程同步问题> <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event> <秒杀多线程第七篇经典线程同步互斥量Mutex> 前面介绍了关键段CS.事件Event.互斥量Mutex在经典线程同步问题中的使用.本篇介绍用信号量Semaphore来解决这个问题. 首先也来看看如何使用信号量,信号量Semaphore常用有三个函数,使用很方便.下面是这几个函数的原型和使

经典案例 - 排序

前言 在计算机领域,排序的重要性不用多说.而排序的算法,效率分析等也一直是研究的热点. 本文将给出使用Hadoop分布式方案进行排序的例子,这能极大提高排序的速度,是需要重点掌握的一个案例. 需求 对输入文件中的数据进行排序. 输入文件中的每行内容都是一个数字,要求在输出文件中每行有两个数字,第一个数字代表位次,第二个数字为原始数据. 比如文件1包含以下数据: 1 3 5 2 4 6 文件2包含以下数据: 2 4 6 3 1 5 那么输出文件应当为: 1 1 2 1 3 2 4 2 ... 方案

秒杀多线程第八篇 经典线程同步 信号量Semaphore (续)

java semaphore实现: Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用.Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可.比如在Windows下可以设置共享文件的最大客户端访问个数. package com.multithread.semaphore; import

秒杀多线程第十六篇 多线程十大经典案例之一 双线程读写队列数据

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902 欢迎关注微博:http://weibo.com/MoreWindows 在<秒杀多线程系列>的前十五篇中介绍多线程的相关概念,多线程同步互斥问题<秒杀多

多线程十大经典案例之一 双线程读写队列数据

本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902 欢迎关注微博:http://weibo.com/MoreWindows 在<秒杀多线程系列>的前十五篇中介绍多线程的相关概念,多线程同步互斥问题<秒杀多线程第四篇一个经典的多线程同步问题>及解决多线程同步互斥的常用方法

秒杀多线程第九篇 经典线程同步总结 关键段 事件 互斥量 信号量

版权声明:本文为博主原创文章,未经博主允许不得转载. 前面<秒杀多线程第四篇一个经典的多线程同步问题>提出了一个经典的多线程同步互斥问题,这个问题包括了主线程与子线程的同步,子线程间的互斥,是一道非常经典的多线程同步互斥问题范例,后面分别用了四篇 <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event> <秒杀多线程第七篇经典线程同步互斥量Mutex> <秒杀多线程第八篇经典线程同步信号量Semaphore> 来

转----秒杀多线程第九篇 经典线程同步总结 关键段 事件 互斥量 信号量

前面<秒杀多线程第四篇一个经典的多线程同步问题>提出了一个经典的多线程同步互斥问题,这个问题包括了主线程与子线程的同步,子线程间的互斥,是一道非常经典的多线程同步互斥问题范例,后面分别用了四篇 <秒杀多线程第五篇经典线程同步关键段CS> <秒杀多线程第六篇经典线程同步事件Event> <秒杀多线程第七篇经典线程同步互斥量Mutex> <秒杀多线程第八篇经典线程同步信号量Semaphore> 来详细介绍常用的线程同步互斥机制——关键段.事件.互斥量

转--- 秒杀多线程第六篇 经典线程同步 事件Event

阅读本篇之前推荐阅读以下姊妹篇: <秒杀多线程第四篇 一个经典的多线程同步问题> <秒杀多线程第五篇 经典线程同步关键段CS> 上一篇中使用关键段来解决经典的多线程同步互斥问题,由于关键段的“线程所有权”特性所以关键段只能用于线程的互斥而不能用于同步.本篇介绍用事件Event来尝试解决这个线程同步问题. 首先介绍下如何使用事件.事件Event实际上是个内核对象,它的使用非常方便.下面列出一些常用的函数. 第一个 CreateEvent 函数功能:创建事件 函数原型: HANDLEC