[hadoop in Action] 第3章 Hadoop组件

  • 管理HDFS中的文件
  • 分析MapReduce框架中的组件
  • 读写输入输出数据

1、HDFS文件操作

[命令行方式]

Hadoop的文件命令采取的形式为:

hadoop fs -cmd <args>

其中,cmd是具体的文件命令,而<args>是一组数目可变的参数。

(1)添加文件和目录

HDFS有一个默认的工作目录/user/$USER,其中$USER是你的登录用户名。不过这个目录不会自动建立,让我们用mkdir命令创建它。Hadoop的mkdir命令会自动创建父目录,类似于UNIX中使用-p选项的mkdir命令。

hadoop fs -mkdir /user/chuck

如果想看到所有的子目录,则可以使用hadoop的lsr命令,类似于UNIX中打开-r选项的ls:

hadoop fs -lsr /

[输出结果显示出属性信息,比如权限、所有者、组、文件大小以及最后修改日期,所有这些都类似于UNIX的概念。显示“1”的列给出文件的复制因子。因为复制因子不适用于目录,故届时该列仅会显示一个破折号(-)]

在本地文件系统中创建一个名为examle.txt的文本文件,用hadoop的put命令将它从本地文件系统复制到HDFS中:

hadoop fs -put example.txt ./

(2)获取文件

从HDFS中复制文件到本地文件系统:

hadoop fs -get example.txt ./

显示HDFS中文件的内容:

hadoop fs -cat example.txt

[可以在hadoop的文件命令中使用UNIX的管道,将其结果发送给其他的UNIX命令做进一步处理]

查看最后一千字节:

hadoop fs -tail example.txt

(3)删除文件

删除HDFS中的文件:

hadoop fs -rm example.txt

[ rm命令还可以用于删除空目录]

删除目录(目录不为空):

hadoop fs -rmr /user/chuck

(4)查阅帮助

hadoop fs -help <cmd>

[编程方式]

hadoop命令行工具中有一个getmerge命令,用于把一组HDFS文件在复制到本地计算机以前进行合并,下面开发的是实现把本地计算机文件复制到HDFS以前进行合并:



代码清单 PutMerge程序

 1 import java.io.IOException;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FSDataInputStream;
 5 import org.apache.hadoop.fs.FSDataOutputStream;
 6 import org.apache.hadoop.fs.FileStatus;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9
10 public class PutMerge {
11
12     public static void main(String[] args) throws IOException {
13
14         Configuration conf = new Configuration();
15         FileSystem hdfs  = FileSystem.get(conf);
16         FileSystem local = FileSystem.getLocal(conf);
17
18         Path inputDir = new Path(args[0]);   //(1)设定输入目录和输出文件
19         Path hdfsFile = new Path(args[1]);
20
21         try {
22             FileStatus[] inputFiles = local.listStatus(inputDir);    //(2)得到本地文件列表
23             FSDataOutputStream out = hdfs.create(hdfsFile);    //(3)生成HDFS输出流
24
25             for (int i=0; i<inputFiles.length; i++) {
26                 System.out.println(inputFiles[i].getPath().getName());
27                 FSDataInputStream in = local.open(inputFiles[i].getPath());    //(4)打开本地输入流
28                 byte buffer[] = new byte[256];
29                 int bytesRead = 0;
30                 while( (bytesRead = in.read(buffer)) > 0) {
31                     out.write(buffer, 0, bytesRead);
32                 }
33                 in.close();
34             }
35             out.close();
36         } catch (IOException e) {
37             e.printStackTrace();
38         }
39     }
40 }

(1)根据用户定义的参数设置本地目录和HDFS的目标文件;

(2)提取本地输入目录中每个文件的信息;

(3)创建一个输出流写入到HDFS文件;

(4)遍历本地目录中的每个文件,打开一个输入流来读取该文件


FileSystem类还有些方法用于其他标准文件操作,如delete()、exists()、mkdirs()和rename()。

2、剖析MapReduce程序

MapReduce程序通过操作键/值对来处理数据,一般形式为:

map:(k1, v1) ——> list(k2, v2)

