hadoop mapreduce 自定义InputFormat

很久以前为了满足公司的需求写过一些自定义InputFormat,今天有时间拿出来记一下
    需求是这样的,如果如果使用FileInputFormat作为输入,是按照行来读取日志的,也就是按照\n来区分每一条日志的,而由于一条日志中记录着一些错误信息,比如java的异常信息,这些信息本身就带有换行符,如果还是按照\n进行区分每一条日志的话明显是错误的,由于我们日志的特殊性,将以"]@\n"作为区分日志的标识。
    接下来就来看看如何自定义InputFormat,还是不画类图了,我这个人太懒了,口述,口述
    首先我们要先定义一个类继承FileInputFormat,并重写createRecordReader方法返回RecordReader,然后定义一个类继承RecordReader,createRecordReader方法返回也就是我们定义的RecordReader的子类的对象。

代码如下

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
			TaskAttemptContext arg1) throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return new TrackRecordReader();

	}
}
package input;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;

/**
 * Treats keys as offset in file and value as line.
 *
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
	Logger logger = Logger.getLogger(TrackRecordReader.class.getName());
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一条记录的分隔符
	private byte[] separator = "]@\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		//mapreduce.input.linerecordreader.line.maxlength
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		//logger.info("path========================="+file.toString());
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		//logger.info("codec========================="+codec);
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
				start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
		/*if (skipFirstLine) {
			int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));
		    if(newSize > 0){
		    	start += newSize;
		    }
		}*/

	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}
		}
		if (newSize == 0) {
			//读取下一个buffer
			key = null;
			value = null;
			return false;
		} else {
			//读同一个buffer的下一个记录
			return true;
		}
	}

	@Override
	public LongWritable getCurrentKey() {
		return key;
	}

	@Override
	public Text getCurrentValue() {
		return value;
	}

	/**
	 * Get the progress within the split
	 */
	public float getProgress() {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	public synchronized void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

	public class NewLineReader {
		private static final int DEFAULT_BUFFER_SIZE = 256 * 1024* 1024;
		private int bufferSize = DEFAULT_BUFFER_SIZE;
		private InputStream in;
		private byte[] buffer;
		private int bufferLength = 0;
		private int bufferPosn = 0;

		public NewLineReader(InputStream in) {
			this(in, DEFAULT_BUFFER_SIZE);
		}

		public NewLineReader(InputStream in, int bufferSize) {
			this.in = in;
			this.bufferSize = bufferSize;
			this.buffer = new byte[this.bufferSize];
		}

		public NewLineReader(InputStream in, Configuration conf)
				throws IOException {
			this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
		}

		public void close() throws IOException {
			in.close();
		}

		public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
				throws IOException {
			str.clear();
			Text record = new Text();
			int txtLength = 0;
			long bytesConsumed = 0L;
			boolean newline = false;
			int sepPosn = 0;
			do {
				// 已经读到buffer的末尾了,读下一个buffer
				if (this.bufferPosn >= this.bufferLength) {
					bufferPosn = 0;
					bufferLength = in.read(buffer);
					// 读到文件末尾了,则跳出,进行下一个文件的读取
					if (bufferLength <= 0) {
						break;
					}
				}
				int startPosn = this.bufferPosn;
				for (; bufferPosn < bufferLength; bufferPosn++) {
					// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
					if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
						sepPosn = 0;
					}
					// 遇到行分隔符的第一个字符
					if (buffer[bufferPosn] == separator[sepPosn]) {
						bufferPosn++;
						int i = 0;
						// 判断接下来的字符是否也是行分隔符中的字符
						for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
							// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
							if (bufferPosn + i >= bufferLength) {
								bufferPosn += i - 1;
								break;
							}
							// 一旦其中有一个字符不相同,就判定为不是分隔符
							if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
								sepPosn = 0;
								break;
							}
						}
						// 的确遇到了行分隔符
						if (sepPosn == separator.length) {
							bufferPosn += i;
							newline = true;
							sepPosn = 0;
							break;
						}
					}
				}
				int readLength = this.bufferPosn - startPosn;
				bytesConsumed += readLength;
				// 行分隔符不放入块中
				if (readLength > maxLineLength - txtLength) {
					readLength = maxLineLength - txtLength;
				}
				if (readLength > 0) {
					record.append(this.buffer, startPosn, readLength);
					txtLength += readLength;
					// 去掉记录的分隔符
					if (newline) {
						str.set(record.getBytes(), 0, record.getLength() - separator.length);
					}
				}
			}
			while (!newline && (bytesConsumed < maxBytesToConsume));
			if (bytesConsumed > (long) Integer.MAX_VALUE) {
				throw new IOException("Too many bytes before newline: "
						+ bytesConsumed);
			}

			return (int) bytesConsumed;
		}

		public int readLine(Text str, int maxLineLength) throws IOException {
			return readLine(str, maxLineLength, Integer.MAX_VALUE);
		}

		public int readLine(Text str) throws IOException {
			return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
		}
	}
}
private byte[] separator = "]@\n".getBytes();

