Hadoop(四)——编程核心MapReduce(上)

上篇讲述了Hadoop的核心内容之一HDFS,是Hadoop分布式的平台基础,而这讲的MapReduce则是充分利用Hdfs分布式,提高运行效率的算法模型 ,Map(映射)和Reduce(归约)两个主要阶段都以<key,value>键值对作为输入和输出,我们需要做的就是对这些<key,value>做我们想要的处理。看似简单实则麻烦,因为这里太灵活多变。

一,好,首先来看下边两个图,看下mapreduce在Hadoop中的执行流程,以及mapreduce内部的执行流程:

以分析气象数据为例子:

二,解析:mapreduce的执行步骤:

map任务处理:

1.读取输入文件内容,解析成键值对(key/value).对输入文件的每一行,解析成

键值对(key/value).每一个键值对调用一次map函数

2.写自己的逻辑,对输入的键值对(key/value)处理,转换成新的键值对

(key/value)输出.

3.对输出的键值对(key/value)进行分区.(partition)

4.对不同分区的数据,按照key进行排序,分组.相同的key/value放到

一个集合中.(shuffle)

5.分组后的数据进行规约.(combiner,可选择的),也就是可以在mapper中处理一部

分reduce的工作,将reduce的工作进行减压

reduce任务处理:

1.对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点.

2.对多个map任务的输出进行合并,排序.写reduce函数自己的逻辑,对输入的

key/value处理,转换成新的key/value输出.

3.把reduce的输出保存到文件中(写入到hdfs中).

三,任务执行优化:

1,推测式执行:即如果jobtracker发现有拖后腿的任务,会再启动一个相同的备份任务,然后那个先执行完就会kill掉另一个。因此在监控网页上经常能看到正常执行完的作业有被kill的任务。

2,推测式执行缺省打开,但如果是代码问题,并不能解决问题,而且会使集群更慢,通

过在mapred-site.xml配置文件中设置mapred.map.tasks.speculative.execution和

mapred.reduce.tasks.speculative.execution可为map任务或reduce任务开启或关闭

推测式执行

3,重用JVM,可以省去启动新的JVM消耗的时间,在mapred-site.xml配置文件中设置

mapred.job.reuse.jvm.num.tasks设置单个JVM上运行的最大任务数( 1, >1或-1表

示没有限制)

4,忽略模式, 任务在读取数据失败2次后, 会把数据位置告诉jobtracker, 后者重新启动

该任务并且在遇到所记录的坏数据时直接跳过( 缺省关闭, 用SkipBadRecord方法打

开)

四,错误机制处理故障:

1,硬件故障,即jobtracker和tasktracker故障:

A,Jobtracker是单点, 若发生故障目前hadoop还无法处理, 唯有选择最牢靠的硬件作为

jobtracker

B,Jobtracker通过心跳( 周期1分钟) 信号了解tasktracker是否发生故障或负载过于严重

C,Jobtracker将从任务节点列表中移除发生故障的tasktracker

D,如果故障节点在执行map任务并且尚未完成, jobtracker会要求其它节点重新执行此

map任务

F,如果故障节点在执行reduce任务并且尚未完成,jobtracker会要求其它节点继续执行

尚未完成的reduce任务

2,任务失败:由于代码或者进程崩溃引起任务失败:

A,Jvm自动退出,向tasktracker父进程发送方错误信息,错误信息也会写入到日志

B,Tasktracker监听程序会发现进程退出,或进程很久没有更新信息送回,将任务标记为

失败

C,标记失败任务后,任务计数器减去1以便接受新任务,并通过心跳信号告诉jobtracker

任务失败的信息

D,Jobtrack获悉任务失败后,将把该任务重新放入调度队列,重新分配出去再执行

E,如果一个任务失败超过4次(可以设置),将不会再被执行,同时作业也宣布失

五,最后来看一个wordCount的例子:

package job;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * hadoop的第一个mapreduce例子,wordCount,计算单词的个数
 *
 * @author Administrator
 *
 */
public class WordCount {

	/*
	 *  继承mapper接口,设置map的输入类型为<Object,Text>,输出类型为<Text,IntWritable>
	 */
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		//one表示单词出现了一次
		private final static IntWritable one=new IntWritable(1);
		//word用来存储切下来的单词
		private Text word=new Text();

		//map进行将内容分割,以<单词,1>的形式write出来
		public void map(Object key, Text value,Context context) throws IOException,InterruptedException{
			//进行单词的切分
			StringTokenizer itr=new StringTokenizer(value.toString());

			while (itr.hasMoreElements()) {
				word.set(itr.nextToken());//切下的单词放到word中
				context.write(word, one);
			}
		}
	}

	/**
	 * reducer函数的编写
	 * @author Administrator
	 *
	 */
	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

		//result记录单词的频数
		private IntWritable result=new IntWritable();

		public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
			int sum=0;
			for(IntWritable val:values){
				sum+=val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception{
		Configuration configuration=new Configuration();
		String[] otherArgs=new GenericOptionsParser(configuration,args).getRemainingArgs();
		if(otherArgs.length!=2){
			System.err.println("Usage:wordcount <in> <out>");
			System.exit(2);
		}

		//配置作业名
		Job job=new Job(configuration, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true)?0:1);
	}
}

