Hadoop学习笔记(7) ——高级编程

Hadoop学习笔记(7)

——高级编程

从前面的学习中,我们了解到了MapReduce整个过程需要经过以下几个步骤:

1.输入(input):将输入数据分成一个个split,并将split进一步拆成<key, value>。

2.映射(map):根据输入的<key, value>进生处理,

3.合并(combiner):合并中间相两同的key值。

4.分区(Partition):将<key, value>分成N分,分别送到下一环节。

5.化简(Reduce):将中间结果合并,得到最终结果

6.输出(output):负责输入最终结果。

其中第3、4步又成洗牌(shuffle)过程。

从前面HelloWorld示例中,我们看到,我们只去个性化了Map和Reduce函数,那其他函数呢,是否可以个性化?答案当然是肯定的。下面我们就对每个环节的个性化进行介绍。

自定义输入格式

输入格式(InputFormat)用于描述整个MapReduce作业的数据输入规范。先对输入的文件进行格式规范检查,如输入路径,后缀等检查;然后对数据文件进行输入分块(split);再对数据块逐一读出;最后转换成Map所需要的<key, value>健值对。

系统中提供丰富的预置输入格式。最常用的以下两种:

TextInputFormat:系统默认的数据输入格式。将文件分块,并逐行读入,每一行记录行成一对<key, value>。其中,key值为当前行在整个文件中的偏移量,value值为这一行的文本内容。

KeyValueTextInputFormat:这是另一个常用的数据输入格式,读入的文本文件内容要求是以<key, value>形式。读出的结果也就直接形成<key, value>送入map函数中。

如果选择输入格式呢?那就只要在job函数中调用

  1. job.setInputFormatClass(TextInputFormat.class);

在Hello中我们没有设定,系统默认选择了TextInputFormat。

一般情况够用了,但某些情况下,还是无法满足用户的需求,所以还是需要个性化。个性化则按下面的方式进行:

如果数据我们是来源于文件,则可以继承FileInputFormat:

  1. public class MyInputFormat extends FileInputFormat<Text,Text> {
  2.    @Override
  3.    public RecordReader<Text, Text> createRecordReader(InputSplit split,
  4.          TaskAttemptContext context) throws IOException, InterruptedException {
  5.       // TODO Auto-generated method stub
  6.       return null;
  7.    }
  8. }

如果数据我们是来源于非文件,如关系数据,则继承

  1. public class MyInputFormat extends InputFormat<Text,Text> {
  2.  
  3.    @Override
  4.    public RecordReader<Text, Text> createRecordReader(InputSplit arg0,
  5.          TaskAttemptContext arg1) throws IOException, InterruptedException {
  6.       // TODO Auto-generated method stub
  7.       return null;
  8.    }
  9.  
  10.    @Override
  11.    public List<InputSplit> getSplits(JobContext arg0) throws IOException,
  12.          InterruptedException {
  13.       // TODO Auto-generated method stub
  14.       return null;
  15.    }
  16.  
  17. }

这里比较清晰了,下面个函数为拆分成split,上面个函数跟据split输出成Key,value。

自定义map处理

这个好理解,我们的HelloWorld程序中就自定义了map处理函数。然后在job中指定了我们的处理类:

  1. job.setMapperClass(TokenizerMapper.class);

能不能没有map呢? 可以的,如果没有map,也就是这与上面的这个setMapperClass,则系统自动指定一个null,这时处理是将输入的<key,value>值,不作任何修改,直接送到下一环节中。

个性化代码如下:

  1. public static class TokenizerMapper
  2.        extends Mapper<Object, Text, Text, IntWritable>{
  3.  
  4.     public void map(Object key, Text value, Context context
  5.                     ) throws IOException, InterruptedException {
  6.  
  7.         context.write(key, value);
  8.     }
  9.   }

自定义合并Combiner

自定义合并Combiner类,主要目的是减少Map阶段输出中间结果的数据量,降低数据的网络传输开销。

