简单实现CombineFileInputFormat

import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestCombine extends Configured implements Tool {
	private static class ProvinceMapper extends
			Mapper<Object, Text, Text, Text> {
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println("value : " + value + " Context " + context);
			context.write(value, value);
		}
	}

	private static class ProvinceReducer extends
			Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text va : values) {
			    System.out.println("reduce " + key);
				context.write(key, key);
			}
		}
	}

	public static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
	    @SuppressWarnings({ "unchecked", "rawtypes" })
	    @Override
	    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
	        return new CombineFileRecordReader((CombineFileSplit)split, context, CombineLineRecordReader.class);
	    }
	}  

	public static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {
	    private CombineFileSplit split;
	    private TaskAttemptContext context;
	    private int index;
	    private RecordReader<K, V> rr;  

	    @SuppressWarnings("unchecked")
	    public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
	        this.index = index;
	        this.split = (CombineFileSplit) split;
	        this.context = context;  

	        this.rr = (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());
	    }  

	    @SuppressWarnings("unchecked")
	    @Override
	    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
	        this.split = (CombineFileSplit) curSplit;
	        this.context = curContext;  

	        if (null == rr) {
	            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
	        }  

	        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
	                this.split.getOffset(index), this.split.getLength(index),
	                this.split.getLocations());  

	        this.rr.initialize(fileSplit, this.context);
	    }  

	    @Override
	    public float getProgress() throws IOException, InterruptedException {
	        return rr.getProgress();
	    }  

	    @Override
	    public void close() throws IOException {
	        if (null != rr) {
	            rr.close();
	            rr = null;
	        }
	    }  

	    @Override
	    public K getCurrentKey()
	    throws IOException, InterruptedException {
	        return rr.getCurrentKey();
	    }  

	    @Override
	    public V getCurrentValue()
	    throws IOException, InterruptedException {
	        return rr.getCurrentValue();
	    }  

	    @Override
	    public boolean nextKeyValue() throws IOException, InterruptedException {
	        return rr.nextKeyValue();
	    }
	}  

	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = new Job(conf);
		job.setJobName("TestCombine");
		job.setJarByClass(TestCombine.class);

		job.setMapperClass(ProvinceMapper.class);
		job.setReducerClass(ProvinceReducer.class);

		job.setInputFormatClass(CombineSequenceFileInputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		String inpath = "/home/hadoop/tmp/combine";
		String outpath = "/home/hadoop/tmp/combineout";
		Path p = new Path(outpath);

		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(p)){
			fs.delete(p);
		}
		FileInputFormat.addInputPaths(job, inpath);
		FileOutputFormat.setOutputPath(job, p);

		return job.waitForCompletion(true) ? 0 : 1;
	} 

	public static void main(String[] args) throws Exception {
		int ret = ToolRunner.run(new TestCombine(), args);
		System.exit(ret);
	}
}
时间: 2024-08-04 01:51:39

简单实现CombineFileInputFormat的相关文章

hadoop old API CombineFileInputFormat

来自:http://f.dataguru.cn/thread-271645-1-1.html 简介 本文主要介绍下面4个方面 1.为什么要使用CombineFileInputFormat 2.CombineFileInputFormat实现原理 3.怎样使用CombineFileInputFormat 4.现存的问题 使用CombineFileInputFormat的目的 在开发MR的程序时,mapper的主要作用是对数据的收集.一般情况下,为了能让mapper更快的运行,我们会对文件进行spl

C# Ping 简单使用

编程过程中,有时候需要判断主机是否在线,最简单的方法就是使用Windows的Ping命令看看能否ping通.看到网上很多文章,说用C#去调用windows的ping.exe,然后解析返回的字符串.我觉得这种方式太麻烦了,就做一下简单判断,不想弄那么麻烦. 查了一下,C#专门提供了一个Ping类,与Windows下的ping命令类似: 命令空间: System.Net.NetworkInformation; 使用方法: bool online = false; //是否在线 Ping ping =

自动生成简单四则运算的C语言程序

该程序是在博客园里面找的,具体是谁的找了半天没找到,无法提供它原本的链接.由于自己写的过于简单,且有一些功能暂时无法实现,所以就找了一个来应付作业,望原谅.在这个程序的源码中我改了一个错误的地方,源码中有这样一个随机数发生器的初始化函数的语句:"srand((unsigned)time(NULL))".srand函数是随机数发生器的初始化函数.但是正确的写法应该是:srand(unsigned( time(NULL))):为了防止随机数每次重复,常常使用系统时间来初始化,即使用time

Mysql的锁机制与PHP文件锁处理高并发简单思路

以购买商品举例: ① 从数据库获取库存的数量. ② 检查一下库存的数量是否充足. ③ 库存的数量减去买家购买的数量(以每个用户购买一个为例). ④ 最后完成购买. 仅仅这几行逻辑代码在并发的情况下会出现问题,自己可以想象一下. 这里暂时就不测试了,下面会针对并发的处理给出测试结果. 创建表: CREATE TABLE `warehouse` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `stock` int(11) NOT NULL

Winfrom 简单的安卓手机屏幕获取和安卓简单操作

为啥我要做这个东西了,是因为经常要用投影演示app ,现在有很多这样的软件可以把手机界面投到电脑上 ,但都要安装,比如说360的手机助手,我又讨厌安装,于是就自己捣鼓了下 做了这个东西, 实现了以下简单功能   1.屏幕获取(因为是截图方式获取的,所以有点卡顿) 2.实现点击功能,并在点击的时候出现一个手势图标,方便用户观看 3.实现简单的滑动功能 4.实现在界面上画图功能 5.实现拖拽安装apk功能 操作说明:鼠标左边 模拟手机点击,中键停止/开始刷新界面(画图的时候不能刷新),右键去掉画图内

iOS instruments之ui automation的简单使用(高手绕道)

最近使用了几次instruments中的automation工具,现记录下automation的简单使用方法,希望对没接触过自动化测试又有需求的人有所帮助.  UI 自动测试是iOS 中重要的附加功能,它由名为"Automation"的新的工具对象支持.Automation工具的脚本是用JavaScript语言编写,主要用于分析应用的性能和用户行为,模仿/击发被请求的事件,利用它可以完成对被测应用的简单的UI测试及相关功能测试. 一. 简单的录制脚本 打开xcode,这里用我为我家亲爱

Android ExpandableListView 带有Checkbox的简单应用

expandablelistview2_groups.xml <?xml version="1.0" encoding="utf-8"?> <RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android" android:layout_width="match_parent" android:layout_height=&qu

Android ExpandableListView的简单应用

Expandablelistview1Activity.java package com.wangzhu.demoexpandablelistview; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import android.app.Activity; import android.os.Bundle; import android.widg

一个简单的主机管理模拟程序

最近写的一个小练习,主要是把前面学的东西整合一下.写了一个简单的主机管理界面,主要是练习以下知识点: Session和Cookie进行登录验证(装饰器) 数据库的基本操作 (单表,1对多,多对多) Form的简单使用实现验证 Bootstrap模板写个简单界面 自定义分页 信号,中间件,CSRF,模板语言,JavaScript,AJAX等等 界面比较low,毕竟不是专业的. 附件里面是Django的源代码,3个文件放在一起winrar解压就可以打开