MR中使用sequnceFIle输入文件

转换原始数据为块压缩的SequenceFIle

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzoCodec;

public class ToSeqFile extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        Configuration conf=getConf();
        FileSystem fs = FileSystem.get(conf);

        FileInputFormat.setInputPaths(job, "/home/hadoop/tmp/tmplzo.txt");
        Path outDir=new Path("/home/hadoop/tmp/tmplzo.out");
        fs.delete(outDir,true);
        FileOutputFormat.setOutputPath(job, outDir);

        //job.setMapperClass(IndentityMapper);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        //设置OutputFormat为SequenceFileOutputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //允许压缩
         SequenceFileOutputFormat.setCompressOutput(job, true);
         //压缩算法为gzip
         SequenceFileOutputFormat.setOutputCompressorClass(job, LzoCodec.class);
        //压缩模式为BLOCK
         SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToSeqFile(), args);
        System.exit(res);
    }
}

MR处理压缩后的sequenceFile

import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.ContextFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
//import org.apache.hadoop.mapred.DeprecatedLzoTextInputFormat;

import com.hadoop.compression.lzo.LzoCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class compress extends Configured implements Tool {
	private static final Log log = LogFactory.getLog(compress.class);

	private static class ProvinceMapper extends
			Mapper<Object, Text, Text, Text> {
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			//System.out.println(value);

			// InputSplit inputSplit = context.getInputSplit();
			//String fileName = ((FileSplit) inputSplit).getPath().toString();

			//System.out.println(fileName);
			context.write(value, value);
		}
	}

	private static class ProvinceReducer extends
			Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text va : values) {
				// System.out.println("reduce " + key);
				context.write(key, key);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new compress(), args);
	}

	public static final String REDUCES_PER_HOST = "mapreduce.sort.reducesperhost";

	@Override
	public int run(String[] args) throws Exception {
		log.info("我的服务查询开始.....................................");

		long beg = System.currentTimeMillis();
		int result = 0;
		Configuration conf = new Configuration();

		conf.set(
				"io.compression.codecs",
				"org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");
		conf.set("io.compression.codec.lzo.class",
				"com.hadoop.compression.lzo.LzoCodec");

		conf.setBoolean("mapreduce.map.output.compress", true);
	    conf.setClass("mapreduce.map.output.compression.codec", SnappyCodec.class, CompressionCodec.class);
	   // conf.setBoolean("mapreduce.output.fileoutputformat.compress", true); // 是否压缩输出
	    conf.setClass("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class, CompressionCodec.class);

		String[] argArray = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		if (argArray.length != 2) {
			System.err.println("Usage: compress <in> <out>");
			System.exit(1);
		}

		// Hadoop总共有5个Job.java
		// /hadoop-2.0.0-cdh4.5.0/src/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
		Job job = new Job(conf, "compress");
		job.setJarByClass(compress.class);
		job.setMapperClass(ProvinceMapper.class);
		job.setReducerClass(ProvinceReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		 //job.setInputFormatClass(LzoTextInputFormat.class); // TextInputFormat
		// MyFileinput

		// 使用lzo索引文件作为输入文件
		// job.setInputFormatClass(LzoTextInputFormat.class);
		job.setInputFormatClass(SequenceFileInputFormat.class);

		// SequenceFileOutputFormat.set(job, LzoCodec.class);

		// 测试块大小
		// FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
		// FileInputFormat.setMinInputSplitSize(job, 301349250);
		// FileInputFormat.setMaxInputSplitSize(job, 10000);

		// 推测执行的开关 另外还有针对map和reduce的对应开关
		// job.setSpeculativeExecution(false);
		FileInputFormat.addInputPath(job, new Path(argArray[0]));
		FileOutputFormat.setOutputPath(job, new Path(argArray[1]));

		String uri = argArray[1];
		Path path = new Path(uri);
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		if (fs.exists(path)) {
			fs.delete(path);
		}

		result = job.waitForCompletion(true) ? 0 : 1;

//		try {
//			result = job.waitForCompletion(true) ? 0 : 1;
//		} catch (ClassNotFoundException | InterruptedException e) {
//			e.printStackTrace();
//		}
		long end = (System.currentTimeMillis() -beg) ;
        System.out.println("耗时:" + end);
		return result;
	}
}

测试结果

文件大小 544M(未使用任何压缩)
耗时:73805

使用 seqencefile(block使用lzo压缩, 中间结果使用snappy压缩)

44207s

时间: 2024-08-12 16:11:17

MR中使用sequnceFIle输入文件的相关文章

MR中的combiner和partitioner

1.combiner combiner是MR编程模型中的一个组件: 有些任务中map可能会产生大量的本地输出,combiner的作用就是在map端对输出先做一次合并,以减少map和reduce节点之间的数据传输量,提高网络IO性能,是MR的优化手段之一: 两大基本功能: 1.1map的输出的key的聚合,对map输出的key排序.value进行迭代: 1.2reduce功能. 并不是设置了combiner就一定会执行(在当前集群非常繁忙的时候设置了也不会执行): combiner的执行时机:co

总结的MR中连接操作

1 reduce side join在map端加上标记, 在reduce容器保存,然后作笛卡尔积缺点: 有可能oom 2 map side join  2.1 利用内存和分布式缓存,也有oom风险 2.2 自己的想法,参考hive桶的思路,  第一次MR,将两个文件相同的方法分文件输出并打上标记,排序输出,尽可能分的比较均匀  第二次MR 改写输入方法,将相同标记的多个分片打包传给map, 改写读的方法,用数组或者列表保存文件流,利用多个文件多路归并思想,使整个大分片按照有序的方法传给map,M

MR中简单实现自定义的输入输出格式

import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataOutputStream; import org.a

MR操作

MR操作————Map.Partitioner.Shuffle.Combiners.Reduce 1.Map步骤 1.1 读取输入文件,解析成k-v对,其中每个k-v对调用一次map函数 1.2 写自己的逻辑,对输入的k-v进行处理,转换成新的k-v 1.3 对输出的k-v进行分区(Partitioner) 1.4 对不同分区的数据进行排序/分组,将相同的key的value放在一个集合中(Shuffle处理) 1.5 分组后进行归约(可选)(Combiners 可理解为单个节点的reduce 不

详述执行map reduce 程序的步骤(本地执行MR、服务器上执行MR)

MR程序的执行环境有两种:本地测试环境.服务器环境. 1.本地环境执行MR程序的步骤: (1)在windows下配置hadoop的环境变量 (2)拷贝debug工具(winutils)到HADOOP_HOME/bin (3)从源码中拷贝org.apache.hadoop.io.nativeio.NativeIO.java到我们的mr的src目录下,修改NativeIO.java.(大家可去http://download.csdn.net/detail/u013226462/9516657下载.)

MR基本的运作流程

MapReduce的核心是:分而治之,并行处理:以及其调度和处理数据的自动化. MR中主要是Map和Reduce两个阶段,其中基本流程是: 1.mr的数据处理单位是一个split,一个split对应一个map任务,处理时会有多个map任务同时运行:当map从HDFS上读取一个split时,这里会有"移动计算,不移动数据"的机制来减少网络的数据传输,使得效率能最大化: 2.获取到split时,默认会以TextInputFormat的格式读入,文件中的字符位置的偏移量作为 key,以及每一

mr实现pagerank

PageRank计算什么是pagerankPageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度.是Google创始人拉里·佩奇和谢尔盖·布林于1997年创造的PageRank实现了将链接价值概念作为排名因素. PageRank计算算法原理(1)入链 ====投票PageRank让链接来"投票",到一个页面的超链接相当于对该页投一票.入链数量如果一个页面节点接收到的其他网页指向的入链数量越多,那么这个页面越重要.入链质量指向页面A的入链质

深入浅出数据仓库中SQL性能优化之Hive篇

转自:http://www.csdn.net/article/2015-01-13/2823530 一个Hive查询生成多个Map Reduce Job,一个Map Reduce Job又有Map,Reduce,Spill,Shuffle,Sort等多个阶段,所以针对Hive查询的优化可以大致分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MR Job)的优化,下文会分别阐述. 在开始之前,先把MR的流程图帖出来(摘自Hadoop权威指南),方便后面对照.另

数据仓库中的 SQL 性能优化(Hive篇)

一个Hive查询生成多个map reduce job,一个map reduce job又有map,reduce,spill,shuffle,sort等多个阶段,所以针对hive查询的优化可以大致分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MR job)的优化,下文会分别阐述. 在开始之前,先把MR的流程图帖出来(摘自Hadoop权威指南),方便后面对照.另外要说明的是,这个优化只是针对Hive 0.9版本,而不是后来Hortonwork发起Stinger