自定义实现InputFormat、OutputFormat

一:自定义实现InputFormat

*数据源来自于内存
*1.InputFormat是用于处理各种数据源的,下面是实现InputFormat,数据源是来自于内存.
*1.1 在程序的job.setInputFormatClass(MyselfmemoryInputFormat.class);
*1.2 实现InputFormat,extends InputFormat< , >,实现其中的两个方法,分别是getSplits(..),createRecordReader(..).
*1.3 getSplits(..)返回的是一个java.util.List<T>,List中的每个元素是InputSplit.每个InputSplit对应一个mappper任务.
*1.4 InputSplit是对原始海量数据源的划分,因为我们处理的是海量数据,不划分不行.InputSplit数据的大小完全是我们自己来定的.本例中是在内存中产生数据,然后封装到InputSplit.
*1.5 InputSplit封装的是hadoop数据类型,实现Writable接口.
*1.6 RecordReader读取每个InputSplit中的数据.解析成一个个<k,v>,供map处理.
*1.7 RecordReader有4个核心方法,分别是initalize(..).nextKeyValue(),getCurrentKey(),getCurrentValue().
*1.8 initalize重要性在于是拿到InputSplit和定义临时变量.
*1.9 nexKeyValue(..)该方法的每次调用,可以获得key和value值.
*1.10 当nextKeyValue(..)调用后,紧接着调用getCurrentKey(),getCurrentValue().
*       mapper方法中的run方法调用.

public class MyselInputFormatApp {
        private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// 输出路径,reduce作业输出的结果是一个目录
        public static void main(String[] args) {
            Configuration conf = new Configuration();// 配置对象
            try {
                FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
                fileSystem.delete(new Path(OUT_PATH), true);
                Job job = new Job(conf, WordCountApp.class.getSimpleName());// jobName:作业名称
                job.setJarByClass(WordCountApp.class);

                job.setInputFormatClass(MyselfMemoryInputFormat.class);
                job.setMapperClass(MyMapper.class);// 指定自定义map类
                job.setMapOutputKeyClass(Text.class);// 指定map输出key的类型
                job.setMapOutputValueClass(LongWritable.class);// 指定map输出value的类型
                job.setReducerClass(MyReducer.class);// 指定自定义Reduce类
                job.setOutputKeyClass(Text.class);// 设置Reduce输出key的类型
                job.setOutputValueClass(LongWritable.class);// 设置Reduce输出的value类型
                FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));// Reduce输出完之后,就会产生一个最终的输出,指定最终输出的位置
                job.waitForCompletion(true);// 提交给jobTracker并等待结束
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static class MyMapper extends
                Mapper<NullWritable, Text, Text, LongWritable> {
            @Override
            protected void map(NullWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] splited = line.split("\t");
                for (String word : splited) {
                    context.write(new Text(word), new LongWritable(1));// 把每个单词出现的次数1写出去.
                }
            }
        }

        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                long count = 0L;
                for (LongWritable times : values) {
                    count += times.get();
                }
                context.write(key, new LongWritable(count));
            }
        }

        /**
         * 从内存中产生数据,然后解析成一个个的键值对
         *
         */
        public static class MyselfMemoryInputFormat extends InputFormat<NullWritable,Text>{

            @Override
            public List<InputSplit> getSplits(JobContext context)
                    throws IOException, InterruptedException {
                ArrayList<InputSplit> result = new ArrayList<InputSplit>();
                result.add(new MemoryInputSplit());
                result.add(new MemoryInputSplit());
                result.add(new MemoryInputSplit());
                return result;
            }

            @Override
            public RecordReader<NullWritable, Text> createRecordReader(
                    InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                return new MemoryRecordReader();
            }
        }

        public static class MemoryInputSplit extends InputSplit implements Writable{
            int SIZE = 10;
            //java中的数组在hadoop中不被支持,所以这里使用hadoop的数组
            //在hadoop中使用的是这种数据结构,不能使用java中的数组表示.
            ArrayWritable arrayWritable = new ArrayWritable(Text.class);
            /**
             * 先创建一个java数组类型,然后转化为hadoop的数据类型.
             * @throws FileNotFoundException
             */
            public MemoryInputSplit() throws FileNotFoundException {
                //一个inputSplit供一个map使用,map函数如果要被调用多次的话,意味着InputSplit必须解析出多个键值对
                Text[] array = new Text[SIZE];
                Random random = new Random();
                for(int i=0;i<SIZE;i++){
                    int nextInt = random.nextInt(999999);
                    Text text = new Text("Text"+nextInt);
                    array[i] = text ;
                }

//                FileInputStream fs = new FileInputStream(new File("\\etc\\profile"));//从文件中读取
//                将流中的数据解析出来放到数据结构中.
                arrayWritable.set(array);
            }
            @Override
            public long getLength() throws IOException, InterruptedException {
                return SIZE;
            }
            @Override
            public String[] getLocations() throws IOException,
                    InterruptedException {
                return new String[]{};
            }
            public ArrayWritable getValues() {
                return arrayWritable;
            }
            @Override
            public void write(DataOutput out) throws IOException {
                arrayWritable.write(out);
            }
            @Override
            public void readFields(DataInput in) throws IOException {
                arrayWritable.readFields(in);
            }
        }

        public static class MemoryRecordReader extends RecordReader<NullWritable, Text>{
            private Writable[] values = null ;
            private Text value = null ;
            private int i = 0;
            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                MemoryInputSplit inputSplit = (MemoryInputSplit)split;
                ArrayWritable writables = inputSplit.getValues();
                this.values = writables.get();
                this.i = 0 ;
            }

            @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
                if(i >= values.length){
                    return false ;
                }
                if(null == this.value){
                    value = new Text();
                }
                value.set((Text)values[i]);
                i++ ;
                return true;
            }

            @Override
            public NullWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return NullWritable.get();
            }

            @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {
                return value;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                return 0;
            }

            /**
             * 程序结束的时候,关闭
             */
            @Override
            public void close() throws IOException {
            }

        }

    }

