mapreduce的二次排序实现方式

本文主要介绍下二次排序的实现方式

我们知道mapreduce是按照key来进行排序的,那么如果有有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这个其实就是二次排序。

下面就具体说一下二次排序的实现方式

1. 自定义一个key

为什么要自定义一个key,我们知道mapreduce中排序就是按照key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定不行,所以需要定义一个新的key,key里面有两个属性,也就是我们要排序的两个字段

首先,实现WritableComparable接口,因为key是可序列化并且可以比较的

其次,重载相关的方法,例如序列化、反序列化相关的方法write、readFields。重载在分区的时候要用到的hashcode方法,注意后面会说道一个partitioner类,也是用来分区的,用hashcode方法和partitioner类进行分区都是可以的,使用其中的一个即可。重载排序用的compareTo方法,这个就是真正对排序起作用的方法。

2. 分区函数类

上面定义了一个新的key,那么我现在做分发,到底按照什么样的规则进行分发是在分区函数类中定义的,这个类要继承Partitioner类,重载其中的分区方法getPartition,在main函数里给job添加上即可,例如:job.setPartitionerClass(partitioner.class)

这个类的作用跟key的hashcode方法的作用一样,所以如果在hashcode方法中写了分区的方法,这个分区类是可以省掉的

3. 比较函数类

这个类决定着key的排序规则,是一个比较器,需要继承WritableComparator类,并且重载其中的compare方法。在main函数里给job添加上即可,例如:job.setSortComparatorClass(KeyComparator.class)

这个类的作用跟自定义key的compareTo方法一样,如果在自定义的key中重载的compareTo方法,则这个类可省略。

4. 分组函数类

通过分区类,我们重新定义了key的分区规则,但是多个key不同的也可以进入到一个reducer中,所以我们需要分组函数类来定义什么样的key做为一组来执行,因为也涉及到比较,所以这个类也需要继承WritableComparator,并且重载其中的compare方法,在main函数中加入即可,例如:job.setPartitionerClass(partitioner.class);

下面是具体实现的代码

public class SecondSortTest {

	private static String input = "/dsap/rawdata/secondSortTest/result3";
	private static String output = "/dsap/rawdata/secondSortTest/result6";

	public static class Mapper1 extends Mapper<Object, Text, Pair, Text> {

		private Pair pair = new Pair();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			String[] segs = value.toString().split("\\s+");

			pair.set(Float.parseFloat(segs[0]), Float.parseFloat(segs[1]));
			context.write(pair, new Text(segs[1]));
		}
	}

	public static class Reducer2 extends Reducer<Pair, Text, Text, Text> {

		public void reduce(Pair key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			context.write(new Text(key.toString()), new Text("==========="));
			for (Text text : values) {
				context.write(new Text(key.toString()), text);
			}
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		/** 判断输出路径是否存在,如果存在,则删除 */
		FileSystem hdfs = FileSystem.get(conf);
		Job job = new Job(conf, "secondSortTest");
		job.setJarByClass(SecondSortTest.class);

		FileInputFormat.addInputPath(job, new Path(input));
		if (hdfs.exists(new Path(output)))
			hdfs.delete(new Path(output));
		FileOutputFormat.setOutputPath(job, new Path(output));

		job.setGroupingComparatorClass(GroupingComparator.class);

		job.setNumReduceTasks(19);
		job.setMapperClass(Mapper1.class);
		job.setReducerClass(Reducer2.class);
		job.setMapOutputKeyClass(Pair.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.waitForCompletion(true);

	}

	public static class partitioner extends Partitioner<Pair, Text> {

		@Override
		public int getPartition(Pair key, Text value, int numPartitions) {
			return Math.abs((int) (key.getFirst() * 127)) % numPartitions;
		}
	}

	static class Pair implements WritableComparable<Pair> {

		private float first;
		private float second = 0;

		@Override
		public void readFields(DataInput in) throws IOException {
			first = in.readFloat();
			second = in.readFloat();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeFloat(first);
			out.writeFloat(second);
		}

		@Override
		public int hashCode() {
			return (int) (first * 127);
		}

		// 这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
		@Override
		public int compareTo(Pair o) {
			if (first != o.first) {
				return first - o.first > 0 ? 1 : -1;
			} else if (second != o.second) {
				return second - o.second > 0 ? 1 : -1;
			}
			return 0;
		}

		public void set(float left, float right) {
			first = left;
			second = right;
		}

		public float getFirst() {
			return first;
		}

		public float getSecond() {
			return second;
		}

		@Override
		public String toString() {
			return "Pair [first=" + first + ", second=" + second + "]";
		}
	}

	static class GroupingComparator implements RawComparator<Pair> {

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
					b2, s2, Integer.SIZE / 8);
		}

		@Override
		public int compare(Pair o1, Pair o2) {
			float first1 = o1.getFirst();
			float first2 = o2.getFirst();
			return first1 - first2 > 0 ? 1 : -1;
		}
	}
}

