WholeFileInputFormat 和WholeFileRecordReader合并小文件

如果不希望文件被切分,例如判断文件中记录是否有序,可以让minimumSize值大于最大文件的大小,但是文件的大小不能超过blockSize,或者重写FileInputFormat方法isSplitable()返回为false。下面介绍将多个小文件合成一个大的序列文件的例子:

1)自定义完整文件输入处理类如下:

Public class WholeFileInputFormat extends FileInputFormat<NullWritable, ByteWritable>

{

@override//不得分片

protected boolean isSplitable(JobContext context,Path file){return false;}

@override

public RecordReader<NullWritable,BytesWritable> createRecordReader ( InputSplit split,TaskAttemptContext context )throws IOException,InterruptedException

{

  WholeFileRecordReader reader=new WholeFileRecordReader();

reader.initialize(split,context);

return reader;

}

}

2)自定义完整文件读取类WholeFileRecordReader

WholeFileRecordReader类通过initialize()方法传入文件信息,然后调用nextKeyValue()方法一次性读取整个文件的内容,通过布尔值processed判断是否读取执行过。其他函数都是返回值。将FileSplit转为一条记录,键为null,值为文件内容。

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**

* 继承RecordReader

* 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。

* 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片

* 的所有内容

* @author Xiaoye

*/

public class WholeFileRecordReader extends

RecordReader<NullWritable, BytesWritable> {

// 用来盛放传递过来的分片

private FileSplit fileSplit;

private Configuration conf;

//将作为key-value中的value值返回

private BytesWritable value = new BytesWritable();

// 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成

private boolean processed = false;

/**

* 设置RecordReader的分片和配置对象。

*/

@Override

public void initialize(InputSplit split, TaskAttemptContext context)

throws IOException, InterruptedException {

this.fileSplit = (FileSplit) split;

this.conf = context.getConfiguration();

}

/**

* 核心算法

* 用来产生key-value值

* 一次读取整个文件内容注入value对象

*/

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

if (!processed) {

/*

* 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容

* 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中

*/

byte[] contents = new byte[(int) fileSplit.getLength()];

Path file = fileSplit.getPath();

FileSystem fs = file.getFileSystem(conf);

FSDataInputStream in = null;

try {

in = fs.open(file);

IOUtils.readFully(in, contents, 0, contents.length);

value.set(contents, 0, contents.length);

} finally {

IOUtils.closeStream(in);

}

processed = true;

return true;

}

return false;

}

@Override

public NullWritable getCurrentKey() throws IOException,

InterruptedException {

return NullWritable.get();

}

@Override

public BytesWritable getCurrentValue() throws IOException,

InterruptedException {

return value;

}

@Override

public float getProgress() throws IOException, InterruptedException {

return processed ? 1.0f : 0.0f;

}

@Override

public void close() throws IOException {

//do nothing

}

3)将若干个小文件打包成顺序文件的mapreduce作业

通过WholeFileRecordReader类读取所有小文件的内容,以文件名称为输出键,以内容未一条记录,然后合并成一个大的顺序文件。

public class SmallFilesToSequenceFileConverter extends configured implement Tool

{

package com.pzoom.mr.sequence;

import java.io.IOException;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SmallFilesToSequenceFileConverter {

///定义map函数

static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

private Text filenameKey;

//定义设置文件函数

@Override

protected void setup(Context context) throws IOException,

InterruptedException {

InputSplit split = context.getInputSplit();

Path path = ((FileSplit)split).getPath();

filenameKey = new Text(path.toString());

}

//定义map函数

@Override

protected void map(NullWritable key, BytesWritable value,

Context context) throws IOException, InterruptedException {

context.write(filenameKey, value);

}

//定义run函数

@Override

public int run (String[] args)throws IOException {

Configuration conf = getConf();

if(conf==null)

{

return -1;

}

Job job=JobBuilder.parseInputAndOutput(this,conf,args);

job.setInputFormatClass(WholeFileInputFormat.class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);输出序列file

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(BytesWritable.class);

job.setMapperClass(SequenceFileMapper.class);

return job.waitForCompletion(true)? 0:1;}

//args传入输入输出路径

public static void main(String[] args) throws IOException{

int exitCode=ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);

System.exit(exitCode);

}

}

}