Combine过程,实际跟Reduce过程相似,只是执行不同,Reduce是在Reducer环节运行,而Combine是紧跟着Map之后,在同一台机器上预先将结时进行一轮合并,以减少送到Reducer的数据量。所以在HelloWorld时,可以看到,Combiner和Reducer用的是同一个类:

  1. job.setCombinerClass(IntSumReducer.class);
  2. job.setReducerClass(IntSumReducer.class);

如何个性化呢,这个跟Reducer差不多了:

  1. public static class MyCombiner
  2.       extends Reducer<Text,IntWritable,Text,IntWritable> {
  3.  
  4.    public void reduce(Text key, Iterable<IntWritable> values,
  5.                       Context context
  6.                       ) throws IOException, InterruptedException {
  7.  
  8.      context.write(key, new IntWritable(1));
  9.    }
  10.  }

自定义分区Partitioner

在MapReduce程序中,Partitioner决定着Map节点的输出将被分区到哪个Reduce节点。而默认的Partitioner是HashPartitioner,它根据每条数据记录的主健值进行Hash操作,获得一个非负整数的Hash码,然后用当前作业的Reduce节点数取模运算,有N个结点的话,就会平均分配置到N个节点上,一个隔一个依次。大多情况下这个平均分配是够用了,但也会有一些特殊情况,比如某个文件的,不能被拆开到两个结点中,这样就需要个性化了。

个性化方式如下:

  1. public static class MyPartitioner
  2.       extends HashPartitioner<K,V> {
  3.  
  4.    public void getPartition(K key, V value,int numReduceTasks) {
  5.  
  6.      super.getPartition(key,value,numReduceTasks);
  7.    }
  8.  }

方式其实就是在执行之前可以改变一下key,来欺骗这个hash表。

自定义化简(Reducer)

这一块是将Map送来的结果进行化简处理,并形成最终的输出值。与前面map一样,在HelloWorld中我们就见到过了。通过下面代码可以设置其值:

  1. job.setReducerClass(IntSumReducer.class);

同样,也可以这样类可以不设置,如果不设置的话,就是把前面送来的值,直接送向输出格式器中。

如果要个性化,则如下:

  1.   public static class IntSumReducer
  2.      extends Reducer<Text,IntWritable,Text,IntWritable> {
  3.  
  4.   public void reduce(Text key, Iterable<IntWritable> values,
  5.                      Context context
  6.                      ) throws IOException, InterruptedException {
  7.     context.write(key, result);
  8.   }
  9. }

自定义输出格式

数据输出格式(OutPutFormat)用于描述MapReduce作业的数据输出规范。Hadoop提供了丰富的内置数据输出格式。最常的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,将结果以"key+\t+value"的形式逐行输出到文本文件中。还有其它的,如:DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdataOutputFormat,LazyOutputFormat,MapFileOutputFormat,等等。

如果要个性化,则按下面方式进行:

  1. public class MyOutputFormat extends OutputFormat<Text,Text> {
  2.  
  3.    @Override
  4.    public void checkOutputSpecs(JobContext arg0) throws IOException,
  5.          InterruptedException {
  6.       // TODO Auto-generated method stub
  7.  
  8.    }
  9.  
  10.    @Override
  11.    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
  12.          throws IOException, InterruptedException {
  13.       // TODO Auto-generated method stub
  14.       return null;
  15.    }
  16.  
  17.    @Override
  18.    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)
  19.          throws IOException, InterruptedException {
  20.       // TODO Auto-generated method stub
  21.       return null;
  22.    }
  23.  
  24. }

复合健——用户自定义类型。

从前面的整个过程中可以看到,都是采用key-value的方式进行传入传出,而这些类型大多是单一的字符串,和整型。如果我的key中需要包含多个信息怎么办?用字符串直接拼接么? 太不方便了,最好能够自己定义一个类,作为这个key,这样就方便了。

