MR中简单实现自定义的输入输出格式

import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestCombine extends Configured implements Tool {
	private static class ProvinceMapper extends
			Mapper<Object, Text, Text, Text> {
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println("value : " + value + " Context " + context);
			context.write(value, value);
		}
	}

	private static class ProvinceReducer extends
			Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text va : values) {
			    System.out.println("reduce " + key);
				context.write(key, key);
			}
		}
	}

	 // 输入格式
     static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
	    @SuppressWarnings({ "unchecked", "rawtypes" })
	    @Override
	    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
	        return new CombineFileRecordReader((CombineFileSplit)split, context, CombineLineRecordReader.class);
	    }
	}  

	 static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {
	    private CombineFileSplit split;
	    private TaskAttemptContext context;
	    private int index;
	    private RecordReader<K, V> rr;  

	    @SuppressWarnings("unchecked")
	    public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
	        this.index = index;
	        this.split = (CombineFileSplit) split;
	        this.context = context;  

	        this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());
	    }  

	    @SuppressWarnings("unchecked")
	    @Override
	    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
	        this.split = (CombineFileSplit) curSplit;
	        this.context = curContext;  

	        if (null == rr) {
	            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
	        }  

	        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
	                this.split.getOffset(index), this.split.getLength(index),
	                this.split.getLocations());  

	        this.rr.initialize(fileSplit, this.context);
	    }  

	    @Override
	    public float getProgress() throws IOException, InterruptedException {
	        return rr.getProgress();
	    }  

	    @Override
	    public void close() throws IOException {
	        if (null != rr) {
	            rr.close();
	            rr = null;
	        }
	    }  

	    @Override
	    public K getCurrentKey()
	    throws IOException, InterruptedException {
	        return rr.getCurrentKey();
	    }  

	    @Override
	    public V getCurrentValue()
	    throws IOException, InterruptedException {
	        return rr.getCurrentValue();
	    }  

	    @Override
	    public boolean nextKeyValue() throws IOException, InterruptedException {
	        return rr.nextKeyValue();
	    }
	}  

	// 输出格式
	 static class MyOutputFormat extends FileOutputFormat<Text, Text>{
		@Override
		public RecordWriter<Text, Text> getRecordWriter(
				TaskAttemptContext job) throws IOException, InterruptedException {
			return new MyRecordWriter(job);
		}
	}

	  public static class  MyRecordWriter extends RecordWriter<Text, Text> {
		private Map<String, FSDataOutputStream> outputMap = null;
		private static final String LINESEPARATOR = "\n";
		private FileSystem fs;
		private JobContext job;

		public MyRecordWriter(JobContext job) throws IOException {
			this.outputMap = new HashMap<String, FSDataOutputStream>();
			this.job = job;
			this.fs = FileSystem.get(job.getConfiguration());
		}

		// 参考 MultipleOutputs
		public void write(Text key, Text value) throws IOException {
			String k = key.toString();
			if(k.isEmpty())
				return;
			FSDataOutputStream out = outputMap.get(k);
			if(out==null) {
				if(k.isEmpty())
					System.out.println(value.toString());
				Path outputPath = new Path(FileOutputFormat.getOutputPath(job), k);
				if(!fs.exists(outputPath))
					out = fs.create(outputPath);
				else
					return;
				outputMap.put(k, out);
			}
			out.write(value.getBytes());
			out.write(LINESEPARATOR.getBytes());
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			for(FSDataOutputStream out : outputMap.values()) {
				out.close();
			}
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = new Job(conf);
		job.setJobName("TestCombine");
		job.setJarByClass(TestCombine.class);

		job.setMapperClass(ProvinceMapper.class);
		job.setReducerClass(ProvinceReducer.class);

		//job.setInputFormatClass(CombineSequenceFileInputFormat.class);
		job.setOutputFormatClass(MyOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		String inpath = "/home/hadoop/tmp/combine";
		String outpath = "/home/hadoop/tmp/combineout";
		Path p = new Path(outpath);

		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(p)){
			fs.delete(p);
		}
		FileInputFormat.addInputPaths(job, inpath);
		FileOutputFormat.setOutputPath(job, p);

		return job.waitForCompletion(true) ? 0 : 1;
	} 

	public static void main(String[] args) throws Exception {
		int ret = ToolRunner.run(new TestCombine(), args);
		System.exit(ret);
	}
}
时间: 2024-10-10 22:01:14

MR中简单实现自定义的输入输出格式的相关文章

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

正文开始前 ,先介绍几个概念 序列化 所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储. 反序列化 是指将字节流转回到结构化对象的逆过程 序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 .RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息 Hadoop使用了自己写的序列

Hadoop 高级程序设计(二)---自定义输入输出格式