reduce:(k2, list(v2)) ——> list(k3,v3)

  1. 输入数据;
  2. 输入数据被分布在节点上;
  3. 每个map任务处理一个数据分片;
  4. Mapper输出中间数据;
  5. 节点间的数据交换在“洗牌”阶段完成;
  6. 相同key的中间数据进入相同的reducer;
  7. 存储Reducer的输出。

虽然我们可以并且的确经常把某些键与值称为整数、字符串等,但它们实际上并不是Integer、String等那些标准的Java类。这是因为为了让键/值对可以在集群上移动,MapReduce框架提供了一种序列化键/值对的方法。因此,只有那些支持这种序列化的类能够在这个框架中充当键或者值。

更具体而言,实现Writable接口的类可以是值,而实现WritableComparable<T>接口的类既可以是键也可以是值。注意WritableComparable<T>接口是Writable和java.lang.Comparable<T>接口的组合。对于键而言,我们需要这个比较,因为它们将在Reduce阶段进行排序,而值仅会被简单地传递。

Hadoop带有一些预定义的类用于实现WritableComparable,包括面向所有基本数据类型的封装类,如下表:



描述

BooleanWritable

标准布尔变量的封装

ByteWritable

单字节数的封装

DoubleWritable

双字节数的封装

FloatWritable

浮点数的封装

IntWritable

整数的封装

LongWritable

长整数的封装

Text

使用UTF8格式的文本封装

NullWritable

无键值的占位符

键和值所采用的数据类型可以超过Hadoop自身所支持的基本类型,可以自定义数据类型,只要它实现了Writable(或WritableComparable<T>)接口。



代码清单 示例实现WritableComparable接口的类

 1 import java.io.DataInput;
 2 import java.io.DataOutput;
 3 import java.io.IOException;
 4
 5 import org.apache.hadoop.io.WritableComparable;
 6
 7 public class Edge implements WritableComparable<Edge> {
 8
 9     private String departureNode;
10     private String arrivalNode;
11
12     public String getDepartureNode() { return departureNode;}
13
14     @Override
15     public void readFields(DataInput in) throws IOException {    //(1)说明如何读入数据
16         departureNode = in.readUTF();
17         arrivalNode = in.readUTF();
18     }
19
20     @Override
21     public void write(DataOutput out) throws IOException {    //(2)说明如何写入数据
22         out.writeUTF(departureNode);
23         out.writeUTF(arrivalNode);
24     }
25
26     @Override
27     public int compareTo(Edge o) {    //(3)定义数据排序
28      return (departureNode.compareTo(o.departureNode) != 0)
29          ? departureNode.compareTo(o.departureNode)
30          : arrivalNode.compareTo(o.arrivalNode);
31     }
32 }

这个Edge类实现了Writable接口的readFields()及write()方法。它们与Java中的DataInput和DataOutput类一起用于类中内容的串行化。而Comparable接口中的实现是compareTo()方法。如果被调用的Edge小于、等于或者大于给定的Edge,这个方法会分别返回-1,0,1。


[Mapper]

一个类要作为mapper,需继承MapReducebase基类并实现Mapper接口。并不奇怪,mapper和reducer的基类均为MapReduceBase类。它包含类的构造与解构方法。

  • void configure(JobConfjob):该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函数。
  • void close():作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据库连接、打开文件等。

Mapper接口负责数据处理阶段。它采用的形式为Mapper<k1,v1,k2,v2>Java泛型,这里键类和值类分别实现WritableComparable和Writable接口。Mapper只有一个方法——Map,用于处理一个单独的键/值对。

void map (k1 key, v1 value, OutputCollector<k2,v2> output, Reporter reporter) throws IOException

该函数处理一个给定的键/值对 (k1,v1),生成一个键/值对(k2,v2)的列表(该列表也可能为空)。OutputCollector接收这个映射过程的输出,Reporter可以提供对mapper相关附加信息的记录,形成任务进度。

Hadoop提供了一些有用的mapper实现,如下表:



描述

IdentityMapper<k,v>

实现Mapper<k,v,k,v>将输入直接映射到输出

InverseMapper<k,v>