MapReduce,多理解流程执行,属性对应的API,然后就是锻炼自己的建模的思维,算法的相关锻炼等……

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-10 00:13:30

Hadoop(四)——编程核心MapReduce(上)的相关文章

hadoop(四) - 分布式计算利器MapReduce

一. MapReduce执行过程 MapReduce运行的时候, 会通过Mapper运行的任务读取HDFS中的数据文件, 然后调用自己的方法处理数据, 最后输出. Reduce任务会接受Mapper任务输出的数据, 作为自己输入的数据, 然后调用自己的方法, 最后输出到HDFS的文件中. 二. Mapper任务执行过程 每个Mapper任务都是一个java进程, 它会读取HDFS中的文件, 解析成很多键值对, 经过我们覆盖的map方法处理后, 转换成很多的键值对再输出, 整个Mapper任务的处

Hadoop Streaming 编程

1.概述 Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如: 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer) $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \ -input myInputDirs \ -outpu

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

Hadoop详解 - HDFS - MapReduce - YARN - HA

为什么要有Hadoop? 从计算机诞生到现今,积累了海量的数据,这些海量的数据有结构化.半结构化.非 结构的数据,并且这些海量的数据存储和检索就成为了一大问题. 我们都知道大数据技术难题在于一个数据复杂性.数据量.大规模的数据计算. Hadoop就是为了解决这些问题而出现的. Hadoop的诞生 Doug Cutting是Lucene的作者,当时Lucene面临和谷歌同样的问题,就是海量的数据存储和检索,于是就诞生了Nutch. 在这之后,谷歌的大牛就为解决这个问题发了三篇论文(GFS.Map-

Hadoop初学指南(6)--MapReduce的简单实例及分析

本文在上一节的基础上通过一个简单的MR示例对MapReduce的运行流程进行分析. 假设有两行数据,分别是hello you,hello me,我们要统计其中出现的单词以及每个单词出现的次数. 所得的结果为 hello   2 you     1 me      1 (1)大致运行流畅 1.解析成2个<k,v>,分别是<0, hello you><10, hello me>.调用2次map函数. 2.执行map任务 3.map输出后的数据是:<hello,1>

hadoop安装和hadoop pipes编程说明

本篇文章主要是对hadoop pipes编程的一些问题的备注,对于网上常见的问题,并未完全写入. 安装 基础环境:3台基于centos7的虚拟机(1个master,2个slave:slave1.slave2).hadoop-2.6.0 1. hadoop安装主要参考的网址是:hadoop参考安装 2. linux配置ssh免密码登录,具体参考的是:centos ssh免密码登录 tips: 1. 三个虚拟机的一定要用一样的帐号,即用户名.如果不一样的话,通过ssh进行免密码登录时会出现问题.比如

大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)

hadoop的核心分为两块,一是分布式存储系统-hdfs,这个我已经在上一章节大致讲了一下,另一个就是hadoop的计算框架-mapreduce. mapreduce其实就是一个移动式的基于key-value形式的分布式计算框架. 其计算分为两个阶段,map阶段和reduce阶段,都是对数据的处理,由于其入门非常简单,但是若想理解其中各个环节及实现细节还是有一定程度的困难,因此我计划在本文中只是挑几个mapreduce的核心来进行分析讲解. 1.MapReduce驱动程序默认值 编写mapred

Hadoop高级编程—构建与实现大数据解决方案pdf

下载地址:网盘下载 内容简介  · · · · · · 如果你已经准备好要充分实施大规模可扩展性数据分析工作,那么需要知道如何利用Hadoop技术.这本<Hadoop高级编程--构建与实现大数据解决方案>可以帮助你做到这一点!本书关注用于构建先进的.基于Hadoop的企业级应用的架构和方案,并为实现现实的解决方案提供深入的.代码级的讲解.本书还会带你领略数据设计以及数据设计如何影响实现.本书解释了MapReduce的工作原理,并展示了如何在MapReduce中重新定制特定的业务问题.在整本书中

《Hadoop高级编程》之为Hadoop实现构建企业级安全解决方案

本章内容提要 ●    理解企业级应用的安全顾虑 ●    理解Hadoop尚未为企业级应用提供的安全机制 ●    考察用于构建企业级安全解决方案的方法 第10章讨论了Hadoop安全性以及Hadoop中用于提供安全控制的机制.当构建企业级安全解决方案(它可能会围绕着与Hadoop数据集交互的许多应用程序和企业级服务)时,保证Hadoop自身的安全仅仅是安全解决方案的一个方面.各种组织努力对数据采用一致的安全机制,而数据是从采用了不同安全策略的异构数据源中提取的.当这些组织从多个源获取数据,接