Hadoop 高级程序设计(二)---自定义输入输出格式

Hadoop提供了较为丰富的数据输入输出格式,可以满足很多的设计实现,但是在某些时候需要自定义输入输出格式。

数据的输入格式用于描述MapReduce作业的数据输入规范,MapReduce框架依靠数据输入格式完后输入规范检查(比如输入文件目录的检查),对数据文件进行输入分块(InputSpilt)以及提供从输入分快中将数据逐行的读出,并转换为Map过程的输入键值对等功能。Hadoop提供了很多的输入格式,TextInputFormat和KeyValueInputFormat,对于每个输入格式都有与之对应的RecordReader,LineRecordReader和KeyValueLineRecordReader。用户需要自定义输入格式,主要实现InputFormat中的createRecordReader()和getSplit()方法,而在RecordReader中实现getCurrentKey().....

例如:

package com.rpc.nefu;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;  

//自定义的输入格式需要 继承FileInputFormat接口
public class ZInputFormat extends FileInputFormat<IntWritable,IntWritable>{  

        @Override  //实现RecordReader
        public RecordReader<IntWritable, IntWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new ZRecordReader();
        }  

        //自定义的数据类型
        public static class ZRecordReader extends RecordReader<IntWritable,IntWritable>
        {
            //data
            private LineReader in;      //输入流
            private boolean more = true;//提示后续还有没有数据  

            private IntWritable key = null;
            private IntWritable value = null;  

            //这三个保存当前读取到位置(即文件中的位置)
            private long start;
            private long end;
            private long pos;  

            //private Log LOG = LogFactory.getLog(ZRecordReader.class);//日志写入系统,可加可不加  

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                // 初始化函数  

                FileSplit inputsplit = (FileSplit)split;
                start = inputsplit.getStart();                      //得到此分片开始位置
                end   = start + inputsplit.getLength();//结束此分片位置
                final Path file = inputsplit.getPath();  

                // 打开文件
                FileSystem fs = file.getFileSystem(context.getConfiguration());
                FSDataInputStream fileIn = fs.open(inputsplit.getPath());  

                //将文件指针移动到当前分片,因为每次默认打开文件时,其指针指向开头
                fileIn.seek(start);  

                in = new LineReader(fileIn, context.getConfiguration());  

                if (start != 0)
                {
                  System.out.println("4");
                   //如果这不是第一个分片,那么假设第一个分片是0——4,那么,第4个位置已经被读取,则需要跳过4,否则会产生读入错误,因为你回头又去读之前读过的地方
               start += in.readLine(new Text(), 0, maxBytesToConsume(start));
                }
                pos = start;
            }  

            private int maxBytesToConsume(long pos)
            {
                    return (int) Math.min(Integer.MAX_VALUE, end - pos);
             }  

            @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
                //下一组值
                //tips:以后在这种函数中最好不要有输出,费时
                //LOG.info("正在读取下一个,嘿嘿");
                if(null == key)
                {
                    key = new IntWritable();
                }
                if(null == value)
                {
                    value = new IntWritable();
                }
                Text nowline = new Text();//保存当前行的内容
                int readsize = in.readLine(nowline);
                //更新当前读取到位置
                pos += readsize;  

                //如果pos的值大于等于end,说明此分片已经读取完毕
                if(pos >= end)
                {
                    more = false;
                    return false;
                }  

                if(0 == readsize)
                {
                    key = null;
                    value = null;
                    more = false;//说明此时已经读取到文件末尾,则more为false
                    return false;
                }
                String[] keyandvalue = nowline.toString().split(",");  

                //排除第一行
                if(keyandvalue[0].endsWith("\"CITING\""))
                {
                    readsize = in.readLine(nowline);
                    //更新当前读取到位置
                    pos += readsize;
                    if(0 == readsize)
                    {
                        more = false;//说明此时已经读取到文件末尾,则more为false
                        return false;
                    }
                    //重新划分
                    keyandvalue = nowline.toString().split(",");
                }  

                //得到key和value
                //LOG.info("key is :" + key +"value is" + value);
                key.set(Integer.parseInt(keyandvalue[0]));
                value.set(Integer.parseInt(keyandvalue[1]));  

                return true;
            }  

            @Override
            public IntWritable getCurrentKey() throws IOException,
                    InterruptedException {
                //得到当前的Key
                return key;
            }  

            @Override
            public IntWritable getCurrentValue() throws IOException,
                    InterruptedException {
                //得到当前的value
                return value;
            }  

            @Override
            public float getProgress() throws IOException, InterruptedException {
                //计算对于当前片的处理进度
                if( false == more || end == start)
                {
                    return 0f;
                }
                else
                {
                    return Math.min(1.0f, (pos - start)/(end - start));
                }
            }  

            @Override
            public void close() throws IOException {
                //关闭此输入流
                if(null != in)
                {
                    in.close();
                }
            }  

        }
}
package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FileNameLocInputFormat extends FileInputFormat<Text, Text>{

	@Override
	public org.apache.hadoop.mapreduce.RecordReader<Text, Text> createRecordReader(
			org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return new FileNameLocRecordReader();
	}
	public static class FileNameLocRecordReader extends RecordReader<Text,Text>{

		String FileName;
		LineRecordReader line = new LineRecordReader();
		/**
		 * ......
		 */ 

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return new Text("("+FileName+"@"+line.getCurrentKey()+")");
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return line.getCurrentValue();
		}

		@Override
		public void initialize(InputSplit split, TaskAttemptContext arg1)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			line.initialize(split, arg1);
			FileSplit inputsplit = (FileSplit)split;
			FileName = (inputsplit).getPath().getName();
		}

		@Override
		public void close() throws IOException {
			// TODO Auto-generated method stub

		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return 0;
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return false;
		}
	}
}

