Hadoop中SequenceFile的使用

1.对于某些应用而言,需要特殊的数据结构来存储自己的数据。对于基于MapReduce的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,针对上述情况,Hadoop开发了一组更高层次的容器SequenceFile。

2. 考虑日志文件,其中每一条日志记录是一行文本。如果想记录二进制类型,纯文本是不合适的。这种情况下,Hadoop的SequenceFile类非常合适,因为上述提供了二进制键/值对的永久存储的数据结构。当作为日志文件的存储格式时,可以自己选择键,比如由LongWritable类型表示的时间戳,以及值可以是Writable类型,用于表示日志记录的数量。SequenceFile同样为可以作为小文件的容器。而HDFS和
MapReduce是针对大文件进行优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效率的存储和处理。

3. SequenceFile类内部有两个比较主要的内部类分别是SequenceFile.Reader和SequenceFile.Writer

SequenceFile.Reader

通过createWriter()静态方法可以创建SequenceFile对象,并返SequenceFile.Writer实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStream或FileSystem对象和Path对象),Configuration对象,以及键和值的类型。另外可选参数包括压缩类型以及相应的codec,Progressable回调函数用于通知写入的进度,以及在SequenceFile头文件中存储的Metadata实例。存储在SequenceFile中的键和值对并不一定是Writable类型。任意可以通过Serialization类实现序列化和反序列化的类型均可被使用。一旦拥有SequenceFile.Writer实例,就可以通过append()方法在文件末尾附件键/值对。

SequenceFile.Writer

创建SequenceFile.Writer可以通过调用本身的构造函数 SequenceFile.Reader(FileSystem fs, Path file, Configuration conf) 来构造实例对象,从头到尾读取顺序文件的过程是创建SequenceFile.Reader实例后反复调用next()方法迭代读取记录的过程。读取的是哪条记录与你使用的序列化框架相关。如果使用的是Writable类型,那么通过键和值作为参数的Next()方法可以将数据流中的下一条键值对读入变量中:

public boolean next(Writable key,Writable val),如果键值对成功读取,则返回true,如果已读到文件末尾,则返回false。具体示例代码如下所示:

import
java.io.IOException;

import java.net.URI;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Text;

public class sequence {

/**

* @param args

*/

public static  FileSystem fs;

public static final String Output_path="/home/hadoop/test/A.txt";

public static Random random=new Random();

private static final String[] DATA={

"One,two,buckle my shoe",

"Three,four,shut the door",

"Five,six,pick up sticks",

"Seven,eight,lay them straight",

"Nine,ten,a big fat hen"

};

public static Configuration conf=new Configuration();

public static void write(String pathStr) throws IOException{

Path path=new Path(pathStr);

FileSystem fs=FileSystem.get(URI.create(pathStr), conf);

SequenceFile.Writer writer=SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class);

Text key=new Text();

IntWritable value=new IntWritable();

for(int i=0;i<DATA.length;i++){

key.set(DATA[i]);

value.set(random.nextInt(10));

System.out.println(key);

System.out.println(value);

System.out.println(writer.getLength());

writer.append(key, value);

}

writer.close();

}

public static void read(String pathStr) throws IOException{

FileSystem fs=FileSystem.get(URI.create(pathStr), conf);

SequenceFile.Reader reader=new SequenceFile.Reader(fs, new Path(pathStr), conf);

Text key=new Text();

IntWritable value=new IntWritable();

while(reader.next(key, value)){

System.out.println(key);

System.out.println(value);

}

}

public static void main(String[] args) throws IOException {

// TODO Auto-generated method stub

write(Output_path);

read(Output_path);

}

}

如果需要在mapreduce中进行SequenceFile的读取和写入,则需要到SequcenFileInputFormat和SequenceFileOutputFormat,示例代码如下所示:

1)输出格式为SequenceFileOutputFormat

public class SequenceFileOutputFormatDemo extends Configured implements Tool {

public static class SequenceFileOutputFormatDemoMapper extends

Mapper<LongWritable, Text, LongWritable, Text> {

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

context.write(key, value);

}

}

public static void main(String[] args) throws Exception {

int nRet = ToolRunner.run(new Configuration(),

new SequenceFileOutputFormatDemo(), args);

System.out.println(nRet);

}

