mapreduce 实现数子排序

设计思路:

  使用mapreduce的默认排序,按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。

  首先map阶段将输入的数字作为key,  并记录相同key出现的次数,在reduce阶段将输入的key作为输出的value,如果相同值存在多个,循环便利输出。

源数据:file1

2
32
654
32
15
756
65223

  file2

5956
22
650
92

  file3

26
54
6
2
15

  源代码

package com.duking.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DataSort {

	public static class Map extends
			Mapper<Object, Text, IntWritable, IntWritable> {

		private static IntWritable data = new IntWritable();

		// 实现map函数
		public void map(Object key, Text value, Context context)

		throws IOException, InterruptedException {

			String line = value.toString(); // 将输入的每一行数据转换为String类型

			data.set(Integer.parseInt(line)); // 将String 转换为Integer

			context.write(data, new IntWritable(1)); // 将 date->key
														// 统计key出现的次数自增为value
		}
	}

	// reduce将输入中的key复制到输出数据的key上,并直接输出 这是数据区重的思想
	public static class Reduce extends
			Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

		private static IntWritable linenum = new IntWritable(1);
		private IntWritable result = new IntWritable();

		// 实现reduce函数

		public void reduce(IntWritable key, Iterable<IntWritable> values,  //Iterable转为List
				Context context)

		throws IOException, InterruptedException {

			for (IntWritable val : values) {

				context.write(linenum, key);

				linenum = new IntWritable(linenum.get() + 1);
			}
		}

	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		conf.set("mapred.job.tracker", "192.168.60.129:9000");

		// 指定带运行参数的目录为输入输出目录
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		/*
		 * 指定工程下的input2为文件输入目录 output2为文件输出目录 String[] ioArgs = new String[] {
		 * "input2", "output2" };
		 *
		 * String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
		 * .getRemainingArgs();
		 */

		if (otherArgs.length != 2) { // 判断路径参数是否为2个

			System.err.println("Usage: Data Deduplication <in> <out>");

			System.exit(2);

		}

		// set maprduce job name
		Job job = new Job(conf, "Data sort");

		job.setJarByClass(DataSort.class);

		// 设置Map、Combine和Reduce处理类

		job.setMapperClass(Map.class);

		job.setCombinerClass(Reduce.class);

		job.setReducerClass(Reduce.class);

		// 设置输出类型

		job.setOutputKeyClass(IntWritable.class);

		job.setOutputValueClass(IntWritable.class);

		// 设置输入和输出目录

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}

  结果

1	2
2	2
3	6
4	15
5	15
6	22
7	26
8	32
9	32
10	54
11	92
12	650
13	654
14	756
15	5956
16	65223

  

  

时间: 2024-10-09 22:14:58

mapreduce 实现数子排序的相关文章

Hadoop学习笔记—11.MapReduce中的排序和分组

一.写在之前的 1.1 回顾Map阶段四大步凑 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出,在Step1.4也就是第四步中,需要对不同分区中的数据进行排序和分组,默认情况下,是按照key进行排序和分组. 1.2 实验场景数据文件 在一些特定的数据文件中,不一定都是类似于WordCount单次统计这种规范的数据,比如下面这类数据,它虽然只有两列,但是却有一定的实践意义. 3 3 3 2 3 1 2 2 2 1 1 1 (1)如果按照第一列升序排列,当

(转)MapReduce二次排序

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现的原理以及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map

MapReduce中的排序

hadoop的计算模型就是map/reduce,每一个计算任务会被分割成很多互不依赖的map/reduce计算单元,将所有的计算单元执行完毕后整个计算任务就完成了.因为计算单元之间互不依赖所以计算单元可以分配到不同的计算机上执行,这样就可以将计算压力平摊到多个机器上面.当然性能线性提高是有条件的,前提是计算任务所采用的算法必须能够适应map/reduce模式.例如对于海量数据排序任务来说,绝大多数的排序算法都是不适应map/reduce模式的,如堆排序,插入排序,冒泡排序都是不适用于map/re

MapReduce二次排序

本文主要介绍下二次排序的实现方式 我们知道MapReduce是按照key来进行排序的,那么如果有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这就是传说中的二次排序. 下面就具体说一下二次排序的实现方式 主要就是4点 1.自定义一个Key 为什么要自定义一个Key,我们知道MapReduce中排序就是按照Key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定是不行的,所以自定义一个新的Key,Key里面有两个属性,也就是我们要排序的两个字段. 首先,

Mapreduce shuffle和排序

Mapreduce为了确保每个reducer的输入都按键排序.系统执行排序的过程-----将map的输出作为输入传给reducer 称为shuffle.学习shuffle是如何工作的有助于我们理解mapreduce工作机制.shuffle属于hadoop不断被优化和改进的代码库的一部分.从许多方面看,shuffle是mapreduce的“心脏”,是奇迹出现的地方. 下面这张图介绍了mapreduce里shuffle的工作原理: 从图可以看出shuffle发生在map端和reduce端之间,将ma

Spark 颠覆 MapReduce 保持的排序记录

在过去几年,Apache Spark的采用以惊人的速度增加着,通常被作为MapReduce后继,可以支撑数千节点规模的集群部署.在内存中数 据处理上,Apache Spark比MapReduce更加高效已经得到广泛认识:但是当数据量远超内存容量时,我们也听到了一些机构在Spark使用 上的困扰.因此,我们与Spark社区一起,投入了大量的精力做Spark稳定性.扩展性.性能等方面的提升.既然Spark在GB或TB级别数据上运行 良好,那么它在PB级数据上也应当同样如此. 为了评估这些工作,最近我

hadoop2.2.0 MapReduce求和并排序

javabean必须实现WritableComparable接口,并实现该接口的序列化,反序列话和比较方法 package com.my.hadoop.mapreduce.sort; import java.io.DataInput;import java.io.DataOutput;import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class InfoBean implement

MapReduce编程(二) 排序

一.问题描述 文件中存储了商品id和商品价格的信息,文件中每行2列,第一列文本类型代表商品id,第二列为double类型代表商品价格.数据格式如下: pid0 334589.41 pid1 663306.49 pid2 499226.8 pid3 130618.22 pid4 513708.8 pid5 723470.7 pid6 998579.14 pid7 831682.84 pid8 87723.96 要求使用MapReduce,按商品的价格从低到高排序,输出格式仍为原来的格式:第一列为商

MapReduce 二次排序详解

1 首先说一下工作原理: 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现.本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value.这就是自定义Map的输入是<LongWritable, Text>的原因.然后调用自定义Map的map方法,将一个个<LongW