Hadoop提供了较为丰富的数据输入输出格式,可以满足很多的设计实现,但是在某些时候需要自定义输入输出格式. 数据的输入格式用于描述MapReduce作业的数据输入规范,MapReduce框架依靠数据输入格式完后输入规范检查(比如输入文件目录的检查),对数据文件进行输入分块(InputSpilt)以及提供从输入分快中将数据逐行的读出,并转换为Map过程的输入键值对等功能.Hadoop提供了很多的输入格式,TextInputFormat和KeyValueInputFormat,对于每个输入格式都有

在PowerPoint中自定义可输入文本的占位符

日常生活中,当我们设计多媒体课件时,默认的版式其实已经够用了.但是,很多时候,我们需要更加个性一点,所以,我们需要自定义很多东西.本文介绍在PowerPoint中自定义可输入文本的占位符. 一.占位符的概念 占位符就是先占住一个固定的位置,等着你再往里面添加内容的符号. 二.PowerPoint(简称:PPT)简介 ppt,是演示文稿软件.演示文稿中的每一页就叫幻灯片,每张幻灯片都是演示文稿中既相互独立又相互联系的内容.我们这里要做的就是,给每一张幻灯片都加上占位符. 三.在幻灯片母版中加入占位

C语言中输入输出格式控制

1.C语言中,非零值为真,真用1表示:零值为假,假用0表示. 2.转义字符参考: \a 蜂鸣,响铃 \b 回退:向后退一格 \f 换页 \n 换行 \r 回车,光标到本行行首 \t 水平制表 \v 垂直制表 \\ 反斜杠 \' 单引号 \" 双引号 \? 问号 \ddd 三位八进制 \0 空字符(NULL),什么都不做 \xhh 二位十六进制 说明: 1)\v垂直制表和\f换页符对屏幕没有任何影响,但会影响打印机执行响应操作. 2),\n其实应该叫回车换行.换行只是换一行,不改变光标的横坐标:回

数据输入输出格式

数据输入格式 数据输入格式(InputFormat)用于描述MR作业的输入规范,主要功能:输入规范检查(比如输入文件目录的检查).对数据文件进行输入切分和从输入分块中将数据记录逐一读取出来.并转化为Map的输入键值对. Hadoop中最常用的数据输入格式包括:TextInputFormat 和 KeyValueInputFormat. 1)TextInputFormat 是系统默认的数据输入格式,可以将文件的每一行解析成一个键值对.其中,Key是当前行在整个文件中的字节偏移量,而Value就是该

CString中Format函数与格式输入与输出

CString中Format函数与格式输入与输出 Format是一个很常用,却又似乎很烦的方法,以下是它的完整概貌,以供大家查询之用: 格式化字符串forma("%d",12)意思是将一个整形的格式化的字符(我认为是保持其形状不变) 1).格式说明总是以%字符开始,以下是不同类型数据的格式方式%号后的说明: d输出带符号十进制数 o输出无符号八进制数 x输出无符号十六进制数 u输出无符号数 c输出单个字符 s输出一串字符 f输出实数(6位小数) e以指数形式输出实数 g选用f与e格式中

MR中的combiner和partitioner

1.combiner combiner是MR编程模型中的一个组件: 有些任务中map可能会产生大量的本地输出,combiner的作用就是在map端对输出先做一次合并,以减少map和reduce节点之间的数据传输量,提高网络IO性能,是MR的优化手段之一: 两大基本功能: 1.1map的输出的key的聚合,对map输出的key排序.value进行迭代: 1.2reduce功能. 并不是设置了combiner就一定会执行(在当前集群非常繁忙的时候设置了也不会执行): combiner的执行时机:co

简单的文本框输入自动提示

简单的文本框输入自动提示--输入的时候可以直接异步加载数据库中匹配的项,然后显示出来. 这里没有使用到数据库,直接在PHP用数组模拟数据存储.  demo演示 原理主要是: 监听输入框的状态,当有改变的时候即刻通过ajax发送数据并取得返回值. 主要使用了jQuery封装很方便,但貌似我这个兼容性不咋地...主要提供个思路吧~ js部分: <script type="text/javascript" src="./js/jquery.min.js">&l

使用Visual Studio创建简单的自定义Web Part 部件属性

使用Visual Studio创建简单的自定义Web Part 部件属性 自定义属性使用额外的选项和设置拓展你的Web part部件.本文主要讲解如何使用Visual Studio创建简单的自定义Web Part 部件属性. 1. 打开Visual Studio,点击文件--新建项目--空白SharePoint项目CustomWPProperties.部署为场解决方案. 2. 右击项目添加新项Web Part部件WPPropertyExample,点击添加. 3. 右击WPPropertyExa