Hadoop之——数据类型

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46039055

一.  Hadoop内置的数据类型

  • BooleanWritable:标准布尔型数值
  • ByteWritable:单字节数值
  • DoubleWritable:双字节数值
  • FloatWritable:浮点数
  • IntWritable:整型数
  • LongWritable:长整型数
  • Text:使用UTF8格式存储的文本
  • NullWritable:当<key, value>中的key或value为空时使用

二、Hadoop自定义数据类型实例

把后面的URLString 封装成 URL类型。代码如下

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Writable;
/**
 * @author liuyazhuang
 */
public class URLWritable implements Writable {

	protected URL url;

	public URLWritable() {

	}

	public URLWritable(URL url) {
		this.url = url;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(url.toString());
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.url = new URL(in.readUTF());
	}

	public void set(String string) {
		try {
			this.url = new URL(string);
		} catch (MalformedURLException e) {
			throw new RuntimeException("Should not have happened " + e.toString());
		}
	}
}
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * @author liuyazhuang
 */
public class TimeUrlLineRecordReader extends RecordReader<Text, URLWritable> {
	public static final String Time_URL_SEPERATOR =
	    "mapreduce.input.keyvaluelinerecordreader.key.value.separator";

	private final LineRecordReader lineRecordReader;

	private byte separator = (byte) '\t';

	private Text innerValue;

	private Text key;

	private URLWritable value;

	public static int findSeparator(byte[] utf, int start, int length, byte sep) {
		for (int i = start; i < (start + length); i++) {
			if (utf[i] == sep) {
				return i;
			}
		}
		return -1;
	}

	public static void setKeyValue(Text key, URLWritable value, byte[] line,
			int lineLen, int pos) {
		if (pos == -1) {
			key.set(line, 0, lineLen);
			value.set(StringUtils.EMPTY);
		} else {
			key.set(line, 0, pos);
			String url = null;
			System.arraycopy(line, pos + 1,url , 0, lineLen - pos - 1);
			value.set(url);
		}
	}

	public TimeUrlLineRecordReader(Configuration conf) throws IOException {
		lineRecordReader = new LineRecordReader();
		String sepStr = conf.get(Time_URL_SEPERATOR, "\t");
	    this.separator = (byte) sepStr.charAt(0);
	}

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		 lineRecordReader.initialize(split, context);
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		byte[] line = null;
		int lineLen = -1;
		if (lineRecordReader.nextKeyValue()) {
			innerValue = lineRecordReader.getCurrentValue();
			line = innerValue.getBytes();
			lineLen = innerValue.getLength();
		} else {
			return false;
		}
		if (line == null) {
			return false;
		}
		if (key == null) {
			key = new Text();
		}
		if (value == null) {
			value = new URLWritable();
		}
		int pos = findSeparator(line, 0, lineLen, this.separator);
		setKeyValue(key, value, line, lineLen, pos);
	    return true;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public URLWritable getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return lineRecordReader.getProgress();
	}

	@Override
	public void close() throws IOException {
		lineRecordReader.close();
	}
}
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * @author liuyazhuang
 */
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable>{

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		final CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}

	@Override
	public RecordReader<Text, URLWritable> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException, InterruptedException {
		context.setStatus(split.toString());
		return new TimeUrlLineRecordReader(context.getConfiguration());
	}
}
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @author liuyazhuang
 */
public class CustomTimeUrl extends Configured implements Tool {

	public static class CustomTimeUrlMapper extends Mapper<Text, URLWritable, Text, URLWritable> {

		@Override
		protected void map(Text key, URLWritable value, Context context)
				throws IOException, InterruptedException {
			context.write(key, value);
		}

	}

	public static class CustomTimeUrlReducer extends Reducer<Text, URLWritable, Text, URLWritable> {

		@Override
		protected void reduce(Text key, Iterable<URLWritable> values,Context context)throws IOException, InterruptedException {
			for (URLWritable value : values) {
				context.write(key, value);
			}
		}

	}

	    @Override
	     public int run(String[] args) throws Exception {
		Job job = new Job(getConf());
		job.setJarByClass(getClass());
		job.setJobName("CustomTimeUrl");

		job.setInputFormatClass(TimeUrlTextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(URLWritable.class);

		job.setMapperClass(CustomTimeUrlMapper.class);
		job.setReducerClass(CustomTimeUrlReducer.class);

		FileInputFormat.setInputPaths(job, new Path("/timeurl/input/"));
		FileOutputFormat.setOutputPath(job, new Path("/timeurl/output"));

		boolean success = job.waitForCompletion(true);
		return success ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new TimeUrl(), args);
		System.exit(result);
	}
}
时间: 2024-10-19 18:55:22

Hadoop之——数据类型的相关文章

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

正文开始前 ,先介绍几个概念 序列化 所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储. 反序列化 是指将字节流转回到结构化对象的逆过程 序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 .RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息 Hadoop使用了自己写的序列

Hadoop 自定义数据类型

Hadoop的自定制数据类型有两种,一种较为简单的是针对值,另外一种更为完整针对于键和值都适合 一.针对值,实现 Writable 接口 package org.apache.hadoop.io; import java.io.DataOutput; import java.io.DataInput; import java.io.IOException; public interface Writable { void write(DataOutput out) throws IOExcept

Hadoop自定义类型处理手机上网日志

job提交源码分析 在eclipse中的写的代码如何提交作业到JobTracker中的哪?(1)在eclipse中调用的job.waitForCompletion(true)实际上执行如下方法 connect(); info = jobClient.submitJobInternal(conf); (2)在connect()方法中,实际上创建了一个JobClient对象. 在调用该对象的构造方法时,获得了JobTracker的客户端代理对象JobSubmissionProtocol. JobSu

hadoop实例

一篇讲得很好的hadoop实例,非常适合初学者学习hadoop. 本文转载自:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html,感谢作者虾皮的分享. 1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进

Hadoop集群(第9期)_MapReduce初级案例

1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进行去重.数据文件中的每行都是一个数据. 样例输入如下所示: 1)file1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7

Hadoop MapReduce执行过程详解(带hadoop例子)

https://my.oschina.net/itblog/blog/275294 摘要: 本文通过一个例子,详细介绍Hadoop 的 MapReduce过程. 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任

Hadoop Day2

1.分布式文件系统与HDFS(****了解***) ?  思考:windows的文件存储目录结构? ?  什么是分布式文件系统?(***了解***) 当数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 (Distributed File System). l  分布式文件系统的定义: 是一种允许文件通过网络在多台主机上分享的文件的系统,可让多机器上的多用户分享文件和

Hadoop MapReduce编程学习

一直在搞spark,也没时间弄hadoop,不过Hadoop基本的编程我觉得我还是要会吧,看到一篇不错的文章,不过应该应用于hadoop2.0以前,因为代码中有  conf.set("mapred.job.tracker", "192.168.1.2:9001");新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapre

Hadoop之——有趣问答(一)

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46039301 问:在eclipse中的写的代码如何提交作业到JobTracker中的哪? 答:(1)在eclipse中调用的job.waitForCompletion(true)实际上执行如下方法 connect(); info = jobClient.submitJobInternal(conf); (2)在connect()方法中,实际上创建了一个JobClient对象. 在