[Hadoop in Action] 第6章 编程实践

  • Hadoop程序开发的独门绝技
  • 在本地,伪分布和全分布模式下调试程序
  • 程序输出的完整性检查和回归测试
  • 日志和监控
  • 性能调优

1、开发MapReduce程序

[本地模式]

本地模式下的hadoop将所有的运行都放在一个单独的Java虚拟机中完成,并且使用的是本地文件系统(非HDFS)。在本地模式中运行的程序将所有的日志和错误信息都输出到控制台,最后它会给出所处理数据的总量。

对程序进行正确性检查:

  • 完整性检查
  • 回归测试
  • 考虑使用long而非int

[伪分布模式]

本地模式不具备生产型hadoop集群的分布式特征。一些bug在运行本地模式时是不会出现的。现在是通过日志文件和web界面远程监视它,这些工具和以后在监控生产集群时用的工具是相同的。

2、生产集群上的监视和调试

[计数器]



代码清单 使用计数器统计缺失值个数的MapClass

  1 import java.io.IOException;
  2 import java.util.regex.PatternSyntaxException;
  3 import java.util.Iterator;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.DoubleWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapred.*;
 13 import org.apache.hadoop.util.Tool;
 14 import org.apache.hadoop.util.ToolRunner;
 15
 16
 17 public class AveragingWithCombiner extends Configured implements Tool {
 18
 19     public static class MapClass extends MapReduceBase
 20         implements Mapper<LongWritable, Text, Text, Text> {
 21
 22         static enum ClaimsCounters { MISSING, QUOTED };
 23
 24         public void map(LongWritable key, Text value,
 25                         OutputCollector<Text, Text> output,
 26                         Reporter reporter) throws IOException {
 27
 28             String fields[] = value.toString().split(",", -20);
 29             String country = fields[4];
 30             String numClaims = fields[8];
 31             if (numClaims.length() == 0) {
 32                 reporter.incrCounter(ClaimsCounters.MISSING, 1);
 33             } else if (numClaims.startsWith("\"")) {
 34                 reporter.incrCounter(ClaimsCounters.QUOTED, 1);
 35             } else {
 36                 output.collect(new Text(country), new Text(numClaims + ",1"));
 37             }
 38
 39         }
 40     }
 41
 42     public static class Combine extends MapReduceBase
 43         implements Reducer<Text, Text, Text, Text> {
 44
 45         public void reduce(Text key, Iterator<Text> values,
 46                            OutputCollector<Text, Text> output,
 47                            Reporter reporter) throws IOException {
 48
 49             double sum = 0;
 50             int count = 0;
 51             while (values.hasNext()) {
 52                 String fields[] = values.next().toString().split(",");
 53                 sum += Double.parseDouble(fields[0]);
 54                 count += Integer.parseInt(fields[1]);
 55             }
 56             output.collect(key, new Text(sum + "," + count));
 57         }
 58     }
 59
 60     public static class Reduce extends MapReduceBase
 61         implements Reducer<Text, Text, Text, DoubleWritable> {
 62
 63         public void reduce(Text key, Iterator<Text> values,
 64                            OutputCollector<Text, DoubleWritable> output,
 65                            Reporter reporter) throws IOException {
 66
 67             double sum = 0;
 68             int count = 0;
 69             while (values.hasNext()) {
 70                 String fields[] = values.next().toString().split(",");
 71                 sum += Double.parseDouble(fields[0]);
 72                 count += Integer.parseInt(fields[1]);
 73             }
 74             output.collect(key, new DoubleWritable(sum/count));
 75         }
 76     }
 77
 78     public int run(String[] args) throws Exception {
 79         // Configuration processed by ToolRunner
 80         Configuration conf = getConf();
 81
 82         // Create a JobConf using the processed conf
 83         JobConf job = new JobConf(conf, AveragingWithCombiner.class);
 84
 85         // Process custom command-line options
 86         Path in = new Path(args[0]);
 87         Path out = new Path(args[1]);
 88         FileInputFormat.setInputPaths(job, in);
 89         FileOutputFormat.setOutputPath(job, out);
 90
 91         // Specify various job-specific parameters
 92         job.setJobName("AveragingWithCombiner");
 93         job.setMapperClass(MapClass.class);
 94         job.setCombinerClass(Combine.class);
 95         job.setReducerClass(Reduce.class);
 96
 97         job.setInputFormat(TextInputFormat.class);
 98         job.setOutputFormat(TextOutputFormat.class);
 99         job.setOutputKeyClass(Text.class);
100         job.setOutputValueClass(Text.class);
101
102         // Submit the job, then poll for progress until the job is complete
103         JobClient.runJob(job);
104
105         return 0;
106     }
107
108     public static void main(String[] args) throws Exception {
109         // Let ToolRunner handle generic command-line options
110         int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
111
112         System.exit(res);
113     }
114 }