实现Mapper<k,v,v,k>反转键/值对

RegexMapper<k>

实现Mapper<k,text,text,LongWritable>,为每个常规表达式的匹配项生成一个(match,1)对

TokenCountMapper<k>

实现Mapper<k,text,text,LongWritable>,当输入的值为分词时,生成一个(token,1)对

[Reducer]

reducer的实现和mapper一样必须首先在MapReduce基类上扩展,允许配置和清理。此外,它还必须实现Reducer接口使其具有如下的单一方法:

void reduce(k2 key, Iterator<v2> values, OutputCollector<k3,v3> output, Reporter reporter) throws IOException

当reducer任务接收来自各个mapper的输出时,它按照键/值对中的键对输入数据进行排序,并将相同键的值归并。然后调用reduce()函数,并通过迭代处理那些与指定键相关联的值,生成一个(可能为空的)列表(k3,v3)。OutputCollector接收reduce阶段的输出,并写入输出文件。Reporter可提供对reducer相关附加信息的记录,形成任务进度。

Hadoop提供了一些基本的reducer实现,如下表:



描述

IdentityReudcer<k,v>

实现Reducer<k,v,k,v>将输入直接映射到输出

LongSumReducer<k>

实现<k,LongWritable,k,LongWritable>, 计算与给定键相对应的所有值的和

[Partitioner:重定向Mapper输出]

当使用多个reducer时,我们就需要采取一些办法来确定mapper应该把键/值对输出给谁。默认的作法是对键进行散列来确定reducer。hadoop通过HashPartitioner类强制执行这个策略。但有时HashPartitioner会让你出错。

 1 public class EdgePartitioner implements Partitioner<Edge, Writable>
 2 {
 3      @verride
 4      public int getPartition(Edge key, Writable value, int numPartitions)
 5      {
 6           return key.getDepartureNode().hashCode() % numPartitions;
 7      }
 8
 9      @verride
10      public void configure(JobConf conf) { }
11 }

一个定制的partitioner只需要实现configure()和getPartition()两个函数。前者将hadoop对作业的配置应用在patittioner上,而后者返回一个介于0和reducer任务数之间的整数,指向键/值对将要发送的reducer。

在map和reduce阶段之间,一个MapReduce应用必然从mapper任务得到输出结果,并把这些结果发布给reduce任务。该过程通常被称为洗牌。

[Combiner:本地reduce]

在许多MapReduce应用场景中,我们不妨在分发mapper结果之前做一下“本地Reduce”。

[预定义的mapper和reducer类的单词计数]



代码清单 修改的WordCount例程

 1 import org.apache.hadoop.fs.Path;
 2 import org.apache.hadoop.io.Text;
 3 import org.apache.hadoop.io.LongWritable;
 4 import org.apache.hadoop.mapred.FileInputFormat;
 5 import org.apache.hadoop.mapred.FileOutputFormat;
 6 import org.apache.hadoop.mapred.JobClient;
 7 import org.apache.hadoop.mapred.JobConf;
 8 import org.apache.hadoop.mapred.lib.TokenCountMapper;
 9 import org.apache.hadoop.mapred.lib.LongSumReducer;
10
11 public class WordCount2 {
12     public static void main(String[] args) {
13         JobClient client = new JobClient();
14         JobConf conf = new JobConf(WordCount2.class);
15
16         FileInputFormat.addInputPath(conf, new Path(args[0]));
17         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
18
19         conf.setOutputKeyClass(Text.class);
20         conf.setOutputValueClass(LongWritable.class);
21         conf.setMapperClass(TokenCountMapper.class);
22         conf.setCombinerClass(LongSumReducer.class);
23         conf.setReducerClass(LongSumReducer.class);
24
25         client.setConf(conf);
26         try {
27             JobClient.runJob(conf);
28         } catch (Exception e) {
29             e.printStackTrace();
30         }
31     }
32 }

3、读和写

[InputFormat]

hadoop分割与读取输入文件的方式被定义在InputFormat接口的一个实现中。TextInputFormat是InputFormat的默认实现,当你想要一次获取一行内容而输入数据又没有确定的键值时,这种数据格式通常会非常有用。