自定义实现InputFormat

二:自定义实现OutputFormat

常见的输出类型:TextInputFormat:默认输出格式,key和value中间用tab隔开.
        DBOutputFormat:写出到数据库的.
        SequenceFileFormat:将key,value以Sequence格式输出的.
        SequenceFileAsOutputFormat:SequenceFile以原始二进制的格式输出.
        MapFileOutputFormat:将key和value写入MapFile中.由于MapFile中key是有序的,所以写入的时候必须保证记录是按key值顺序入的.
        MultipleOutputFormat:多文件的一个输出.默认情况下一个reducer产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs就可以实现这个功能.
          MultipleOutputFormat:可以自定义输出文件的名称.
          继承MultipleOutputFormat 需要实现
            getBaseRecordWriter():
            generateFileNameForKeyvalue():根据键值确定文件名.

/**
 *自定义输出OutputFormat:用于处理各种输出目的地的.
 *1.OutputFormat需要写出的键值对是来自于Reducer类.是通过RecordWriter获得的.
 *2.RecordWriter(..)中write只有key和value,写到那里去哪?这要通过单独传入输出流来处理.write方法就是把k,v写入到outputStream中的.
 *3.RecordWriter类是位于OutputFormat中的.因此,我们自定义OutputFormat必须继承OutputFormat类.那么流对象就必须在getRecordWriter(..)中获得.
 */
public class MySelfOutputFormatApp {
    private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd/hello";// 输入路径
    private static final String OUT_PATH = "hdfs://hadoop1:9000/out";// 输出路径,reduce作业输出的结果是一个目录
    private static final String OUT_FIE_NAME = "/abc";
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
            fileSystem.delete(new Path(OUT_PATH), true);
            Job job = new Job(conf, WordCountApp.class.getSimpleName());
            job.setJarByClass(WordCountApp.class);
            FileInputFormat.setInputPaths(job, INPUT_PATH);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            job.setOutputFormatClass(MySelfTextOutputFormat.class);
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] splited = line.split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));// 把每个单词出现的次数1写出去.
            }
        }
    }

    public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable times : values) {
                count += times.get();
            }
            context.write(key, new LongWritable(count));
        }
    }
    /**
     *自定义输出类型
     */
    public static class MySelfTextOutputFormat  extends OutputFormat<Text,LongWritable>{
        FSDataOutputStream outputStream = null ;
        @Override
        public RecordWriter<Text, LongWritable> getRecordWriter(
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            try {
                FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUT_PATH), context.getConfiguration());
                //指定的是输出文件的路径
                String opath = MySelfOutputFormatApp.OUT_PATH+OUT_FIE_NAME;
                outputStream = fileSystem.create(new Path(opath));
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            return new MySelfRecordWriter(outputStream);
        }

        @Override
        public void checkOutputSpecs(JobContext context) throws IOException,
                InterruptedException {
        }

        /**
         * OutputCommitter:在作业初始化的时候创建一些临时的输出目录,作业的输出目录,管理作业和任务的临时文件的.
         * 作业运行过程中,会产生很多的Task,Task在处理的时候也会产生很多的输出.也会创建这个输出目录.
         * 当我们的Task或者是作业都运行完成之后,输出目录由OutputCommitter给删了.所以程序在运行结束之后,我们根本看不见任何额外的输出.
         * 在程序运行中会产生很多的临时文件,临时文件全交给OutputCommitter处理,真正的输出是RecordWriter(..),我们只需要关注最后的输出就可以了.中间的临时文件就是程序运行时产生的.
         */
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            //提交任务的输出,包括初始化路径,包括在作业完成的时候清理作业,删除临时目录,包括作业和任务的临时目录.
            //作业的输出路径应该是一个路径
            return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUT_PATH), context);
        }
    }
    public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable>{
        FSDataOutputStream outputStream = null ;
        public MySelfRecordWriter(FSDataOutputStream outputStream) {
            this.outputStream = outputStream ;
        }
        @Override
        public void write(Text key, LongWritable value) throws IOException,
                InterruptedException {
            this.outputStream.writeBytes(key.toString());
            this.outputStream.writeBytes("\t");
            this.outputStream.writeLong(value.get());
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException,
                InterruptedException {
            this.outputStream.close();
        }
    }
}