[跳过坏记录]

(1)在Java中配置记录跳读

hadoop从0.19版本起就已经支持skipping特征了,但默认状态是关闭的。在Java中,这个特征由类SkipBadRecords来控制,全部由静态方法组成。作业的driver需要调用如下的一个或全部方法:

public static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)

public static void setReducerMaxSkipGroups(Configuration conf, long maxSkipRecs)

来分别为map任务和reduce任务打开记录跳读的设置。如果最大的跳读区域大小被设置为0(默认),那么记录跳读就处于关闭状态。可以使用JobConf.setMaxMapAttempts()和JobConf.setMaxReduceAttempts()方法,或者设置等效的属性mapred.map.max.attempts和mapred.reduce.max.attempts来做到这点。

如果skipping被启用,hadoop在任务失效两次后就进入skipping模式。你可以在SkipBadRecords的setAttemptsToStartSkipping()方法中设置触发skipping模式的任务失效次数:

public static void setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping)

hadoop会把被跳过的记录写入HDFS以供以后分析,它们以序列文件的形式写入_log/skip目录,可以用hadoop fs -text <filepath>解压并读取。你可以使用方法SkipBadRecords.setSkipOutputPath(JobConf conf, Path path)修改当前用于存放被跳过记录的目录_log/skip,如果path被设为空,或者一个值为“none”的字符串path,hadoop就会放弃记录被跳过的记录。

(2)在Java之外配置记录跳读


SkipBadRecords方法

JobConf属性
setAttemptsToStartSkipping() mapred.skip.attempts.to.start.skipping
setMapperMaxSkipRecords() mapred.skip.map.max.skip.records
setReducerMaxSkipGroups() mapred.skip.reduce.max.skip.groups
setSkipOutputPath() mapred.skip.out.dir
setAutoIncrMapperProcCount() mapred.skip.map.auto.incr.proc.count
setAutoIncrReducerProcCount() mapred.skip.reduce.auto.incr.proc.count

3、性能调优

(1)通过combiner来减少网络流量

Combiner可以减少在map和reduce阶段之间洗牌的数据量,较低的网络流量缩短了执行时间。

(2)减少输入数据量

(3)使用压缩

     hadoop内置支持压缩与解压。启用对map输出的压缩涉及对两个属性的配置:


属性

描述
mapred.compress.map.output Boolean属性,表示mapper的输出是否被压缩
mapred.map.output.compression.codec Class属性,表示哪种CompressionCodec被用于压缩mapper的输出

conf.setBoolean(“mapred.compress.map.output”, true);

conf.setClass(“mapred.map.output.compression.codec”, GzipCodec.calss, CompressionCodec.class);

也可以直接使用JobConf中的便捷方法setCompressionMapOutput()和setMapOutputCompressorClass()。

(4)重用JVM

hadoop从版本0.19.0开始,允许相同作业的多个任务之间重用JVM。因此,启动开销被平摊到多个任务中。一个新属性(mapred.job.reuse.jvm.num.tasks)指定了一个JVM可以运行的最大任务数。它默认值为1,此时JVM不能被重用。你可以增大该属性值来启用JVM重用。如果将其设置为-1,则意味着在可重复使用JVM的任务数量上没有限制。在JobConf对象中有一个便捷方法,setNumTasksToExecutePerJvm(int),可以用它很方便地设置作业的属性。

(5)根据猜测执行来运行

启动和禁止猜测执行的配置属性:


属性

描述
mapred.map.tasks.speculative.execution 布尔属性,表示是否运行map任务猜测执行
mapred.reduce.tasks.speculative.execution 布尔属性,表示是否运行reduce任务猜测执行

(6)代码重构与算法重写

Streaming程序重写为hadoop的Java程序

[转载请注明] http://www.cnblogs.com/zhengrunjian/

时间: 2024-11-05 10:27:21

[Hadoop in Action] 第6章 编程实践的相关文章

[Hadoop in Action] 第1章 Hadoop简介

编写可扩展.分布式的数据密集型程序和基础知识 理解Hadoop和MapReduce 编写和运行一个基本的MapReduce程序 1.什么是Hadoop Hadoop是一个开源的框架,可编写和运行分布式应用处理大规模数据. Hadoop与众不同之处在于以下几点: 方便--Hadoop运行在由一般商用机器构成的大型集群上,或者云计算服务之上: 健壮--Hadoop致力于在一般商用硬件上运行,其架构假设硬件会频繁地出现失效: 可扩展--Hadoop通过增加集群节点,可以线性地扩展以处理更大的数据集:

[hadoop in Action] 第3章 Hadoop组件

管理HDFS中的文件 分析MapReduce框架中的组件 读写输入输出数据 1.HDFS文件操作 [命令行方式] Hadoop的文件命令采取的形式为: hadoop fs -cmd <args> 其中,cmd是具体的文件命令,而<args>是一组数目可变的参数. (1)添加文件和目录 HDFS有一个默认的工作目录/user/$USER,其中$USER是你的登录用户名.不过这个目录不会自动建立,让我们用mkdir命令创建它.Hadoop的mkdir命令会自动创建父目录,类似于UNIX

[Hadoop in Action] 第2章 初识Hadoop

Hadoop的结构组成 安装Hadoop及其3种工作模式:单机.伪分布和全分布 用于监控Hadoop安装的Web工具 1.Hadoop的构造模块 (1)NameNode(名字节点) Hadoop在分布式计算和分布式存储中都采用了主/从结构.NameNode位于HDFS的主端,它指导从端的DataNode执行底层的I/O任务.NameNode是HDFS的书记员,它跟踪文件如何被分割成文件块,而这些块又被哪些节点存储,以及分布式文件系统的整体运行状态是否正常. 运行NameNode消耗大量的内存和I

终端控制和和信号——《Unix/Linux编程实践教程》读书笔记(第6章)

1.有些程序处理从特定设备来的数据.这些与特定设备相关的程序必须控制与设备的连接.Unix系统中最常见的设备是终端. 2.终端驱动程序有很多设置.各个设置的特定值决定了终端驱动程序的模式.为用户编写的程序通常需要设置终端驱动程序为特定的模式. 3.键盘输入分为3类,终端驱动程序对这些输入做不同的处理.大多数建代表常规数据,它们从驱动程序传输到程序.有些键调用驱动程序中的编辑函数.如果按下删除键,驱动程序将前一个字符从它的行缓冲中删除,并将命令发送到终端屏幕,使之从显示器中删除字符.最后,有些键调

进程和程序:编写shell——《Unix/Linux编程实践教程》读书笔记(第8章)

1.Unix shell的功能 shell是一个管理进程和运行程序的程序.所有常用的shell都有3个主要功能: (1)运行程序: (2)管理输入和输出 (3)可编程 shell同时也是带有变量和流程控制的编程语言. 2.Unix的进程模型 一个程序是存储在文件中的机器指令序列,一般它是由编译器将源代码编译成二进制格式的代码.运行一个程序意味着将这些机器指令序列载入内存然后让处理器(CPU)逐条执行.在Unix术语中,一个可执行程序是一些机器指令机器数据的序列.一个进程是程序运行时的内存空间和设

事件驱动编程——《Unix/Linux编程实践教程》读书笔记(第7章)

1.curses库 /* 基本curses函数 */ initscr(); // 初始化curses库和tty endwin(); // 关闭curses并重置tty refresh(); // 使屏幕按照你的意图显示 move(r, c); // 移动光标到屏幕的(r, c)位置 addstr(s); // 在当前位置画字符串s addch(c); // 在当前位置画字符c clear(); // 清屏 standout(); // 启动standout模式(一般使屏幕反色) standend

I/O重定向和管道——《Unix/Linux编程实践教程》读书笔记(第10章)

1.I/O重定向的概念与原因 及 标准输入.输出的标准错误的定义 所以的Unix I/O重定向都基于标准数据流的原理.三个数据了分别如下: 1)标准输入--需要处理的数据流 2)标准输出--结果数据流 3)标准错误输出--错误消息流 概念:所以的Unix工具都使用文件描述符0.1和2.标准输入文件的描述符是0,标准输出的文件描述符是1,而标准错误输出的文件描述符则是2.Unix假设文件描述符0.1.2已经被打开,可以分别进行读写操作. 通常通过shell命令行运行Unix系统工具时,stdin.

郑捷《机器学习算法原理与编程实践》学习笔记(第六章 神经网络初步)6.3 自组织特征映射神经网路(SMO)

具体原理网址:http://wenku.baidu.com/link?url=zSDn1fRKXlfafc_tbofxw1mTaY0LgtH4GWHqs5rl8w2l5I4GF35PmiO43Cnz3YeFrrkGsXgnFmqoKGGaCrylnBgx4cZC3vymiRYvC4d3DF3 自组织特征映射神经网络(Self-Organizing Feature Map.也称Kohonen映射),简称为SMO网络,主要用于解决模式识别类的问题.SMO网络属于无监督学习算法,与之前的Kmeans算

[Java 并发] Java并发编程实践 思维导图 - 第一章 简介

阅读<Java并发编程实践>一书后整理的思维导图.