Hadoop中也内置了很多的输出格式与RecordWriter.输出格式完成输出规范检查,作业结果数据输出。

自定义的输出格式:

public static class AlphaOutputFormat extends multiformat<Text, IntWritable>{

		@Override
		protected String generateFileNameForKeyValue(Text key,
				IntWritable value, Configuration conf) {
			// TODO Auto-generated method stub
			char c = key.toString().toLowerCase().charAt(0);
			if( c>='a' && c<='z'){
				return c+".txt";
			}else{
				return "other.txt";
			}
		}

	}
//设置输出格式
		job.setOutputFormatClass(AlphaOutputFormat.class);
package com.rpc.nefu;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;  

public abstract class multiformat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {
    private MultiRecordWriter writer = null;
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
            InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }
    /**通过key, value, conf来确定输出文件名(含扩展名)*/
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
    public class MultiRecordWriter extends RecordWriter<K, V> {
        /**RecordWriter的缓存*/
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /**输出目录*/
        private Path workPath = null;
        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            //得到输出文件名
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }
        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                        GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new lineRecordWrite<K, V>(new DataOutputStream(codec
                        .createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new lineRecordWrite<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}  
时间: 2024-08-13 10:41:01

Hadoop 高级程序设计(二)---自定义输入输出格式的相关文章

Hadoop 高级程序设计(一)---复合键 自定义输入类型

简介: 在大数据处理的基本方法上,对于相互间计算的依赖性不大的数据,mapreduce采用分治的策略进行处理,将大的问题划分成小的问题进行求解,使得问题变得简单可行,同时在处理问题上面,MapReduce框架隐藏了很多的处理细节,将数据切分,任务调度,数据通信,容错,负载均衡.....交给了系统负责,对于很多问题,只需要采取框架的缺省值完成即可,用户只需完成设计map函数很reduce函数即可. 复合键 在一般的情况下只需要使用简单的<key,value>对即可,但是在一些复杂的情况下可以完成

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

正文开始前 ,先介绍几个概念 序列化 所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储. 反序列化 是指将字节流转回到结构化对象的逆过程 序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 .RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息 Hadoop使用了自己写的序列

Hadoop 高级程序设计(三)---自定义Partition和Combiner

Hadoop提供了缺省的Partition来完成map的输出向reduce分发处理.有时也需要自定义partition来将相同key值的数据分发到同一个reduce处理,为了减少map过程输出的中间结果键值对的数量,降低网络数据通信开销,用户也可以自定制combiner过程. 自定制Partition过程: 在mapreduce中,partition用于决定Map节点输出将被分到哪个Reduce节点,MapReduce提供的缺省Partition是HashPartition,他根据每条数据的主键

JavaScript高级程序设计(二):在HTML中使用JavaScript

一.使用<script>元素 1.<script>元素定义了6个属性: async:可选.表示应该立即下载脚本,但不应该妨碍页面中的其他操作,比如下载其他资源或等待加载其他脚本.只对外部脚本文件有效. charset:可选.表示通过src属性指定的代码的字符集.很少人用. defer:可选.表示脚本可以延迟到文档完全被解析和显示之后再执行.只对外部文件有效. language:已废弃. src:可选.表示包含要执行代码的外部文件. type:可选.表示编写代码使用的脚本语言的内容类

《JavaScript 高级程序设计》读书笔记二 使用JavaScript

一   <script>元素 a.四个属性: async:立即异步加载外部脚本: defer:延迟到文档完全被解析再加载外部脚本: src:外部脚本路径: type:脚本语言的内容类型: 二   XHTML中用法 a. //<![CDATA[ javascript代码 //]]> 三   <noscript>元素 <JavaScript 高级程序设计>读书笔记二 使用JavaScript

hadoop编程小技巧(5)---自定义输入文件格式类InputFormat

Hadoop代码测试环境:Hadoop2.4 应用:在对数据需要进行一定条件的过滤和简单处理的时候可以使用自定义输入文件格式类. Hadoop内置的输入文件格式类有: 1)FileInputFormat<K,V>这个是基本的父类,我们自定义就直接使用它作为父类: 2)TextInputFormat<LongWritable,Text>这个是默认的数据格式类,我们一般编程,如果没有特别指定的话,一般都使用的是这个:key代表当前行数据距离文件开始的距离,value代码当前行字符串:

