Hadoop之SequenceFile

Hadoop序列化文件SequenceFile可以用于解决大量小文件(所谓小文件:泛指小于black大小的文件)问题,SequenceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key,value>对序列化到文件中,一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。

hadoop Archive也是一个高效地将小文件放入HDFS块中的文件存档文件格式,详情请看:hadoop Archive

但是SequenceFile文件不能追加写入,适用于一次性写入大量小文件的操作。

SequenceFile的压缩基于CompressType,请看源码:

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   * @see SequenceFile.Writer
   */
public static enum CompressionType {
    /** Do not compress records. */
    NONE, //不压缩
    /** Compress values only, each separately. */
    RECORD,  //只压缩values
    /** Compress sequences of records together in blocks. */
    BLOCK //压缩很多记录的key/value组成块
}

SequenceFile读写示例:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

/**
 * @version 1.0
 * @author Fish
 */
public class SequenceFileWriteDemo {
	private static final String[] DATA = { "fish1", "fish2", "fish3", "fish4" };

	public static void main(String[] args) throws IOException {
		/**
		 * 写SequenceFile
		 */
		String uri = "/test/fish/seq.txt";
		Configuration conf = new Configuration();
		Path path = new Path(uri);
		IntWritable key = new IntWritable();
		Text value = new Text();
		Writer writer = null;
		try {
			/**
			 * CompressionType.NONE 不压缩<br>
			 * CompressionType.RECORD 只压缩value<br>
			 * CompressionType.BLOCK 压缩很多记录的key/value组成块
			 */
			writer = SequenceFile.createWriter(conf, Writer.file(path), Writer.keyClass(key.getClass()),
					Writer.valueClass(value.getClass()), Writer.compression(CompressionType.BLOCK));

			for (int i = 0; i < 4; i++) {
				value.set(DATA[i]);
				key.set(i);
				System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
				writer.append(key, value);

			}
		} finally {
			IOUtils.closeStream(writer);
		}

		/**
		 * 读SequenceFile
		 */
		SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path));
		IntWritable key1 = new IntWritable();
		Text value1 = new Text();
		while (reader.next(key1, value1)) {
			System.out.println(key1 + "----" + value1);
		}
		IOUtils.closeStream(reader);// 关闭read流

		/**
		 * 用于排序
		 */
//		SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, comparator, IntWritable.class, Text.class, conf);
	}
}

以上程序执行多次,并不会出现数据append的情况,每次都是重新创建一个文件,且文件中仅仅只有四条数据。究其原因,可以查看SequenceFile.Writer类的构造方法源码:

out = fs.create(p, true, bufferSize, replication, blockSize, progress);

第二个参数为true,表示每次覆盖同名文件,如果为false会抛出异常。这样设计的目的可能是和HDFS一次写入多次读取有关,不提倡追加现有文件,所以构造方法写死了true。

SequenceFile文件的数据组成形式:

一,Header

写入头部的源码:

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);//版本号
      Text.writeString(out, keyClass.getName());//key的Class
      Text.writeString(out, valClass.getName());//val的Class

      out.writeBoolean(this.isCompressed());//是否压缩
      out.writeBoolean(this.isBlockCompressed());//是否是CompressionType.BLOCK类型的压缩

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());//压缩类的名称
      }
      this.metadata.write(out);//写入metadata
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

版本号:

  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

同步标识符的生成方式:

    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

二,Record

Writer有三个实现类,分别对应CompressType的NONE,RECOR,BLOCK。下面逐一介绍一下(结合上面的图看):

1,NONE SequenceFile

Record直接存Record 的长度,KEY的长度,key值,Value的值

2, BlockCompressWriter

/** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);//每调一次,都会累加keyLength

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);//每调一次,都会累加valLength
      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
      //compressionBlockSize =  conf.getInt("io.seqfile.compress.blocksize", 1000000);
      //超过1000000就会写一个Sync
        sync();
      }
    

超过compressionBlockSize的大小,就会调用sync()方法,下面看看sync的源码(和上面的图对照):

会写入和图中所画的各个数据项。

/** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

2,RecordCompressWriter

/** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length record的长度
      out.writeInt(keyLength);                            // key portion length key的长度
      out.write(buffer.getData(), 0, buffer.getLength()); // data 数据
    }

写入Sync:

synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

SYNC_INTERVAL的定义:

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 

每2000个byte,就会写一个Sync。

总结:

Record:存储SequenceFile通用的KV数据格式,Key和Value都是二进制变长的数据。Record表示Key和Value的byte的总和。

Sync:主要是用来扫描和恢复数据的,以至于读取数据的Reader不会迷失。

Header:存储了如下信息:文件标识符SEQ,key和value的格式说明,以及压缩的相关信息,metadata等信息。

metadata:包含文件头所需要的数据:文件标识、Sync标识、数据格式说明(含压缩)、文件元数据(时间、owner、权限等)、检验信息等

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-04 13:03:39

Hadoop之SequenceFile的相关文章

Hadoop 中SequenceFile的简介

概念 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件,它可以在map/reduce过程中的input/output 的format时被使用.在map/reduce过程中,map处理文件的临时输出就是使用SequenceFile处理过的. 所以一般的SequenceFile均是在FileSystem中生成,供map调用的原始文件. 特点 SequenceFile是 Hadoop 的一个重要数据文件类型,它提供key-value的存储,但与传统key-v

Hadoop中SequenceFile的使用

1.对于某些应用而言,需要特殊的数据结构来存储自己的数据.对于基于MapReduce的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,针对上述情况,Hadoop开发了一组更高层次的容器SequenceFile. 2. 考虑日志文件,其中每一条日志记录是一行文本.如果想记录二进制类型,纯文本是不合适的.这种情况下,Hadoop的SequenceFile类非常合适,因为上述提供了二进制键/值对的永久存储的数据结构.当作为日志文件的存储格式时,可以自己选择键,比如由Long

Hadoop SequenceFile数据结构介绍及读写

在一些应用中,我们需要一种特殊的数据结构来存储数据,并进行读取,这里就分析下为什么用SequenceFile格式文件. Hadoop SequenceFile Hadoop提供的SequenceFile文件格式提供一对key,value形式的不可变的数据结构.同时,HDFS和MapReduce job使用SequenceFile文件可以使文件的读取更加效率. SequenceFile的格式 SequenceFile的格式是由一个header 跟随一个或多个记录组成.前三个字节是一个Bytes S

Hadoop Sequencefile using Hadoop 2 Apis

public class SequenceFilesTest { @Test public void testSeqFileReadWrite() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); Path seqFilePath = new Path("file.seq"); SequenceFile.Writer writ

hadoop SequenceFile介绍

SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件. 基于压缩类型CompressType,共有三种SequenceFile Writer: public static enum CompressionType {     /** 不压缩 */     NONE,      /** 只压缩value */     RECORD,     /** 压缩很多记录的key/value成一块 */     BLOCK   } There are three Se

hadoop编程小技巧(4)---全局key排序类TotalOrderPartitioner

Hadoop代码测试版本:Hadoop2.4 原理:在进行MR程序之前对输入数据进行随机提取样本,把样本排序,然后在MR的中间过程Partition的时候使用这个样本排序的值进行分组数据,这样就可以达到全局排序的目的了. 难点:如果使用Hadoop提供的方法来实现全局排序,那么要求Mapper的输入.输出的key不变才可以,因为在源码InputSampler中提供的随机抽取的数据是输入数据最原始的key,如下代码(line:225): for (int i = 0; i < splitsToSa

Hadoop基于文件的数据结构及实例

基于文件的数据结构 两种文件格式: 1.SequenceFile 2.MapFile SequenceFile 1.SequenceFile文件是Hadoop用来存储二进制形式的<key,value>对而设计的一种平面文件(Flat File). 2.能够把SequenceFile当做一个容器,把全部文件打包到SequenceFile类中能够高效的对小文件进行存储和处理. 3.SequenceFile文件并不依照其存储的key进行排序存储.SequenceFile的内部类Writer**提供了

Hadoop学习笔记之二 文件操作

HDFS分布式文件系统:优点:支持超大文件存储.流式访问.一次写入多次读取.缺点:不适应大量小文件.不适应低时延的数据访问.不适应多用户访问任意修改文件. 1.hadoop用于大数据处理,在数据量较小时,并不适用于实时性强的任务,并不是所有的job放到hadoop上,性能都会提升. 2.大量小文件的情况下会极大的降低系统的性能,所以处理前需要先将少文件聚合成大文件,map的输出也应该首先combine在传输给reduce. 3.数据传输时的IO开销,存储在内存中还是硬盘中,节点之间共享数据的分发

[译]SequenceFile、MapFile、SetFile、ArrayFile、BloomMapFile

Apache Hadoop的SequenceFile提供了一种把数据以二进制key-value对的形式保存到hdfs上的方式.跟其他key-value 数据结构相比(比如B-Tree),SequenceFile不能修改.删除数据,也不能在数据中间插入数据.SequenceFile只能往尾 部追加数据. SequenceFile提供了三种存储形式: 1.Uncompressed Format :无压缩格式 2.Record Compressed :记录压缩格式 3.Block Compressed