Multiple File Output in Hadoop

With the old API, producing multiple output files only requires a custom class that extends MultipleTextOutputFormat and overrides its generateFileNameForKeyValue method. Straight to the example.

Input file contents:

The goal is to write the words into output files named after their first letter, together with a word count. The expected output is:

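The original post showed the input file and the expected output as screenshots, which are not reproduced here. As a purely hypothetical illustration, an input file of space-separated words such as

apple banana apple cat 1024

would be split by the code below into one output file per leading letter, plus other.txt for words that do not start with a letter:

a.txt:      apple	2
b.txt:      banana	1
c.txt:      cat	1
other.txt:  1024	1
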
The code is as follows:

package defined;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;

/**
 * User: XD
 */
public class test {
	static final String INPUT_PATH = "hdfs://localhost:9000/input";
	static final Path OUTPUT_PATH = new Path("hdfs://localhost:9000/output");

    public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text,  LongWritable> {

        @Override
        public void map(LongWritable key, Text value,  OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            final String[] words = value.toString().split(" ");
            for (String word : words) {
                output.collect(new Text(word), new LongWritable(1));
            }
        }
    }
    public static class ReduceClass extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> collect, Reporter reporter)
                throws IOException {
            long sum = 0L;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            collect.collect(key, new LongWritable(sum));
        }
    }

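    // Decides which output file each record goes to: one file per leading letter
    // (a.txt ... z.txt) and other.txt for keys that do not start with a letter.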
    public static class PartitionFormat extends MultipleTextOutputFormat<Text, LongWritable> {
    	@Override
		protected String generateFileNameForKeyValue(Text key , LongWritable value,String name){
			char c = key.toString().toLowerCase().charAt(0);
			if(c>='a' && c<='z'){
				return c+".txt";
			}else{
				return "other.txt";
			}
		}
    }

    public static void main(String[] args) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        JobConf job = new JobConf(conf, test.class);
        final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH),conf);
		final Path outPath = OUTPUT_PATH;
		if(filesystem.exists(outPath)){
			filesystem.delete(outPath, true);
		}

		// 1.1 input path
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		// output path
        FileOutputFormat.setOutputPath(job, OUTPUT_PATH);

        job.setJobName("Multipleoutput");

        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(PartitionFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setNumReduceTasks(1);
        JobClient.runJob(job);
    }
}

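A note on generateFileNameForKeyValue: the third argument, name, is the default leaf file name the reducer would otherwise use (e.g. part-00000), and the string you return is treated as a path relative to the job output directory, so a return value containing "/" creates subdirectories. A minimal sketch of that variant, not taken from the original post:

	@Override
	protected String generateFileNameForKeyValue(Text key, LongWritable value, String name) {
		char c = key.toString().toLowerCase().charAt(0);
		// one subdirectory per letter, keeping the default leaf name (e.g. a/part-00000)
		return (c >= 'a' && c <= 'z') ? c + "/" + name : "other/" + name;
	}
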
With the new API, however, the same trick does not work: you would have to write your own multiple-output format class and re-implement something like generateFileNameForKeyValue yourself, which is considerably more work. A simpler approach is shown here instead, using org.apache.hadoop.mapreduce.lib.output.MultipleOutputs (the new-API counterpart of the old org.apache.hadoop.mapred.lib.MultipleOutputs). Again, straight to the example:

Input:

We again count the words and write the results to different files.

The output:

The named output "dest" ends up in files named dest-r-00000 (and "other", if used, in other-r-00000), while anything written through context.write() still goes to the default part-r-00000 file.

The code:

package wordcount;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class wordcount {

	static final String INPUT_PATH = "hdfs://localhost:9000/input";
	static final String OUTPUT_PATH = "hdfs://localhost:9000/output";

	public static class Map extends Mapper<LongWritable, Text, Text, LongWritable>{
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
			final String[] words = value.toString().split(" ");
			for(String word : words){
				context.write(new Text(word), new LongWritable(1));
			}
		}
	}

	public static class Reduce extends Reducer<Text ,LongWritable, Text , LongWritable>{
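		// MultipleOutputs must be created in setup() and closed in cleanup(),
		// otherwise records written to the named outputs may not be flushed.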
		private MultipleOutputs<Text, LongWritable> mos;
		@Override
		protected void setup(Context context){
			mos = new MultipleOutputs<Text, LongWritable>(context);
		}
		@Override
		protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{
			long sum = 0L;
			char c = key.toString().toLowerCase().charAt(0);
			for(LongWritable val : values){
				sum += val.get();
			}
			if(c>='a' && c<='z'){
				mos.write("dest", key, new LongWritable(sum));
			}else{
				mos.write("other", key, new LongWritable(sum));
			}
			context.write(key, new LongWritable(sum));
		}
		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException{
			mos.close();
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		Configuration conf = new Configuration();

		final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH),conf);
		final Path outPath = new Path(OUTPUT_PATH);
		if(filesystem.exists(outPath)){
			filesystem.delete(outPath, true);
		}
		Job job = new Job(conf,wordcount.class.getSimpleName());

		// 1.1 input path
		FileInputFormat.setInputPaths(job, INPUT_PATH);

		// 1.2 the mapper class; the map output key/value types (may be omitted when they match the final output types)
		job.setMapperClass(Map.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		job.setJarByClass(wordcount.class);

		// 1.3 partitioning
		job.setPartitionerClass(HashPartitioner.class);

		// 1.4 sorting / grouping

		// 1.5 combiner (local reduce)

		// 2.1 copy across the network (shuffle)

		// 2.2 the custom reducer class
		job.setReducerClass(Reduce.class);
		// the reduce output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// 2.3 output location
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

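		// register each named output used by mos.write() in the reducer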
		MultipleOutputs.addNamedOutput(job, "dest", TextOutputFormat.class, Text.class, LongWritable.class);
		MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, Text.class, LongWritable.class);
		// submit the job to the JobTracker; this call also prints detailed job progress
		job.waitForCompletion(true);

	}

}

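One refinement worth knowing about, again only a sketch based on the MultipleOutputs javadoc rather than the code above: mos.write() also accepts a baseOutputPath argument, which lets the new API reproduce the per-letter files of the old MultipleTextOutputFormat example, and wrapping the job's output format in LazyOutputFormat (org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat) suppresses the empty default part-r-xxxxx files when everything is written through MultipleOutputs.

	// in the driver, instead of setting the output format directly:
	LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

	// in reduce(), write straight to a per-letter file instead of a fixed named output:
	char c = key.toString().toLowerCase().charAt(0);
	String base = (c >= 'a' && c <= 'z') ? String.valueOf(c) : "other";
	mos.write(key, new LongWritable(sum), base);   // produces files like a-r-00000, b-r-00000, ...
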
Out of laziness I have not written a more polished example; this is only a quick introduction, and the paths are hard-coded rather than made generic, which readers can easily fix themselves. If you have a better solution, please do share it!