@Override

public int run(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = getConf();

Job job = new Job(conf, "sequence file output demo ");

job.setJarByClass(SequenceFileOutputFormatDemo.class);

FileInputFormat.addInputPaths(job, args[0]);

HdfsUtil.deleteDir(args[1]);

job.setMapperClass(SequenceFileOutputFormatDemoMapper.class);

// 因为没有reducer,所以map的输出为job的最后输出,所以需要把outputkeyclass

// outputvalueclass设置为与map的输出一致

job.setOutputKeyClass(LongWritable.class);

job.setOutputValueClass(Text.class);

// 如果不希望有reducer,设置为0

job.setNumReduceTasks(0);

// 设置输出类

job.setOutputFormatClass(SequenceFileOutputFormat.class);

// 设置sequecnfile的格式,对于sequencefile的输出格式,有多种组合方式,

//从下面的模式中选择一种,并将其余的注释掉

// 组合方式1:不压缩模式

SequenceFileOutputFormat.setOutputCompressionType(job,

CompressionType.NONE);

// 组合方式2:record压缩模式,并指定采用的压缩方式 :默认、gzip压缩等

//        SequenceFileOutputFormat.setOutputCompressionType(job,

//                CompressionType.RECORD);

//        SequenceFileOutputFormat.setOutputCompressorClass(job,

//                DefaultCodec.class);

// 组合方式3:block压缩模式,并指定采用的压缩方式 :默认、gzip压缩等

//        SequenceFileOutputFormat.setOutputCompressionType(job,

//                CompressionType.BLOCK);

//        SequenceFileOutputFormat.setOutputCompressorClass(job,

//                DefaultCodec.class);

SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));

int result = job.waitForCompletion(true) ? 0 : 1;

return result;

}

}

2)输入格式为SequcenFileInputFormat 

public class SequenceFileInputFormatDemo extends Configured implements Tool {

public static class SequenceFileInputFormatDemoMapper extends

Mapper<LongWritable, Text, Text, NullWritable> {

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

System.out.println("key:   " + key.toString() + "  ;  value: "

+ value.toString());

}

}

public static void main(String[] args) throws Exception {

int nRet = ToolRunner.run(new Configuration(),

new SequenceFileInputFormatDemo(), args);

System.out.println(nRet);

}

@Override

public int run(String[] args) throws Exception {

Configuration conf = getConf();

Job job = new Job(conf, "sequence file input demo");

job.setJarByClass(SequenceFileInputFormatDemo.class);

FileInputFormat.addInputPaths(job, args[0]);

HdfsUtil.deleteDir(args[1]);

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(SequenceFileInputFormatDemoMapper.class);

job.setNumReduceTasks(1);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

job.setInputFormatClass(SequenceFileInputFormat.class);

int result = job.waitForCompletion(true) ? 0 : 1;

return result;

}

}

或者读取的时候也可以如下面的方式进行读取,但是此时输出格式就为普通FileOutputFormat了,输入格式也为普通FileInputFormat了。示例代码如下面所示 :