上面的代码中注意一点,就是reduce中的key到底是什么,如果我把key直接tostring打印出来,那么这个值是排序排在最前面的那个key,如果我遍历value迭代器,并且在里面将key也打印出来,可以看到,迭代器的value里对应的key也被迭代出来了

参考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

时间: 2024-10-19 15:56:20

mapreduce的二次排序实现方式的相关文章

mapreduce 的二次排序

一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 二: 编写实现二次排序功能, 提供源码文件. 三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路. 一: 二次排序 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 1.1 二次排序的功能 1. 当客户端提交一个作业的时候,hadoop 会开启yarn 接受进行数据拷贝处理,之后交友有yarn 框架上的启动服务resourcemanage

MapReduce处理二次排序(分区-排序-分组)

MapReduce二次排序原理 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现. 本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value. 这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个&l

Hadoop二次排序及MapReduce处理流程实例详解

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序的例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和Map.

结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制

本篇博客将结合手机上网流量业务来详细介绍Hadoop的二次排序机制.分区机制,先介绍一下业务场景: 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 首先我们先通过mapreduce程序实现上面的业务逻辑: 代码实现: package FlowSum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOE

MapReduce二次排序

本文主要介绍下二次排序的实现方式 我们知道MapReduce是按照key来进行排序的,那么如果有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这就是传说中的二次排序. 下面就具体说一下二次排序的实现方式 主要就是4点 1.自定义一个Key 为什么要自定义一个Key,我们知道MapReduce中排序就是按照Key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定是不行的,所以自定义一个新的Key,Key里面有两个属性,也就是我们要排序的两个字段. 首先,

大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

   前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分布式缓存). 一 概述 定义 MapReduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE).这样做的好处是可以在任务被分解后,可以通过大量机器进行并行计算,减少整个操作的时间. 适用范围:数据量大,但是数据种类小可以放入内存. 基

Hadoop.2.x_高级应用_二次排序及MapReduce端join

一.对于二次排序案例部分理解 1. 分析需求(首先对第一个字段排序,然后在对第二个字段排序) 杂乱的原始数据 排序完成的数据 a,1 a,1 b,1 a,2 a,2 [排序] a,100 b,6 ===> b,-3 c,2 b,-2 b,-2 b,1 a,100 b,6 b,-3 c,-7 c,-7 c,2 2. 分析[MapRedice过程] 1> 分析数据传入通过input()传入map() 2> map()对数据进行层层过滤,以达到我们想要的数据源, 3> 过滤方法中可添加自

Hadoop Mapreduce分区、分组、二次排序

1.MapReduce中数据流动   (1)最简单的过程:  map - reduce   (2)定制了partitioner以将map的结果送往指定reducer的过程: map - partition - reduce   (3)增加了在本地先进性一次reduce(优化)过程: map - combin(本地reduce) - partition -reduce2.Mapreduce中Partition的概念以及使用.(1)Partition的原理和作用        得到map给的记录后,

MapReduce排序之 二次排序

一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序. 二:技术实现 我们先来看案例需求 #需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开) [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 MapRed