MySQL数据库高级(二)——自定义函数

MySQL数据库高级(二)--自定义函数 一.自定义函数简介 自定义函数 (user-defined function UDF)是一种对MySQL扩展的途径,其用法和内置函数相同.自定义函数的两个必要条件:A.参数B.返回值(必须有).函数可以返回任意类型的值. 二.自定义函数的使用 1.自定义函数语法 CREATE?FUNCTION?function_name(parameter_nametype,[parameter_name type,...]) RETURNS?{STRING|INTEG

《Javascript高级程序设计》阅读记录(二):第四章

这个系列之前文字地址:http://www.cnblogs.com/qixinbo/p/6984374.html 这个系列,我会把阅读<Javascript高级程序设计>之后,感觉讲的比较深入,而且实际使用价值较大的内容记录下来,并且注释上我的一些想法.做这个一方面是提升了我的阅读效果以及方便我以后阅读 另一个目的是,Javascript高级程序设计这本书内容很多也很厚,希望其他没有时间的人可以通过看这系列摘录,就可以大体学到书里面的核心内容. 绿色背景的内容是我认为比较值得注意的原著内容.

hadoop自定义输入格式

一个任务的开始阶段是由InputFormat来决定的! 1.在MapReduce框架中,InputFormat扮演的角色:– 将输入数据切分成逻辑的分片(Split),一个分片将被分配给一个单独的Mapper– 提供RecordReader的对象,该对象会从分片中读出<Key-Value>对供Mapper处理 1.1InputFormat对Mapper的影响:– 决定了Mapper的数量– 决定了Mapper的map函数接收的Key和Value 1.2InputFormat: InputFor