Combining small files into a single SequenceFile or Avro file is a good way to feed data into Hadoop.
Storing many small files in Hadoop consumes a lot of NameNode memory, because the NameNode keeps the metadata of every file and block in memory.
SequenceFile 是一种 Key-Value 格式的二进制文件格式,在 MapReduce 作业中可以通过 SequenceFileInputFormat 来读取。
Key 和 Value 的类型可以自定义,只需自己实现其序列化和反序列化逻辑即可(例如实现 Hadoop 的 Writable 接口)。
以下代码仅供参考,在真实项目中使用时,可以做适当的调整,以更好地节约资源并满足项目的需要。
示例代码如下:
package myexamples; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; public class localf2seqfile { /* * Local folder has a lot of txt file * we need handle it in map reduce * so we want to load these files to one sequence file * key: source file name * value: file content * */ static void write2Seqfile(FileSystem fs,Path hdfspath,HashMap<Text,Text> hm) { SequenceFile.Writer writer=null; try { writer=SequenceFile.createWriter(fs, fs.getConf(), hdfspath, Text.class, Text.class); for(Map.Entry<Text,Text> entry:hm.entrySet()) writer.append(entry.getKey(),entry.getValue()); } catch (IOException e) { e.printStackTrace(); }finally{ try{writer.close();}catch(IOException ioe){} } } static HashMap<Text,Text> collectFiles(String localpath) throws IOException { HashMap<Text,Text> hm = new HashMap<Text,Text>(); File f = new File(localpath); if(!f.isDirectory()) return hm; for(File file:f.listFiles()) hm.put(new Text(file.getName()), new Text(FileUtils.readFileToString(file))); return hm; } static void readSeqFile(FileSystem fs, Path hdfspath) throws IOException { SequenceFile.Reader reader = new SequenceFile.Reader(fs,hdfspath,fs.getConf()); Text key = new Text(); Text value = new Text(); while (reader.next(key, value)) { System.out.print(key +" "); System.out.println(value); } reader.close(); } public static void main(String[] args) throws IOException { args = "/home/hadoop/test/sub".split(" "); Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://namenode:9000"); FileSystem fs = FileSystem.get(conf); System.out.println(fs.getUri()); Path file = new Path("/user/hadoop/seqfiles/seqdemo.seq"); if (fs.exists(file)) fs.delete(file,false); HashMap<Text,Text> hm = 
collectFiles(args[0]); write2Seqfile(fs,file,hm); readSeqFile(fs,file); fs.close(); } }
时间: 2024-10-22 13:53:45