public class MapReduceReadFile {

private static SequenceFile.Reader reader = null;

private static Configuration conf = new Configuration();

public static class ReadFileMapper extends

Mapper<LongWritable, Text, LongWritable, Text> {

/* (non-Javadoc)

* @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)

*/

@Override

public void map(LongWritable key, Text value, Context context) {

key = (LongWritable) ReflectionUtils.newInstance(

reader.getKeyClass(), conf);

value = (Text) ReflectionUtils.newInstance(

reader.getValueClass(), conf);

try {

while (reader.next(key, value)) {

System.out.printf("%s\t%s\n", key, value);

context.write(key, value);

}

} catch (IOException e1) {

e1.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

* @param args

* @throws IOException

* @throws InterruptedException

* @throws ClassNotFoundException

*/

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

Job job = new Job(conf,"read seq file");

job.setJarByClass(MapReduceReadFile.class);

job.setMapperClass(ReadFileMapper.class);

job.setMapOutputValueClass(Text.class);

Path path = new Path("logfile2");

FileSystem fs = FileSystem.get(conf);

reader = new SequenceFile.Reader(fs, path, conf);

FileInputFormat.addInputPath(job, path);

FileOutputFormat.setOutputPath(job, new Path("result"));

System.exit(job.waitForCompletion(true)?0:1);

}

}

时间: 2025-01-10 04:50:13

Hadoop中SequenceFile的使用的相关文章

Hadoop 中SequenceFile的简介

概念 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件,它可以在map/reduce过程中的input/output 的format时被使用.在map/reduce过程中,map处理文件的临时输出就是使用SequenceFile处理过的. 所以一般的SequenceFile均是在FileSystem中生成,供map调用的原始文件. 特点 SequenceFile是 Hadoop 的一个重要数据文件类型,它提供key-value的存储,但与传统key-v

Hadoop之SequenceFile

Hadoop序列化文件SequenceFile可以用于解决大量小文件(所谓小文件:泛指小于black大小的文件)问题,SequenceFile是Hadoop API提供的一种二进制文件支持.这种二进制文件直接将<key,value>对序列化到文件中,一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中. hadoop Archive也是一个高效地将小文件放入HDFS块中的文件存档文件格式,详情请看:hadoop Archive 但是SequenceFi

[转] - hadoop中使用lzo的压缩

在hadoop中使用lzo的压缩算法可以减小数据的大小和数据的磁盘读写时间,不仅如此,lzo是基于block分块的,这样他就允许数据被分解成chunk,并行的被hadoop处理.这样的特点,就可以让lzo在hadoop上成为一种非常好用的压缩格式. lzo本身不是splitable的,所以当数据为text格式时,用lzo压缩出来的数据当做job的输入是一个文件作为一个map.但是sequencefile本身是分块的,所以sequencefile格式的文件,再配上lzo的压缩格式,就可实现lzo文

hadoop中Configuration类剖析

Configuration是hadoop中五大组件的公用类,所以放在了core下,org.apache.hadoop.conf.Configruration.这个类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递,因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息. 类图 说明:Configuration实现了Iterable和Writable两个接口,其中实现Iterable是为了迭代,迭代出Configuration对

Hadoop中作业(job)、任务(task)和task attempt

hadoop中,MapReduce作业(job)ID的格式为job_201412081211_0002.这表示该作业是第二个作业(作业号从0001开始),作业开始于2014年12月8号12:11. 任务(task)属于作业,通过使用"task"替换作业ID的"job"前缀,然后在后面加上一个后缀表示哪个作业中间的任务.例如:task_201412081211_0002_m_000003,表示该任务属于job_201412081211_0002作业的第三个map任务(

hadoop中Text类 与 java中String类的区别

hadoop 中 的Text类与java中的String类感觉上用法是相似的,但两者在编码格式和访问方式上还是有些差别的,要说明这个问题,首先得了解几个概念: 字符集: 是一个系统支持的所有抽象字符的集合.字符是各种文字和符号的总称,包括各国家文字.标点符号.图形符号.数字等.例如 unicode就是一个字符集,它的目标是涵盖世界上所有国家的文字和符号: 字符编码:是一套法则,使用该法则能够对自然语言的字符的一个集合(如字母表或音节表),与其他东西的一个集合(如号码或电脉冲)进行配对.即在符号集

再次整理关于hadoop中yarn的原理及运行

关于hadoop中yarn的运行原理整理 一:对yarn的理解 1.关于yarn的组成 大约分成主要的四个. Resourcemanager,Nodemanager,Applicationmaster,container 2.Resourcemanager(RM)的理解 RM是全局资源管理器,负责整个系统的资源管理和分配. 主要由两个组件组成:调度器和应用程序管理器(ASM) 调度器:根据容量,队列等限制条件,将系统中的资源分配给各个正在运行的应用程序,不负责具体应用程序的相关工作,比如监控或跟

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

解决hadoop中 bin/hadoop fs -ls ls: `.&#39;: No such file or directory问题

出现这样的问题确实很苦恼...使用的是2.7版本..一般论坛上的都是1.x的教程,搞死人 在现在的2.x版本上的使用bin/hadoop fs -ls  /就有用 应该使用绝对路径就不会有问题....mkdir也是一样的..具体原因不知,我使用相对路径会出现错误.... 解决hadoop中 bin/hadoop fs -ls ls: `.': No such file or directory问题