常用的InputFormat类,如下表:


InputFormat

描述

TextInputFormat

在文本文件中每一行均为一个记录。键(key)为一行的字节偏移,而值(value)为一行的内容

key: LongWritable

value: Text


KeyValueTextInputFormat

在文本文件中的每一行均为一个记录。以每行的第一个分隔符为界,分隔符之前的是键(key),之后的是值(value)。分离器在属性key.value.separator.in.input.line中设定,默认为制表符(\t)。

key: Text

Value: Text


SequenceFileInputFormat<k,v>

用于读取序列文件的InputFormat。键和值由用户定义。序列文件为hadoop专用的压缩二进制文件格式。它专用于一个MapReduce作业和其他MapReduce作业之间传送数据。

key: K(用户定义)

value: V(用户定义)


NLineInputFormat

与TextInputFormat相同,但每个分片一定有N行。N在属性mapred.line.input.format.linespermap中设定,默认为1.

key: LongWritable

value: Text

可以设置JobConf对象使用KeyValueTextInputFormat类读取这个文件:

conf.setInputFormat(KeyValueTextInputFormat.class);

回想一下,我们之前在mapper中曾使用LongWritable和Text分别作为键(key)和值(value)的类型。在TextInputFormat中,因为值为用数字表示的偏移量,所以LongWritable是一个合理的键类型。而当使用KeyvalueTextInputFormat时,无论是键和值都为Text类型,你必须改变mapper的实现以及map()方法来适应这个新的键(key)类型。

生成一个定制的InputFormat:略

[OutputFormat]

当MapReduce输出数据到文件时,使用的是OutputForamt类,它与inputForamt类相似。因为每个reducer仅需将它的输出写入自己的文件中,输出无需分片。输出文件放在一个公用目录中,通常命名为part-nnnnn,这里nnnnn是reducer的分区ID。RecordWriter对象将输出结果进行格式化,而RecordReader对输入格式进行解析。

常用的OutputFormat类,如下表:


OutputFormat

描述

TextOutputFormat<k,v>
将每个记录写为一行文本。键和值以字符串的形式写入,并以制表符(\t)分隔。这个分隔符可以在属性mapred.textoutputformat.separator中修改

SequenceFileOutputFormat<k,v>
以hadoop专有序列文件格式写入键/值对。与SequenceFileInputForamt配合使用

NullOutputFormat<k,v>
无输出
时间: 2024-08-05 23:30:00

[hadoop in Action] 第3章 Hadoop组件的相关文章

[Hadoop in Action] 第1章 Hadoop简介

编写可扩展.分布式的数据密集型程序和基础知识 理解Hadoop和MapReduce 编写和运行一个基本的MapReduce程序 1.什么是Hadoop Hadoop是一个开源的框架,可编写和运行分布式应用处理大规模数据. Hadoop与众不同之处在于以下几点: 方便--Hadoop运行在由一般商用机器构成的大型集群上,或者云计算服务之上: 健壮--Hadoop致力于在一般商用硬件上运行,其架构假设硬件会频繁地出现失效: 可扩展--Hadoop通过增加集群节点,可以线性地扩展以处理更大的数据集:

[Hadoop in Action] 第6章 编程实践

Hadoop程序开发的独门绝技 在本地,伪分布和全分布模式下调试程序 程序输出的完整性检查和回归测试 日志和监控 性能调优 1.开发MapReduce程序 [本地模式] 本地模式下的hadoop将所有的运行都放在一个单独的Java虚拟机中完成,并且使用的是本地文件系统(非HDFS).在本地模式中运行的程序将所有的日志和错误信息都输出到控制台,最后它会给出所处理数据的总量. 对程序进行正确性检查: 完整性检查 回归测试 考虑使用long而非int [伪分布模式] 本地模式不具备生产型hadoop集

[hadoop读书笔记] 第四章 Hadoop I/O操作

