Hadoop学习笔记之三 数据流向

http://hadoop.apache.org/docs/r1.2.1/api/index.html

最基本的:

1. 文本文件的解析

2. 序列文件的解析

toString会将Byte数组中的内存数据 按照字节间隔以字符的形式显示出来。

文本文件多事利用已有的字符处理类, 序列文件多事创建byte数组,然后将文件流中的数据复制到byte数组后进行解析。

LineRecordReader。。。 。。。

这里首先需要了整个文件数据 的流动方向。

MapReduce框架借助inputformat完成输入数据的规范检查,借助outputformat完成输出数据的规范性检查。

context的常用用法:

context.getConfiguration
context.getInputSplit
context.write

利用好context可以随心所欲的输出,从输入key value list中获得信息,输出可以是单个的key,value;也可以是key list

从输入中获得信息,用context随便向外写!

一个Mapper对应一个Split文件,而recorderreder需要多次调用用来解析键值对:

如下所示一个文本文件传入mapper,而map函数多次被触发

public class TestDataFlow {
    public static void main(String[] args)throws Exception{
        Configuration conf  = new Configuration();
        Job job = new Job(conf, "testDataFlow");

        job.setJarByClass(TestDataFlow.class);
        job.setMapperClass(myMapper.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://MASTERPC:9000/home/Fea.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://MASTERPC:9000/out"));

        job.waitForCompletion(true);

    }

    public static class myMapper extends Mapper<Object, Text, Text, Text>{
        private FileSplit split;
        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException{
            split = (FileSplit)context.getInputSplit();
            System.out.println("输入文件块的路径:"+split.getPath().toString());

        }
    }

}
14/11/09 17:15:44 INFO mapred.MapTask: Processing split: hdfs://MASTERPC:9000/home/Fea.txt:0+7397

输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt
输入文件块的路径:hdfs://MASTERPC:9000/home/Fea.txt

下面是一段自定义InputFormat的程序,功能是将零碎的小文件合并成大的sequence文件:key文件名,value文件值

//主要包括两个部分:文件划分 + 创建RecordReader; 下面的代码new了一个自己的reader返回public class myFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    protected boolean isSplitable(JobContext context, Path file){
        return false;
    }
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException,InterruptedException{
        myRecordReader recorder = new myRecordReader();
        recorder.initialize(split, context);
        return recorder;
    }
}
//使用split获得输入文件块的大小、路径信息;使用context获得fs真正的从dfs上读入文件内容到value成员变量中。//成员变量value用来传递给Mapper的map函数使用public class myRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit split;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
            this.split = (FileSplit)split;
            this.conf = context.getConfiguration();
        }
    public void close(){
        ;
    }

    public NullWritable getCurrentKey()throws IOException{
        return null;
    }
    public BytesWritable getCurrentValue()throws IOException{
        return value;
    }
    public float getProgress()throws IOException, InterruptedException{
        return processed?1.0f:0.0f;
    }
    public boolean nextKeyValue()throws IOException, InterruptedException{
        if (!processed){
            byte[] buf = new byte[(int)split.getLength()];
            FileSystem fs =  FileSystem.get(conf);
            Path path = split.getPath();
            InputStream in = null;
            try{
                in = fs.open(path);
                IOUtils.readFully(in,  buf, 0, buf.length);
                value.set(buf,0, buf.length);
            }finally{
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }else{
            return false;
        }

    }

}

  //RecordReader处理好的key和value自动的传递给map函数
    public static class myMapper extends Mapper<Object, BytesWritable, Text, BytesWritable>{
        private FileSplit split;
        private Text outkey = new Text();
        public void setup(Context context){
            split = (FileSplit)context.getInputSplit();
            outkey.set(split.getPath().toString());
            System.out.println("输入文件块的路径:"+split.getPath().toString());
        }
        public void map(Object key, BytesWritable value, Context context)
            throws IOException, InterruptedException{
            System.out.println("key: "+ outkey.toString()+" value: "+value.toString());            

        }

Mapper类对应的是输入的文件块split,map对应的是文件块解析出来的一个个的<key, value>

RecordReader对应的是输入的文件块split,可能需要多次对split进行解析。

InputFormat中分为两部分,getSplit是将inputPath路径下的HDFS文件划分成块split,另一部分是CreateRecordReader

创建用来解析每个split。

其中划分成块是在终端上传数据文件时进行的,然后划分之后的文件信息提供给jobTracker进行分配,分配到任务的节点(Mapper)到相应的 位置下载自己的split文件

然后调用RecordReader不断对这个split文件进行解析,将生成的<key,value>送给map函数进行处理生成新的<key,value>,经过排序合并分组之后传递给相应的reduce。

自定义InputFormat

下面是一个将小文件聚合成大的序列文件的mapred作业,核心是利用自定义的FileInputFormt将系统分配的小文件,以文件名(text)key,

文件内容(bytes)value的键值对形式传递给最后的SeqenceFileOutputFormat,将这些键值对写入HDFS上的序列文件中。

由于文件较小所以不会再分块,一个文件作为整体输入到节点上,FileInputFormat中设置文件不可分。

Mapper类中调用FileInputFormat中的创建初始化RecordReader函数:createRecordReader(),

RecordReader将输入文件解析后将<key,value>送给map函数, 在之前的TextInputFormat的LineRecordReader是不断的对split进行处理

将每行解析成一个<key, value>送给map,但是这里的应用时RecordReader仅仅处理split文件一次,就将整个文件的内容作为<null, value>

传递给map, 最后的reduce在将所有的键值对聚合成一个。所以RecordReader类中做了处理次数的判断处理。

这样一来对于每个节点的map也只会执行一次,多以在Mapper一开始的时候setup函数中将输入的Split文件的文件名获得作为key。

文件:myRecordReader.java

[说明]RecordReader的作用就是为了解析split文件,所以createRecordReader时传入的参数是split、context,在RecordReader类中,首先需要获得

HDFS上的split的输入文件流,然后对此进行解析,并记录下处理的位置,方便下次map调用的时候,从上次的位置接着对split文件进行解析。

文件:myFileInputFormat.java

[说明]该类中设置文件不可分,创建RecordReader对象。

inputFormat和 recordReader中的键值对类型一定要和  map中的键值对类型一致。

作业上下文context可以获得作业参数conf、输入文件InputSplit、

在Mapper类的初始化函数中传入的参数就是context

public void setup(Context context)

OutputFormat:

主要的函数是:

getRcordWriter, 返回的是RecordWriter, 它负责将键值对写入到存储部件中。

checkOutputSpecs,负责检查输出目录参数是否合理,如果输出目录已经存在就报错。

getOutputCommiter,OutputCommiter负责临时文件的初始化,作业完成之后清理临时目录等等。

系统默认的是TextOutputFormat,它使用LineRecordWriter将最终获的键值对以key + \t + value的方式逐行输出到文本文件中。

在 编程时使用较多的是重载 writer、RecordWriter、getRecordWriter,

tips:在eclipse中代码的空白处右键->Source->Override/Implements Methods 然后在弹出的窗口中选择需要重载的成员方法。^.^

(生成的重载函数上面会自动生@Override灰色字体)

下面是hadoop源码SequenceFileOutputFormat中的一段:

  public RecordWriter<K, V>          getRecordWriter(TaskAttemptContext context                         ) throws IOException, InterruptedException {   。。。 。。。   final SequenceFile.Writer out =
        SequenceFile.createWriter(fs, conf, file,
                                context.getOutputKeyClass(),
                                context.getOutputValueClass(),
                                compressionType,
                                codec,
                                context);

    return new RecordWriter<K, V>() {

        public void write(K key, V value)
          throws IOException {

          out.append(key, value);
        }

        public void close(TaskAttemptContext context) throws IOException {
          out.close();
        }
      };

可见write中传入的是OutputFormat<K,V>中的键值对,而创建的SequenceFile.Writer是根据setOutputKeyClass、setOutputValueClass设置的类型进行写文件。

public class TestFormat {public  static void main(String[] args)throws Exception{
        Configuration conf = new Configuration();
        Job job = new Job(conf, "testFormat!");
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job , new Path("hdfs://master:9000/hadoop"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/out"));

        job.waitForCompletion(true);

    }
}

java.lang.Exception: java.io.IOException:

Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.Text

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

RecordReader是根据输入文件中的内容自动获得key和value的class,所以map之后的k,v类型是原始文件的kv类型,但是write函数中out(writer)的kv类型时job中设置的。

//草稿:

 public static class myOutputFormat<Text, BytesWritable> extends  SequenceFileOutputFormat<Text, BytesWritable>{
        public RecordWriter<Text, BytesWritable> getRecordWriter(
                TaskAttemptContext arg0) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
            return super.getRecordWriter(arg0);
        }

    }

定义自己的数据:

public class Point3D implements WritableComparable<Point3D>{
        private float x, y, z;
        public float getX(){return x;}
        public float getY(){return y;}
        public float getZ(){return z;}
        public void readFields(DataInput in) throws IOException{
            x = in.readFloat();
            y = in.readFloat();
            z = in.readFloat();
        }
        public void write(DataOutput out)throws IOException{
            out.writeFloat(x);
            out.writeFloat(y);
            out.writeFloat(z);
        }
        public int compareTo(Point3D p){
            float ret = (x*x+y*y +z*z) - (p.x*(p.x)+p.y*(p.y)+p.z*(p.z));
            if (ret > 0)
                return 1;
            else if (ret == 0)
                return 0;
            else
                return -1;

        }
    }

一些常用的流:

FSDataInputStream
DataInputStream
FileInputStream
InputStream

BufferedInputStream

链式处理:

在hadoop计算的过程中比较耗费时间的是IO操作,一些Job 在Map需要前期预处理,reduce后需要后处理,这样可以使用Job的链式处理;

通过ChainMapper.addMapp实现,但是该类不支持hadoop1.2.1版本。

---------------------------------------------------------------

参考文献:

《实战Hadoop》开启通向云计算的捷径,刘鹏主编,电子工业出版社

最短路径系列之一《从零开始学Hadoop》

http://blog.csdn.net/chaoping315/article/details/6221440

时间: 2024-10-22 13:27:19

Hadoop学习笔记之三 数据流向的相关文章

Hadoop学习笔记0003——从Hadoop URL读取数据

Hadoop学习笔记0003--从Hadoop URL读取数据 从HadoopURL读取数据   要从Hadoop文件系统中读取文件,一个最简单的方法是使用java.net.URL对象来打开一个数据流,从而从中读取数据.一般的格式如下: InputStream in = null; try { in = new URL("hdfs://host/path").openStream(); // process in } finally { IOUtils.closeStream(in);

Hadoop学习笔记(6) ——重新认识Hadoop

Hadoop学习笔记(6) ——重新认识Hadoop 之前,我们把hadoop从下载包部署到编写了helloworld,看到了结果.现是得开始稍微更深入地了解hadoop了. Hadoop包含了两大功能DFS和MapReduce, DFS可以理解为一个分布式文件系统,存储而已,所以这里暂时就不深入研究了,等后面读了其源码后,再来深入分析. 所以这里主要来研究一下MapReduce. 这样,我们先来看一下MapReduce的思想来源: alert("I'd like some Spaghetti!

Hadoop学习笔记(7) ——高级编程

Hadoop学习笔记(7) ——高级编程 从前面的学习中,我们了解到了MapReduce整个过程需要经过以下几个步骤: 1.输入(input):将输入数据分成一个个split,并将split进一步拆成<key, value>. 2.映射(map):根据输入的<key, value>进生处理, 3.合并(combiner):合并中间相两同的key值. 4.分区(Partition):将<key, value>分成N分,分别送到下一环节. 5.化简(Reduce):将中间结

Hadoop学习笔记_2_Hadoop源起与体系概述[续]

Hadoop源起与体系概述 Hadoop的源起--Lucene Lucene是Doug Cutting开创的开源软件,用java书写代码,实现与Google类似的全文搜索功能,它提供了全文检索引擎的架构,包括完整的查询引擎和索引引擎 早期发布在个人网站和SourceForge,2001年年底成为apache软件基金会jakarta的一个子项目 Lucene的目的是为软件开发人员提供一个简单易用的工具包,以方便的在目标系统中实现全文检索的功能,或者是以此为基础建立起完整的全文检索引擎 对于大数据的

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

Hadoop学习笔记_7_分布式文件系统HDFS --DataNode体系结构

分布式文件系统HDFS --DataNode体系结构 1.概述 DataNode作用:提供真实文件数据的存储服务. 文件块(block):最基本的存储单位[沿用的Linux操作系统地概念].对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block. 与Linux操作系统不同的是,一旦上传了一个小于Block大小的文件,则该文件会占用实际文件大小的空间. 2.进入hdfs-default.xml <prope

Hadoop学习笔记(2) ——解读Hello World

Hadoop学习笔记(2) ——解读Hello World 上一章中,我们把hadoop下载.安装.运行起来,最后还执行了一个Hello world程序,看到了结果.现在我们就来解读一下这个Hello Word. OK,我们先来看一下当时在命令行里输入的内容: $mkdir input $cd input $echo "hello world">test1.txt $echo "hello hadoop">test2.txt $cd .. $bin/ha

hadoop学习笔记——基础知识及安装

1.核心 HDFS  分布式文件系统    主从结构,一个namenoe和多个datanode, 分别对应独立的物理机器 1) NameNode是主服务器,管理文件系统的命名空间和客户端对文件的访问操作.NameNode执行文件系统的命名空间操作,比如打开关闭重命名文件或者目录等,它也负责数据块到具体DataNode的映射 2)集群中的DataNode管理存储的数据.负责处理文件系统客户端的文件读写请求,并在NameNode的统一调度下进行数据块的创建删除和复制工作. 3)NameNode是所有

Hadoop学习笔记(一)——Hadoop体系结构

HDFS和MapReduce是Hadoop的两大核心.整个Hadoop体系结构主要是通过HDFS来实现分布式存储的底层支持的,并且通过MapReduce来实现分布式并行任务处理的程序支持. 一.HDFS体系结构 HDFS采用了主从(Master/Slave)结构模型.一个HDFS集群是由一个NameNode和若干个DataNode组成的.其中,NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作:集群中的DataNode管理存储的数据.HDFS典型的部署是在一个专门的机器