自定义输出OutputFormat

时间: 2024-11-02 23:34:43

自定义实现InputFormat、OutputFormat的相关文章

[Hadoop] - 自定义Mapreduce InputFormat&amp;OutputFormat

在MR程序的开发过程中,经常会遇到输入数据不是HDFS或者数据输出目的地不是HDFS的,MapReduce的设计已经考虑到这种情况,它为我们提供了两个组建,只需要我们自定义适合的InputFormat和OutputFormat,就可以完成这个需求,这里简单的介绍一个从MongoDB中读数据,并写出数据到MongoDB中的一种情况,只是一个Demo,所以数据随便找的一个. 一.自定义InputFormat MapReduce中Map阶段的数据输入是由InputFormat决定的,我们查看org.a

InputFormat,OutputFormat,InputSplit,RecordRead(一些常见面试题),使用yum安装64位Mysql

列举出hadoop常用的一些InputFormat InputFormat是用来对我们的输入数据进行格式化的.TextInputFormat是默认的. InputFormat有哪些类型? DBInputFormat,DelegatingInputFormat,FileInputFormat,常用的就是DBInputFormat,FileInputFormat . DBInputFormat:接我们的关系型数据库的,比如mysql和oracle, FileInputFormat是和文件相关的,又有

InputFormat&amp;OutputFormat

本文的主要目的是从源码级别讲解Hadoop中InputFormat和OutputFormat部分,首先简介InputFormat和OutputFormat,然后介绍两个重要的组件,RecordWriter和RecordReader,再以FileInputFormat和FileOutputFormat为例,介绍一组InputFormat和OutputFormat的实现细节,最后以SqoopInputFormat和SqoopOutputFormat为例,体会一下InputFormat和OutputF

如何使用Hive集成Solr?

(一)Hive+Solr简介 Hive作为Hadoop生态系统里面离线的数据仓库,可以非常方便的使用SQL的方式来离线分析海量的历史数据,并根据分析的结果,来干一些其他的事情,如报表统计查询等. Solr作为高性能的搜索服务器,能够提供快速,强大的全文检索功能. (二)为什么需要hive集成solr? 有时候,我们需要将hive的分析完的结果,存储到solr里面进行全文检索服务,比如以前我们有个业务,对我们电商网站的搜索日志使用hive分析完后 存储到solr里面做报表查询,因为里面涉及到搜索关

自定义inputformat和outputformat

1. 自定义inputFormat 1.1 需求 无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案 1.2 分析 小文件的优化无非以下几种方式: 1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS 2. 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并 3. 在mapreduce处理时,可采用combineInputFormat提高效率 实现 本节实现的是上述第二种方式 程序的核心

Hadoop_28_MapReduce_自定义 inputFormat

1. 自定义inputFormat 1.1.需求: 无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件,此时就需要有相应解决方案; 1.2.分析: 小文件的优化无非以下几种方式: 1.在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS: 2.在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并: 3.在mapreduce处理时,可采用combineInputFormat提高效率: 1.3.实现: 本节实现的是上述第二种方式,程

MapReduce自定义RecordReader

一:背景 RecordReader表示以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类,系统默认的RecordReader是LineRecordReader,它是TextInputFormat对应的RecordReader:而SequenceFileInputFormat对应的RecordReader是SequenceFileRecordReader.LineRecordReader是每行的偏移量作为读入map的key,每行的内容作为读入map的value.很多

整个文件作为一条记录处理自定义FileInputFormat类

众所周知,Hadoop对处理单个大文件比处理多个小文件更有效率,另外单个文件也非常占用HDFS的存储空间.所以往往要将其合并起来. 1,getmerge hadoop有一个命令行工具getmerge,用于将一组HDFS上的文件复制到本地计算机以前进行合并 参考:http://hadoop.apache.org/common/docs/r0.19.2/cn/hdfs_shell.html 使用方法:hadoop fs -getmerge <src> <localdst> [addnl

Hadoop InputFormat

Hadoop可以处理不同数据格式(数据源)的数据,从文本文件到(非)关系型数据库,这很大程度上得益于Hadoop InputFormat的可扩展性设计,InputFormat层次结构图如下: InputFormat(org.apache.hadoop.mapreduce.InputFormat)被设计为一个抽象类,代码如下: public abstract class InputFormat<K, V> { public abstract List<InputSplit> getS