MapReduce处理二次排序(分区-排序-分组)

MapReduce二次排序原理

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现。

本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。

这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个<LongWritable,Text>对输入的给Map的map方法。

注意输出应该符合自定义Map中定义的输出<IntPair,IntWritable>.最终是生成一个List<IntPair,IntWritable>,在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置Key比较函数类,则使用key的实现的compareTo方法。

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,然后开始构造一个key对应的value迭代器,这是就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,他们的value就放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和他的value迭代器)。同样注意输入与输出的类型必须与自定义的reducer中声明的一致。

核心总结

1.map最后阶段进行partition分区。一般使用job.setPartitionerClass设置的类,如果没有自定义的类,用key的hashcode()方法进行排序

2.每个分区内部调用job.setSortComparatorClass设置Key的比较函数类进行排序,如果没有则使用key的实现的compareTo方法。

3.当reduce接收到所有map传输过来的数据之后,调job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用key的实现的compareTo方法

4.紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个key的value放在一个迭代器里面

分区 --->  排序(二次)  --->  分组

分区默认的是key的hashcode()

排序默认的实key的compareTo()

-----------------------------------------

job.setPartitionerClass(Partitioner p); //设置分区。默认分区时hashcode()

job.setSortComparatorClass(RawComparator c);  //比较排序。shuffle阶段map输出之后,reduce之前。默认是key的compareTo()方法

job.setGroupingComparatorClass(RawComparator c); //分组。Reduce阶段

-----------------------------------------

案例

原始数据

2 12:12:34 2_hao123

3 09:10:34 3_baidu

1 15:02:41 1_google

3 22:11:34 3_sougou

1 19:23:23 1_baidu

2 13:56:60 2_soso

分别依据第一列和第二列对数据进行二次排序

1.分区类

package test.mr.seconderysort;

import org.apache.hadoop.io.Text;

/*
 * 分区类
 */
public class Partitioner extends
		org.apache.hadoop.mapreduce.Partitioner<StringPart, Text> {

	@Override
	public int getPartition(StringPart key, Text value, int numPartitions) {
		// TODO Auto-generated method stub
		return Math.abs(key.hashCode()) % numPartitions;
	}

}

2.自定义Map输出的key类,将原始数据要排序的两列作为该JavaBean的属性,实现WritableComparable接口,实现CompareTo()排序方法

Ps:WritableComparatable接口中的CompareTo()方法:在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等。

String类中的CompareTo()方法:

/*

* compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方全比较完,这时就比较字符的长度.

*

* 例:  String s1 = "abc";

*     String s2 = "abcd";

*     String s3 = "abcdfg";

*     String s4 = "1bcdfg";

*     String s5 = "cdfg";

*     System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1)

*     System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3)

*     System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48)

*     System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)

*/

package test.mr.seconderysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/*
 * 自定义key
 */
/*
 *如果想对自己写的类排序,你就把自己写的这个类实现Comparable接口
 *然后写一个comparaTo方法来规定这个类的对象排序的顺序。
 *在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等
 */
public class StringPart implements WritableComparable<StringPart> {
	/*
	 * 两列排序
	 */
	private String first;
	private String second;

	public String getFirst() {
		return first;
	}

	public void setFirst(String first) {
		this.first = first;
	}

	public String getSecond() {
		return second;
	}

	public void setSecond(String second) {
		this.second = second;
	}

	public StringPart() {
		super();
		// TODO Auto-generated constructor stub
	}

	public StringPart(String first, String second) {
		super();
		this.first = first;
		this.second = second;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(first);
		out.writeUTF(second);

	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.first = in.readUTF();
		this.second = in.readUTF();
	}

	/*
	 * 排序
	 */
	/*
	 * compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的
	 *
	 * 差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方
	 *
	 * 全比较完,这时就比较字符的长度.
	 *
	 * 例:  String s1 = "abc";
	 *     String s2 = "abcd";
	 *     String s3 = "abcdfg";
	 *     String s4 = "1bcdfg";
	 *     String s5 = "cdfg";
	 *     System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1)
	 *     System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3)
	 *     System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48)
	 *     System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)
	 */
	@Override
	public int compareTo(StringPart o) {
		if (!this.first.equals(o.getFirst())) {
			return first.compareTo(o.getFirst()); // 字符串的compareTo()方法
		} else {
			if (!this.second.equals(o.getSecond())) {
				return second.compareTo(o.getSecond());
			} else {
				return 0;
			}
		}
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((first == null) ? 0 : first.hashCode());
		result = prime * result + ((second == null) ? 0 : second.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		StringPart other = (StringPart) obj;
		if (first == null) {
			if (other.first != null)
				return false;
		} else if (!first.equals(other.first))
			return false;
		if (second == null) {
			if (other.second != null)
				return false;
		} else if (!second.equals(other.second))
			return false;
		return true;
	}

}

3.分组类(根据原始数据的第一列进行分组)

package test.mr.seconderysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
 * 实现分组
 */
public class Grouping extends WritableComparator {