4)执行小文件合并为大文件的命令

各参数含义:采用本地配置文件,两个reduces任务,输入文件夹,输出文件夹

%hadoop jar job.jar SmallFilesToSequenceFileConverter –conf conf/Hadoop-localhost.xml –D mapreduece.job.reduces-2 input/smallfiles output

5)通过命令来查看输出结果

%hadoop fs –conf conf/Hadoop-localhost.xml –text output/part-r-00000

输出结果是以小文件路径为键,以内容为值的合并序列文件

 

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

原文地址:https://www.cnblogs.com/bclshuai/p/12255279.html

时间: 2024-08-19 09:53:29

WholeFileInputFormat 和WholeFileRecordReader合并小文件的相关文章

Hadoop HDFS编程 API入门系列之合并小文件到HDFS(三)

不多说,直接上代码.  代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs7; import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apa

[转载]mapreduce合并小文件成sequencefile

mapreduce合并小文件成sequencefile http://blog.csdn.net/xiao_jun_0820/article/details/42747537

HDFS 实际应用场景合并小文件

合并小文件,存放到HDFS上, 采取在向HDFS复制上传的过程中将小文件进行合并,效果会更好 package org.xueruan.hadoop.hdfs; import java.nio.file.Path; import sun.management.FileSystem; /* * function: merge file while copying and uploading files into HDFS */ public class PutMerge { public stati

Hadoop合并小文件的几种方法

1.Hadoop HAR 将众多小文件打包成一个大文件进行存储,并且打包后原来的文件仍然可以通过Map-Reduce进行操作,打包后的文件由索引和存储两大部分组成: 缺点: 一旦创建就不能修改,也不支持追加操作,还不支持文档压缩,当有新文件进来以后,需要重新打包. 2.SequeuesFile 适用于非文体格式,可作小文件容器,并可压缩: 3.CombineFileInputFormat 将多个文件合并成一个split作为输入,减少map输入与HDFS块的耦合: 4.Java代码实现,使用HDF

Hive merge(小文件合并)

当Hive的输入由很多个小文件组成时,如果不涉及文件合并的话,那么每个小文件都会启动一个map task. 如果文件过小,以至于map任务启动和初始化的时间大于逻辑处理的时间,会造成资源浪费,甚至发生OutOfMemoryError错误. 因此,当我们启动一个任务时,如果发现输入数据量小但任务数量多时,需要注意在Map前端进行输入小文件合并操作. 同理,向一个表写数据时,注意观察reduce数量,注意输出文件大小. 1. Map输入小文件合并 #每个Map处理的最大输入文件大小(256MB) s

HDFS小文件合并问题的优化:copyMerge的改进

1.问题分析 用fsck命令统计 查看HDFS上在某一天日志的大小,分块情况以及平均的块大小,即 [[email protected] jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11 DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. 15/01/13 18:57:23 WARN ut

hive压缩之小文件合并

Hive压缩之二 小文件合并 调研背景 当Hive输入由很多个小文件组成,由于每个小文件都会启动一个map任务,如果文件过小,以至于map任务启动和初始化的时间大于逻辑处理的时间,会造成资源浪费,甚至OOM.为此,当我们启动一个任务,发现输入数据量小但任务数量多时,需要注意在Map前端进行输入合并.当然,在我们向一个表写数据时,也需要注意输出文件大小. 输入合并 合并输入小文件,减少map数? 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小. 举例: a)

HDFS操作及小文件合并

小文件合并是针对文件上传到HDFS之前 这些文件夹里面都是小文件 参考代码 package com.gong.hadoop2; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import or

hive小文件合并设置参数

Hive的后端存储是HDFS,它对大文件的处理是非常高效的,如果合理配置文件系统的块大小,NameNode可以支持很大的数据量.但是在数据仓库中,越是上层的表其汇总程度就越高,数据量也就越小.而且这些表通常会按日期进行分区,随着时间的推移,HDFS的文件数目就会逐渐增加. 小文件带来的问题 关于这个问题的阐述可以读一读Cloudera的这篇文章.简单来说,HDFS的文件元信息,包括位置.大小.分块信息等,都是保存在NameNode的内存中的.每个对象大约占用150个字节,因此一千万个文件及分块就