如果定义一个类作为key 或value的类型? 有什么要求?就是这个类型必须要继承WritableComparable<T>这个类,所以如果要自定义一个类型则可以这么实现:

  1. public class MyType implements WritableComparable<MyType> {
  2.  
  3.    private float x,y;
  4.    public float GetX(){return x;}
  5.    public float GetY(){return y;}
  6.  
  7.       @Override
  8.       public void readFields(DataInput in) throws IOException {
  9.          x = in.readFloat();
  10.          y = in.readFloat();
  11.       }
  12.  
  13.       @Override
  14.       public void write(DataOutput out) throws IOException {
  15.          out.writeFloat(x);
  16.          out.writeFloat(y);
  17.       }
  18.  
  19.       @Override
  20.       public int compareTo(MyType arg0) {
  21.          //输入:-1(小于) 0(等于) 1(大于)
  22.          return 0;
  23.       }
  24.    }

这个示例中,我们添加了两个float变量:x,y 。 这个信息能过int 和out按次序进行输入输出。最后,再实现一个比较函数即可。

Job任务的创建

  1. Job job = new Job(conf, "word count");
  2.    job.setJarByClass(WordCount.class);
  3.    job.setInputFormatClass(MyInputFormat.class);
  4.    job.setMapperClass(TokenizerMapper.class);
  5.    job.setCombinerClass(IntSumReducer.class);
  6.    job.setPartitionerClass(MyPartitioner.class);
  7.    job.setReducerClass(IntSumReducer.class);
  8.    job.setOutputFormatClass(TextOutputFormat.class);
  9.    job.setOutputKeyClass(Text.class);
  10.    job.setOutputValueClass(IntWritable.class);
  11.    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  12.    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

任务创建比较容易,其实就是new一个实例,然后把上面描述的过程类设置好,然后加上第2行中,jar包的主类,第10、11行的输入输出路径。这样就完事了。

Job任务的执行

单个任务的执行,没有什么问题,可以用这个:

  1. job.waitForCompletion(true);

但多个任务呢? 多个任务的话,就会形成其组织方式,有串行,有并行,有无关,有组合的,如下图:

图中,Job2和Job3将会等Job1执行完了再执行,且可以同时开始,而Job4必须等Job2和Job3同时结束后才结束。

这个组合,就可以采用这样的代码来实现:

  1. Configuration conf = new Configuration();
  2.       Job job1 = new Job(conf, "job1");
  3.       //.. config Job1
  4.       Job job2 = new Job(conf, "job2");
  5.       //.. config Job2
  6.       Job job3 = new Job(conf, "job3");
  7.       //.. config Job3
  8.       Job job4 = new Job(conf, "job4");
  9.       //.. config Job4
  10.  
  11.       //添加依赖关系
  12.       job2.addDependingJob(job1);
  13.       job3.addDependingJob(job1);
  14.       job4.addDependingJob(job2);
  15.       job4.addDependingJob(job3);
  16.  
  17.       JobControl jc = new JobControl("jbo name");
  18.       jc.addJob(job1);
  19.       jc.addJob(job2);
  20.       jc.addJob(job3);
  21.       jc.addJob(job4);
  22.       jc.run();

总述

现在回头看看,其实整个hadoop编程,也就是这几块内容了,要实现某个功能,我们就往上面这些步骤上套,然后联起来执行,达到我们的目的。

时间: 2024-08-04 22:21:31

Hadoop学习笔记(7) ——高级编程的相关文章

c++学习笔记の面向对象高级编程@Boolan

一.函数的传参&返回值(引用传值效率高于值传递) 1)参数尽量考虑使用对象引用,对于要求函数不改变对象内容的情况使用const. 2)  返回值尽量使用对象引用, 对于返回值是局部对象的情况只能返回对象不能是引用 原因是局部的对象在函数执行完毕之后会被析构,外部的引用将对应一个无效的对象. 举个栗子理解下函数传参,假设自己有份文件需要别人帮你处理,你把文件传递给别人的方式简单来说可以有两种: 1.将文件拷贝一份发送给对方,之后让他处理,这样的好处是他的修改不会影响到你的原始文件 缺点是浪费空间,

