hadoop 学习自定义分区

(网易云课程hadoop大数据实战学习笔记)

如图所示:有三个ReducerTask,因此处理完成之后的数据存储在三个文件中;

默认情况下,numReduceTasks的数量为1,前面做的实验中,输出数据都是在一个文件中。通过自定义myPatitioner类,可以把ruduce处理后的数据分类汇总,这里MyPartitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类。HashPartitioner是mapreduce的默认partitioner。计算方法是which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

实验内容,在上一个自定义排序的基础上,把正方形和长方形分别进行排序,即设置两个ReducerTask任务,通过自定义MyPartitioner实现。

package com.nwpulisz;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
public class SelfDefineSort {
	/**
	 * @param args
	 * @author nwpulisz
	 * @date 2016.4.1
	 */
	static final String INPUT_PATH="hdfs://192.168.255.132:9000/input";
	static final String OUTPUT_PATH="hdfs://192.168.255.132:9000/output";

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		Path outPut_path= new Path(OUTPUT_PATH);
		Job job = new Job(conf, "SelfDefineSort");

		//如果输出路径是存在的,则提前删除输出路径
		FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
		if(fileSystem.exists(outPut_path))
		{
			fileSystem.delete(outPut_path,true);
		}

		job.setJarByClass(RectangleWritable.class); //注意不能少setJarByClass,要不出现报错,源码中的解释。
													//Set the Jar by finding where a given class came from.

		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, outPut_path);

		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);

		job.setMapOutputKeyClass(RectangleWritable.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setPartitionerClass(MyPatitioner.class); //自定义myPatitioner类,把ruduce处理后的数据分类汇总;
		job.setNumReduceTasks(2); //设置ReduceTask数量为2;

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, RectangleWritable, NullWritable>{
		protected void map(LongWritable k1, Text v1,
                Context context) throws IOException, InterruptedException {
			String[] splits = v1.toString().split("\t");
			RectangleWritable k2 = new RectangleWritable(Integer.parseInt(splits[0]),
					Integer.parseInt(splits[1]));

			context.write(k2,NullWritable.get());
		}
	}

	static class MyReducer extends Reducer<RectangleWritable, NullWritable,
					IntWritable, IntWritable>{
		protected void reduce(RectangleWritable k2,
				Iterable<NullWritable> v2s,
				Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			context.write(new IntWritable(k2.getLength()), new IntWritable(k2.getWidth()));
		}

	}

}
class MyPatitioner extends Partitioner<RectangleWritable, NullWritable>{
	@Override
	public int getPartition(RectangleWritable k2, NullWritable v2, int numPartitions) {
		// TODO Auto-generated method stub
		if (k2.getLength() == k2.getWidth()) { //根据长方形和正方形进行分类
			return 0;
		}else {
			return 1;
		}
	}

}

  

其中的RectangleWritable类与上一节中定义的相同。

此处,在eclipse中直接运行该代码,会显示错误,如下图:

可能是因为hadoop版本的原因,因此需要将源码文件打成jar包,在hadoop服务器上运行,jar中包括内容为:

在hadoop上运行 hadoop jar SelfDefinePartitioner.jar(jar包名,自定义)

运行结果如下图所示:

开始运行:

输出结果:

来自为知笔记(Wiz)

时间: 2024-10-29 02:32:28

hadoop 学习自定义分区的相关文章

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

Hadoop 学习自定义数据类型

(学习网易云课堂Hadoop大数据实战笔记) 序列化在分布式环境的两大作用:进程间通信,永久存储. Writable接口, 是根据 DataInput 和 DataOutput 实现的简单.有效的序列化对象. MR的任意Value必须实现Writable接口: MR的key必须实现WritableComparable接口,WritableComparable继承自Writable和Comparable接口: (本节先讲自定义value值,下一节再讲自定义key值,根据key值进行自定义排序) 以

hadoop 学习自定义排序

(网易云课程hadoop大数据实战学习笔记) 自定义排序,是基于k2的排序,设现有以下一组数据,分别表示矩形的长和宽,先按照面积的升序进行排序. 99 66 78 11 54 现在需要重新定义数据类型,MR的key值必须继承WritableComparable接口,因此定义RectangleWritable数据类型如下: import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式

hadoop分割与读取输入文件的方式被定义在InputFormat接口的一个实现中,TextInputFormat是默认的实现,当你想要一次获取一行内容作为输入数据时又没有确定的键,从TextInputFormat返回的键为每行的字节偏移量,但目前没看到用过 以前在mapper中曾使用LongWritable(键)和Text(值),在TextInputFormat中,因为键是字节偏移量,可以是LongWritable类型,而当使用KeyValueTextInputFormat时,第一个分隔符前后

Hadoop自定义分区Partitioner

一:背景 为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看.Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件. 二:技术实现 自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitione

Hadoop学习笔记—7.计数器与自定义计数器

一.Hadoop中的计数器 计数器:计数器是用来记录job的执行进度和状态的.它的作用可以理解为日志.我们通常可以在程序的某个位置插入计数器,用来记录数据或者进度的变化情况,它比日志更便利进行分析. 例如,我们有一个文件,其中包含如下内容: hello you hello me 它被WordCount程序执行后显示如下日志: 在上图所示中,计数器有19个,分为四个组:File Output Format Counters.FileSystemCounters.File Input Format

在hadoop作业中自定义分区和归约

当遇到有特殊的业务需求时,需要对hadoop的作业进行分区处理 那么我们可以通过自定义的分区类来实现 还是通过单词计数的例子,JMapper和JReducer的代码不变,只是在JSubmit中改变了设置默认分区的代码,见代码: //1.3分区 //设置自定义分区类 job.setPartitionerClass(JPartitioner.class); //设置分区个数--这里设置成2,代表输出分为2个区,由两个reducer输出 job.setNumReduceTasks(2); 自定义的JP

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe