Hadoop读书笔记(十二)MapReduce自定义排序

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855

1.说明:

对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列

数据格式:

3	3
3	2
3	1
2	2
2	1
1	1

2.代码

SortApp.java

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 *
 * <p>
 * Title: SortApp.java
 * Package sort
 * </p>
 * <p>
 * Description: 当第一列不同时,升序;当第一列相同时,第二列升序
 * <p>
 * @author Tom.Cai
 * @created 2014-12-2 下午10:29:14
 * @version V1.0
 *
 */
public class SortApp {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/sort_input";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/sort_out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		if(fileSystem.exists(new Path(OUT_PATH))){
			fileSystem.delete(new Path(OUT_PATH),true);
		}
		Job job = new Job(conf,SortApp.class.getSimpleName());
		//1.1 指定输入文件路径
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定哪个类用来格式化输入文件
		job.setInputFormatClass(TextInputFormat.class);

		//1.2指定自定义的Mapper类
		job.setMapperClass(MyMapper.class);
		//指定输出<k2,v2>的类型
		job.setMapOutputKeyClass(newK2.class);
		job.setMapOutputValueClass(LongWritable.class);

		//1.3 指定分区类
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);

		//1.4 TODO 排序、分区

		//1.5  TODO (可选)合并

		//2.2 指定自定义的reduce类
		job.setReducerClass(MyReducer.class);
		//指定输出<k3,v3>的类型
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);

		//2.3 指定输出到哪里
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		//设定输出文件的格式化类
		job.setOutputFormatClass(TextOutputFormat.class);

		//把代码提交给JobTracker执行
		job.waitForCompletion(true);

	}

	static class MyMapper extends Mapper<LongWritable,Text, newK2,LongWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] splied = value.toString().split("\t");
			newK2 k2 = new newK2(Long.parseLong(splied[0]),Long.parseLong(splied[1]));
			final LongWritable v2 = new LongWritable(Long.parseLong(splied[1]));
			context.write(k2, v2);
		}

	}

	static class MyReducer extends Reducer<newK2, LongWritable, LongWritable, LongWritable>{

		@Override
		protected void reduce(sort.SortApp.newK2 key, Iterable<LongWritable> value, Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(key.first), new LongWritable(key.second));
			}
	}

	static class newK2 implements WritableComparable<newK2>{
		Long first;
		Long second;

		public newK2(long first, long second) {
			this.first = first;
			this.second = second;
		}

		public newK2() {
		}

		@Override
		public void readFields(DataInput input) throws IOException {
			this.first = input.readLong();
			this.second = input.readLong();
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(first);
			out.writeLong(second);
		}
		/**
		 *
		 *
		 * 当第一列不同时,升序;当第一列相同时,第二列升序
		 */

		@Override
		public int compareTo(newK2 o) {
			long temp = this.first -o.first;
			if(temp!=0){
				return (int)temp;
			}
			return (int)(this.second -o.second);
		}

		@Override
		public int hashCode() {
			return this.first.hashCode()+this.second.hashCode();
		}

		@Override
		public boolean equals(Object obj) {
			if(!(obj instanceof newK2)){
				return false;
			}
			newK2 k2 = (newK2)obj;
			return(this.first == k2.first)&&(this.second == k2.second);
		}
	}

}

欢迎大家一起讨论学习!

有用的自己收!

记录与分享,让你我共成长!欢迎查看我的其他博客;我的博客地址:http://blog.csdn.net/caicongyang

时间: 2024-10-21 19:18:45

Hadoop读书笔记(十二)MapReduce自定义排序的相关文章

Hadoop读书笔记(二)HDFS的shell操作

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 1.shell操作 1.1所有的HDFS shell操作命名可以通过hadoop fs获取: [[email protected] ~]# hadoop fs Usage: java FsShell [-ls <path>] [-lsr <path>] [-du <path>] [-dus <path>

Hadoop学习之路(7)MapReduce自定义排序

本文测试文本: tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000    MapReduce中,根据key进行分区.排序.分组MapReduce会按照基本类型对应的key进行排序,如int类型的IntWritable,long类型的LongWritable,Text类型,默认升序排序   为什么要自定义排序规则?现有需求,需要自定义key类型,

Machine Learning for hackers读书笔记(十二)模型比较

library('ggplot2')df <- read.csv('G:\\dataguru\\ML_for_Hackers\\ML_for_Hackers-master\\12-Model_Comparison\\data\\df.csv') #用glm logit.fit <- glm(Label ~ X + Y,family = binomial(link = 'logit'),data = df) logit.predictions <- ifelse(predict(logit

Java读书笔记十二(Java中的代码块)

前言 我们知道在java中通过构造器来为对象进行初始化操作,但是在java中与构造器类似的还有初始化块的操作,下面小编来为大家分享一下. 初始化块使用 在Java中初始化块是java中出现的第4中成员,前三种分别是成员变量.方法和构造器.一个类中可以有多个初始化块,并且相同类型的初始化块是按照顺序依次执行的.初始化块的修饰符只能是static,初始化块可以包含任何可执行语句,包括定义局部变量.调用其他对象方法,以及使用分支.循环语句等 /** * @FileName: test.java * @

How tomcat works 读书笔记十二 StandardContext 上

在tomcat4中,StandardContext.java是最大的一个类,有117k.废话不说,开始分析吧. 其实要分析StandardContext,也就主要分析两个方法,一个start,一个invoke. 两个变量 这里首先咱们得说两个boolean型的变量available,configured. 先说available,它表示了StandardContext是否可用,初始值为false.若StandardContext启动成功,其值就变为true;另外各种原因都会导致StandardC

How tomcat works 读书笔记十二 StandardContext 下

对重载的支持 tomcat里容器对重载功能的支持是依靠Load的(在目前就是WebLoader).当在绑定载入器的容器时 public void setContainer(Container container) { ... // Register with the new Container (if any) if ((this.container != null) && (this.container instanceof Context)) { setReloadable( ((Co

Hadoop读书笔记(六)MapReduce自定义数据类型demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(五)MapReduce统计单词demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(八)MapReduce 打成jar包demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(七)MapReduce 0.x版本API使用demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955