Hadoop学习笔记(6) ——重新认识Hadoop

Hadoop学习笔记(6) ——重新认识Hadoop 之前,我们把hadoop从下载包部署到编写了helloworld,看到了结果.现是得开始稍微更深入地了解hadoop了. Hadoop包含了两大功能DFS和MapReduce, DFS可以理解为一个分布式文件系统,存储而已,所以这里暂时就不深入研究了,等后面读了其源码后,再来深入分析. 所以这里主要来研究一下MapReduce. 这样,我们先来看一下MapReduce的思想来源: alert("I'd like some Spaghetti!

Hadoop学习笔记(5) ——编写HelloWorld(2)

Hadoop学习笔记(5) ——编写HelloWorld(2) 前面我们写了一个Hadoop程序,并让它跑起来了.但想想不对啊,Hadoop不是有两块功能么,DFS和MapReduce.没错,上一节我们写了一个MapReduce的HelloWorld程序,那这一节,我们就也学一学DFS程序的编写. DFS是什么,之前已经了解过,它是一个分布式文件存储系统.不管是远程或本地的文件系统,其实从接口上讲,应该是一至的,不然很难处理.同时在第2节的最后,我们列出了很多一些DFS的操作命令,仔细看一下,这

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

Hadoop学习笔记(一)——Hadoop体系结构

HDFS和MapReduce是Hadoop的两大核心.整个Hadoop体系结构主要是通过HDFS来实现分布式存储的底层支持的,并且通过MapReduce来实现分布式并行任务处理的程序支持. 一.HDFS体系结构 HDFS采用了主从(Master/Slave)结构模型.一个HDFS集群是由一个NameNode和若干个DataNode组成的.其中,NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作:集群中的DataNode管理存储的数据.HDFS典型的部署是在一个专门的机器

Hadoop学习笔记(9) ——源码初窥

Hadoop学习笔记(9) ——源码初窥 之前我们把Hadoop算是入了门,下载的源码,写了HelloWorld,简要分析了其编程要点,然后也编了个较复杂的示例.接下来其实就有两条路可走了,一条是继续深入研究其编程及部署等,让其功能使用的淋漓尽致.二是停下来,先看看其源码,研究下如何实现的.在这里我就选择第二条路. 研究源码,那我们就来先看一下整个目录里有点啥: 这个是刚下完代码后,目录列表中的内容. 目录/文件 说明 bin 下面存放着可执行的sh命名,所有操作都在这里 conf 配置文件所在

C++ Primer 学习笔记_73_面向对象编程 --再谈文本查询示例

面向对象编程 --再谈文本查询示例 引言: 扩展第10.6节的文本查询应用程序,使我们的系统可以支持更复杂的查询. 为了说明问题,将用下面的简单小说来运行查询: Alice Emma has long flowing red hair. Her Daddy says when the wind blows through her hair, it looks almost alive, like a fiery bird in flight. A beautiful fiery bird, he

C++ Primer 学习笔记_74_面向对象编程 --再谈文本查询示例[续/习题]

面向对象编程 --再谈文本查询示例[续/习题] //P522 习题15.41 //1 in TextQuery.h #ifndef TEXTQUERY_H_INCLUDED #define TEXTQUERY_H_INCLUDED #include <iostream> #include <fstream> #include <sstream> #include <vector> #include <set> #include <map&g

Hadoop学习笔记(8) ——实战 做个倒排索引

Hadoop学习笔记(8) ——实战 做个倒排索引 倒排索引是文档检索系统中最常用数据结构.根据单词反过来查在文档中出现的频率,而不是根据文档来,所以称倒排索引(Inverted Index).结构如下: 这张索引表中, 每个单词都对应着一系列的出现该单词的文档,权表示该单词在该文档中出现的次数.现在我们假定输入的是以下的文件清单: T1 : hello world hello china T2 : hello hadoop T3 : bye world bye hadoop bye bye 输