hadoop(五) - 分布式计算利器MapReduce加强

一. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

public class DataCount {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(DataCount.class);

		job.setMapperClass(DCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataInfo.class);

		job.setReducerClass(DCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataInfo.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setPartitionerClass(DCPartitioner.class);
		job.setNumReduceTasks(Integer.parseInt(args[2]));

		job.waitForCompletion(true);
	}
	//Map
	public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo>{

		private Text k = new Text();

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, DataInfo>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\t");
			String tel = fields[1];
			long up = Long.parseLong(fields[8]);
			long down = Long.parseLong(fields[9]);
			DataInfo dataInfo = new DataInfo(tel,up,down);
			k.set(tel);
			context.write(k, dataInfo);
		}
	}
	public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo>{

		@Override
		protected void reduce(Text key, Iterable<DataInfo> values,
				Reducer<Text, DataInfo, Text, DataInfo>.Context context)
				throws IOException, InterruptedException {
			long up_sum = 0;
			long down_sum = 0;
			for(DataInfo d : values){
				up_sum += d.getUpPayLoad();
				down_sum += d.getDownPayLoad();
			}
			DataInfo dataInfo = new DataInfo("",up_sum,down_sum);

			context.write(key, dataInfo);
		}
	}
	public static class DCPartitioner extends  Partitioner<Text, DataInfo>{

		private static Map<String,Integer> provider = new HashMap<String,Integer>();

		static{
			provider.put("138", 1);
			provider.put("139", 1);
			provider.put("152", 2);
			provider.put("153", 2);
			provider.put("182", 3);
			provider.put("183", 3);
		}
		@Override
		public int getPartition(Text key, DataInfo value, int numPartitions) {
			//向数据库或配置信息 读写
			String tel_sub = key.toString().substring(0,3);
			Integer count = provider.get(tel_sub);
			if(count == null){
				count = 0;
			}
			return count;
		}
	}
}

二. 排序和分组

map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。

public class InfoBean implements WritableComparable<InfoBean>{

	private String account;
	private double income;
	private double expenses;
	private double surplus;

	public void set(String account,double income,double expenses){
		this.account = account;
		this.income = income;
		this.expenses = expenses;
		this.surplus = income - expenses;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(account);
		out.writeDouble(income);
		out.writeDouble(expenses);
		out.writeDouble(surplus);

	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.account = in.readUTF();
		this.income = in.readDouble();
		this.expenses = in.readDouble();
		this.surplus = in.readDouble();
	}

	@Override
	public int compareTo(InfoBean o) {
		if(this.income == o.getIncome()){
			return this.expenses > o.getExpenses() ? 1 : -1;
		}
		return this.income > o.getIncome() ? 1 : -1;
	}

	@Override
	public String toString() {
		return  income + "\t" +	expenses + "\t" + surplus;
	}
	// get set

}

三. Combiners编程

每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。如果不用combiner,那么所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

public class InverseIndex {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);
		//设置jar
		job.setJarByClass(InverseIndex.class);

		//设置Mapper相关的属性
		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));//words.txt

		//设置Reducer相关属性
		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setCombinerClass(IndexCombiner.class);

		//提交任务
		job.waitForCompletion(true);
	}
	public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text>{
		private Text k = new Text();
		private Text v = new Text();
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split(" ");
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			Path path = inputSplit.getPath();
			String name = path.getName();
			for(String f : fields){
				k.set(f + "->" + name);
				v.set("1");
				context.write(k, v);
			}
		}
	}
	public static class IndexCombiner extends Reducer<Text, Text, Text, Text>{
		private Text k = new Text();
		private Text v = new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] fields = key.toString().split("->");
			long sum = 0;
			for(Text t : values){
				sum += Long.parseLong(t.toString());
			}
			k.set(fields[0]);
			v.set(fields[1] + "->" + sum);
			context.write(k, v);
		}
	}
	public static class IndexReducer extends Reducer<Text, Text, Text, Text>{
		private Text v = new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String value = "";
			for(Text t : values){
				value += t.toString() + " ";
			}
			v.set(value);
			context.write(key, v);
		}
	}
}

四. shuffle

每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent), 一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

写磁盘前,要partition, sort。如果有combiner,combine排序后数据。等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