上面这一行就是定义每条日志以什么字符串区分开,今天就贴代码吧,改天把这个代码讲一下

时间: 2024-11-05 02:31:10

hadoop mapreduce 自定义InputFormat的相关文章

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

commoncrawl 源码库是用于 Hadoop 的自定义 InputFormat 配送实现

commoncrawl 源码库是用于 Hadoop 的自定义 InputFormat 配送实现. Common Crawl 提供一个示例程序 BasicArcFileReaderSample.java (位于 org.commoncrawl.samples) 用来配置 InputFormat. commoncrawl / commoncrawl Watch414 Fork86 CommonCrawl Project Repository — More... http://www.commoncr

[Hadoop] - Mapreduce自定义Counter

在Hadoop的MR程序开发中,经常需要统计一些map/reduce的运行状态信息,这个时候我们可以通过自定义Counter来实现,这个实现的方式是不是通过配置信息完成的,而是通过代码运行时检查完成的. 1.创建一个自己的Counter枚举类. enum PROCESS_COUNTER { BAD_RECORDS, BAD_GROUPS; } 2.在需要统计的地方,比如map或者reduce阶段进行下列操作. context.getCounter(PROCESS_COUNTER.BAD_RECO

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

Hadoop mapreduce自定义排序WritableComparable

本文发表于本人博客. 今天继续写练习题,上次对分区稍微理解了一下,那根据那个步骤分区.排序.分组.规约来的话,今天应该是要写个排序有关的例子了,那好现在就开始! 说到排序我们可以查看下hadoop源码里面的WordCount例子中对LongWritable类型定义,它实现抽象接口WritableComparable,代码如下: public interface WritableComparable<T> extends Writable, Comparable<T> { } pub

Hadoop读书笔记(十二)MapReduce自定义排序

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.说明: 对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列 数据格式: 3 3 3 2 3 1 2 2 2 1 1 1 2.代码 SortApp.java package sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOExc

Hadoop初学指南(7)--MapReduce自定义计数器

本文主要介绍了MapReduce中的自定义计数器的相关内容. 在上次的单词统计例子中,我们可以看到MapReduce在执行过程中会有很多的控制台输出信息,其中有一个很关键的内容:计数器.如下图: 可以看到最上方的关键字:Counters,这就表示计数器. 在这里,只有一个制表符缩进的表示计数器组,有两个制表符缩进的表示计数器组下的计数器.如File Output Format Counters就表示文件输出的计数器组,里面的Bytes Written表示输出的字符数,在输出的文本中,hello,

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

Hadoop学习之路(7)MapReduce自定义排序

本文测试文本: tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000    MapReduce中,根据key进行分区.排序.分组MapReduce会按照基本类型对应的key进行排序,如int类型的IntWritable,long类型的LongWritable,Text类型,默认升序排序   为什么要自定义排序规则?现有需求,需要自定义key类型,