	protected Grouping() {
		super(StringPart.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		StringPart a1 = (StringPart) a;
		StringPart b1 = (StringPart) b;
		// 只需要比较a1,b1的first字段即认为他们是否属于同组
		return a1.getFirst().compareTo(b1.getFirst());
	}

}

4.Map类

package test.mr.seconderysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeconderyMap extends Mapper<LongWritable, Text, StringPart, Text> {
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, StringPart, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String[] str = line.split("\t");
		if (str.length == 3) {
			StringPart temp = new StringPart(str[0], str[1]);
			context.write(temp, value);
		}
	}

}

5.Reduce类

package test.mr.seconderysort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeconderyRedu extends
		Reducer<StringPart, Text, NullWritable, Text> {

	private static Text part = new Text("------------");

	@Override
	protected void reduce(StringPart key, Iterable<Text> values,
			Reducer<StringPart, Text, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		context.write(NullWritable.get(), part);
		for (Text t : values) {
			context.write(NullWritable.get(), t);
		}
	}
}

6.job类

package test.mr.seconderysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SeconderyMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(SeconderyMain.class);

		job.setGroupingComparatorClass(Grouping.class);
		job.setPartitionerClass(Partitioner.class);

		job.setMapperClass(SeconderyMap.class);
		job.setMapOutputKeyClass(StringPart.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(SeconderyRedu.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}
时间: 2024-10-10 08:07:12

MapReduce处理二次排序(分区-排序-分组)的相关文章

Hadoop Mapreduce分区、分组、二次排序

1.MapReduce中数据流动   (1)最简单的过程:  map - reduce   (2)定制了partitioner以将map的结果送往指定reducer的过程: map - partition - reduce   (3)增加了在本地先进性一次reduce(优化)过程: map - combin(本地reduce) - partition -reduce2.Mapreduce中Partition的概念以及使用.(1)Partition的原理和作用        得到map给的记录后,

mapreduce的二次排序实现方式

本文主要介绍下二次排序的实现方式 我们知道mapreduce是按照key来进行排序的,那么如果有有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这个其实就是二次排序. 下面就具体说一下二次排序的实现方式 1. 自定义一个key 为什么要自定义一个key,我们知道mapreduce中排序就是按照key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定不行,所以需要定义一个新的key,key里面有两个属性,也就是我们要排序的两个字段 首先,实现Writab

mapreduce 的二次排序

一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 二: 编写实现二次排序功能, 提供源码文件. 三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路. 一: 二次排序 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 1.1 二次排序的功能 1. 当客户端提交一个作业的时候,hadoop 会开启yarn 接受进行数据拷贝处理,之后交友有yarn 框架上的启动服务resourcemanage

mapreduce原理【分区,分组】

分析这个原理,的原因是: 1.更好的理解MAPREDUCE的过程. 2.在二次排序时会用到这个原理,二次排序要重写分区方法,重写分组方法:如果原理没搞明白,就无法写二次排序的代码. Key 默认分区 默认分组 自定义分区 自定义分组 Abc123 1.使用系统默认分区方式,是按KEY进行分区. 2.KEY相同,分划分到一个分区且只能划分到一个分区. 划分方式按KEY的HASHCODE进行计算. 3.假设设定为3个分区,则划分方式可能是 a) 分区1:Abc789,Cde123,Cde456 b)

排序算法分析【二】:希尔排序(附Python&amp;C++代码)

希尔排序 也称递减增量排序算法,是插入排序的一种更高效的改进版本.希尔排序是非稳定排序算法. 希尔排序是基于插入排序的以下两点性质而提出改进方法的: 1.插入排序在对几乎已经排好序的数据操作时, 效率高, 即可以达到线性排序的效率: 2.插入排序一般来说是低效的, 因为插入排序每次只能将数据移动一位: 算法原理 基础是插入排序,我们先看图,图片动态展示: 这个图太快了还是不知道他在干嘛,在给一张图(图片均来自互联网): 步长选择 希尔排序中步长的选择是重中之重![来自于维基百科] 1.最终步长必

Oracle学习(二):过滤和排序

1.知识点:可以对照下面的录屏进行阅读 SQL> --字符串大小写敏感 SQL> --查询名叫KING的员工信息 SQL> select * 2 from emp 3 where ename = 'KING'; SQL> --日期格式敏感 SQL> --查询入职日期为17-11月-81的员工 SQL> select * 2 from emp 3 where hiredate='17-11月-81'; --正确例子 SQL> ed 已写入 file afiedt.b

php对二维数据进行排序

PHP一维数组的排序可以用sort(),asort(),arsort()等函数,但是PHP二维数组的排序需要自定义. 方法一:(经验证,成功) 作用:对二维数组进行指定key排序 参数:$arr 二维数组 ,$shortKey 需要排序的列,$short 排序方式 $shortType 排序类型 function multi_array_sort($arr,$shortKey,$short=SORT_DESC,$shortType=SORT_REGULAR) { foreach ($arr as

MapReduce教程(二)MapReduce框架Partitioner分区&lt;转&gt;

1 Partitioner分区 1.1 Partitioner分区描述 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中:按照省份划分的话,需要把同一省份的数据放到一个文件中:按照性别划分的话,需要把同一性别的数据放到一个文件中.我们知道最终的输出数据是来自于Reducer任务.那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行.Reducer任务的数据来自于Mapper任务,也就说Ma

二叉查找(排序)树的分析与实现

二叉排序树(Binary Sort Tree)又称二叉查找树(Binary Search Tree),亦称二叉搜索树. 图from baike 二叉排序树或者是一棵空树,或者是具有下列性质的二叉树: (1)若左子树不空,则左子树上所有结点的值均小于它的根结点的值: (2)若右子树不空,则右子树上所有结点的值均大于或等于它的根结点的值: (3)左.右子树也分别为二叉排序树: (4)没有键值相等的节点. 步骤:若根结点的关键字值等于查找的关键字,成功.否则,若小于根结点的关键字值,递归查左子树.若大