Reducer通过Http方式得到输出文件的分区。TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。排序阶段合并map输出。然后走Reduce阶段。

时间: 2024-12-19 00:55:50

hadoop(五) - 分布式计算利器MapReduce加强的相关文章

hadoop(四) - 分布式计算利器MapReduce

一. MapReduce执行过程 MapReduce运行的时候, 会通过Mapper运行的任务读取HDFS中的数据文件, 然后调用自己的方法处理数据, 最后输出. Reduce任务会接受Mapper任务输出的数据, 作为自己输入的数据, 然后调用自己的方法, 最后输出到HDFS的文件中. 二. Mapper任务执行过程 每个Mapper任务都是一个java进程, 它会读取HDFS中的文件, 解析成很多键值对, 经过我们覆盖的map方法处理后, 转换成很多的键值对再输出, 整个Mapper任务的处

大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)

hadoop的核心分为两块,一是分布式存储系统-hdfs,这个我已经在上一章节大致讲了一下,另一个就是hadoop的计算框架-mapreduce. mapreduce其实就是一个移动式的基于key-value形式的分布式计算框架. 其计算分为两个阶段,map阶段和reduce阶段,都是对数据的处理,由于其入门非常简单,但是若想理解其中各个环节及实现细节还是有一定程度的困难,因此我计划在本文中只是挑几个mapreduce的核心来进行分析讲解. 1.MapReduce驱动程序默认值 编写mapred

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

Hadoop初学指南(5)--MapReduce入门

本文将介绍Hadoop中的重点MapReduce的入门知识. (1)MapReduce概述 MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,在Hadoop中用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单.这两个函数的形参是key.value对,表示函数的输入信息. (2)MR执行流程 客户端的代码会提交给JobTracker,也就是JobTracker接受由用户提交

hadoop分布式存储&&分布式计算

Hadoop是Lucene创始人Doug Cutting,根据Google的相关内容山寨出来的分布式文件系统和对海量数据进行分析计算的基础框架系统,其中包含MapReduce程序,hdfs系统等. 网方网站:http://hadoop.apache.org/ Hadoop是一个由Apache基金会所开发的分布式系统基础架构. 下载: http://hadoop.apache.org/releases.html hadoop基于java开发的. Hadoop包括两大核心,分布式存储系统和分布式计算

Hadoop初学指南(6)--MapReduce的简单实例及分析

本文在上一节的基础上通过一个简单的MR示例对MapReduce的运行流程进行分析. 假设有两行数据,分别是hello you,hello me,我们要统计其中出现的单词以及每个单词出现的次数. 所得的结果为 hello   2 you     1 me      1 (1)大致运行流畅 1.解析成2个<k,v>,分别是<0, hello you><10, hello me>.调用2次map函数. 2.执行map任务 3.map输出后的数据是:<hello,1>

Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2

Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2 .mobi: http://www.t00y.com/file/79497801 Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2.pdf: http://www.t00y.com/file/8034244

Hadoop基本原理之一:MapReduce

1.为什么需要Hadoop 目前,一块硬盘容量约为1TB,读取速度约为100M/S,因此完成一块硬盘的读取需时约2.5小时(写入时间更长).若把数据放在同一硬盘上,且全部数据均需要同一个程序进行处理,此程序的处理时间将主要浪费在I/O时间上. 在过去几十年,硬盘的读取速度并未明显增长,而网络传输速度此飞速上升. 因此,若把数据分散到多个硬盘上进行存储(如分成100份存储在100个硬盘上),则读取数据所需时间大大减少,并将各节点处理好的结果通过网络进行传输. 但这将导致2个问题 (1)数据被分散到

hadoop分布式系统下的mapreduce java小程序计算网站uv

一.准备工作 1.1 搭建hadoop分布式系统,博主是用3台虚拟机搭建的一个简易hadoop分布式系统. linux 5.5 64位     hadoop2.6.0 192.168.19.201   h1   (master) 192.168.19.202   h2   (slaver1) 192.168.19.203   h3   (slaver2) 1.2 准备网站访问IP文件 由于是实验,一个简单的txt文件即可 如:vim a.txt 10.0.0.1 10.0.0.2 10.0.0.