P92 压缩 P102 序列化 序列化:将结构化对象转为字节流便于在网上传输或写到磁盘进行永久性存储的过程 用于进程之间的通信或者数据的永久存储 反序列化:将字节流转为结构化对象的逆过程 Hadoop中的序列化:在Hadoop中,系统中多个节点上进程间的通信是通过远程过程传输RPC来实现的. RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化成原始信息. Avro:一个独立于编程语言,并基于 IDL的序列化框架,非常适合用于Hadoop的大规模数据处理

[Hadoop in Action] 第2章 初识Hadoop

Hadoop的结构组成 安装Hadoop及其3种工作模式:单机.伪分布和全分布 用于监控Hadoop安装的Web工具 1.Hadoop的构造模块 (1)NameNode(名字节点) Hadoop在分布式计算和分布式存储中都采用了主/从结构.NameNode位于HDFS的主端,它指导从端的DataNode执行底层的I/O任务.NameNode是HDFS的书记员,它跟踪文件如何被分割成文件块,而这些块又被哪些节点存储,以及分布式文件系统的整体运行状态是否正常. 运行NameNode消耗大量的内存和I

大数据hadoop领域技术总体介绍(各个组件的作用)

2019/2/16 星期六 大数据领域技术总体介绍(各个组件的作用)1.大数据技术介绍大数据技术生态体系:Hadoop 元老级分布式海量数据存储.处理技术系统,擅长离线数据分析Hbase 基于hadoop 的分布式海量数据库,离线分析和在线业务通吃Hive sql 基于hadoop 的数据仓库工具,使用方便,功能丰富,使用方法类似SQLZookeeper 集群协调服务Sqoop 数据导入导出工具Flume 数据采集框架 //经常会结合kafka+flume数据流 或者用于大量的日志收集到hdfs

Solr In Action 中文版 第一章 (二)

Solr到底是什么? 在本节中,我们通过从头设计一个搜索应用来介绍Solr的关键组件.这个过程将有助于你理解Solr的功能,以及设计这些功能的初衷.不过在我们开始介绍Solr的功能特性之前,还是要先澄清一下Solr并不具有的一些性质: 1)  Solr并不是一个像Google或是Bing那样的web搜索引擎 2)  Solr和网站优化中经常提到的搜索引擎SEO优化没有任何关系 好了,现在假设我们准备为潜在的购房客户设计一个不动产搜索的网络应用.该应用的核心用例场景是通过网页浏览器来搜索全美国范围

《Hadoop高级编程》之为Hadoop实现构建企业级安全解决方案

本章内容提要 ●    理解企业级应用的安全顾虑 ●    理解Hadoop尚未为企业级应用提供的安全机制 ●    考察用于构建企业级安全解决方案的方法 第10章讨论了Hadoop安全性以及Hadoop中用于提供安全控制的机制.当构建企业级安全解决方案(它可能会围绕着与Hadoop数据集交互的许多应用程序和企业级服务)时,保证Hadoop自身的安全仅仅是安全解决方案的一个方面.各种组织努力对数据采用一致的安全机制,而数据是从采用了不同安全策略的异构数据源中提取的.当这些组织从多个源获取数据,接

Solr In Action 中文版 第一章(四、五)

1.1             功能概览1. 4 最后,让我们再按照下面的分类,快速的过一下Solr的主要功能: ·用户体验 ·数据建模 ·Solr 4的新功能 在本书中,为你的用户提供良好的搜索体验会一直贯穿全书的主题.所以我们就从用户体验开始,看看Solr是如何让你的用户感觉到爽的. 1.4.1             用户体验类功能 Solr提供了一系列的重要功能来帮助你搭建一个易用的,符合用户直觉的,功能强大的搜索引擎.不过你需要注意的是Solr仅仅是提供了类REST风格的HTTP AP

hadoop搭建杂记:Linux下hadoop的安装配置

VirtualBox搭建伪分布式模式:hadoop的下载与配置 VirtualBox搭建伪分布式模式:hadoop的下载与配置 由于个人机子略渣,无法部署XWindow环境,直接用的Shell来操作,想要用鼠标点击操作的出门转左不送- 1.hadoop的下载与解压 wget http://mirror.bit.edu.cn/apache/hadoop/common/stable2/hadoop-2.7.1.tar.gzmkdir /usr/hadooptar -xzvf hadoop-2.7.1