Hadoop序列化【源码】

  • 序列化(serialization)是指将结构化的对象转化为字节流,以便在进程间通信或者写入硬盘永久存储。
  • 相对的反序列化(deserialization)是指将字节流转回到结构化对象的过程。
  • 需要注意的是,能够在网络上传输的只能是字节流。所以,Map的中间结果在不同主机间Shuffle洗牌时,结构化对象将经历序列化(map结果写入磁盘)和反序列化(reduce读取map结果)两个过程。

Writable接口

Hadoop并没有使用JAVA的序列化机制,而是引入了自己的序列化系统,package org.apache.hadoop.io 这个包中定义了大量的可序列化对象,这些对象都实现了Writable接口,Writable接口是序列化对象的一个通用接口。其中包含了write()和readFields()两个序列化相关方法。

WritableComparable接口

WriteCompareable接口是对Wirtable接口的二次封装,提供了compareTo(T o)方法,用于序列化对象的比较。因为MR中间有个基于key的排序阶段。

RawComparable接口

Hadoop为优化Shuffle阶段的排序,提供了原生的比较器接口RawComparator<T>用于在字节流层面进行比较,从而大大缩短了比较的时间开销。该接口并非被多数的衍生类所实现,多数情况下其直接子类WritableComparator作为实现Writable接口类的内置类,提供序列化字节的比较功能。

WritableComparator类

  1).对原始compare()方法的一个默认实现:先【反序列化】为对象,再通过【比较对象】,有开销的问题。所以,对于继承writeCompatable的具体子类都会要求覆写compare()方法以加快效率。

//原始compare()是将要比较的二进制流,先反序列化为对象,再调用对象的比较方法进行比较。
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
 //利用Buffer为桥接中介,把字节数组存储为buffer后
      buffer.reset(b1, s1, l1);

 //调用key1(WritableComparable类型)的反序列化方法
      key1.readFields(buffer);

      buffer.reset(b2, s2, l2);
      key2.readFields(buffer);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
//调用Writable对象的compare()比较方法进行比较
    return compare(key1, key2);
  }

  2).define()方法用于注册WritebaleComparaor对象到注册表中(Hadoop自动调用比较器)。

public static void define(Class c, WritableComparator comparator) {
    comparators.put(c, comparator);
   }

   3).以上两个方法在自定义的WritableComparable子类类中,都必须覆写,以实现高效排序。

Writable类的字节长度

在定制Writable类之前,应该先了解不同Writable类占用磁盘空间的大小。通过减少Writable实例的字节数,加快数据的读取和减少网络的数据传输。下表显示的是Hadoop对Java基本类型包装后相应的Writable类占用的字节长度:


Java基本类型


字节数


Writable实现


序列化后字节数 (bytes)


boolean


1/8


BooleanWritable


1


byte


1


ByteWritable


1


short


2


ShortWritable


2


int


4


IntWritable


4


VIntWritable


1–5


float


4


FloatWritable


4


long


8


LongWritable


8


VLongWritable


1–9


double


8


DoubleWritable


8

不同Writable类型序列化后的字节长度是不一样的,需要综合考虑应用中数据特征选择合适的类型。对于整数类型有两种选择,一种是定长(fixed-length)Writable类型,IntWritable和LongWritable;另一种是变长(variable-length)Writable类型,VIntWritable和VLongWritable。变长类型是根据数值的大小使用相应的字节长度表示,当数值在-112~127之间时使用1个字节表示,在-112~127范围之外的数值使用头一个字节表示该数值的正负符号以及字节长度(zero-compressed encoded integer)。

对于整数类型的Writable选择,建议:

  1. 除非对数据的均匀分布很有把握,否则使用变长Writable类型
  2. 除非数据的取值区间确定在int范围之内,否则为了程序的可扩展性,请选择VLongWritable类型

package cn.itcast.hadoop.mr;

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.StringUtils;

//测试十进制序列化成不同Writable类型所占用的字节数组长度
public class WritableBytesLengthDemo {

    public static void main(String[] args) throws IOException {

        //将十亿用不同Writable类型表示出来
        IntWritable int_b = new IntWritable(1000000000);
        LongWritable long_b = new LongWritable(1000000000);
        VIntWritable vint_b = new VIntWritable(1000000000);
        VLongWritable vlong_b = new VLongWritable(1000000000);

        //将不同的Writable类型序列化成字节数组
        byte[] bs_int_b = serialize(int_b);
        byte[] bs_long_b = serialize(long_b);
        byte[] bs_vint_b = serialize(vint_b);
        byte[] bs_vlong_b = serialize(vlong_b);

        //以十六进制形式打印字节数组,并打印出数组的长度
        String hex = StringUtils.byteToHexString(bs_int_b);
        formatPrint("IntWritable", "1,000,000,000",hex, bs_int_b.length);

        hex = StringUtils.byteToHexString(bs_long_b);
        formatPrint("LongWritable", "1,000,000,000",hex, bs_long_b.length);

        hex = StringUtils.byteToHexString(bs_vint_b);
        formatPrint("VIntWritable", "1,000,000,000",hex, bs_vint_b.length);

        hex = StringUtils.byteToHexString(bs_vlong_b);
        formatPrint("VLongWritable", "1,000,000,000", hex, bs_vlong_b.length);
    }
    //定义输出格式
    private static void formatPrint(String type, String param, String hex, int length) {

        String format = "%1$-50s %2$-16s with length: %3$2d%n";
        System.out.format(format, "Byte array per " + type
                + "("+ param +") is:", hex, length);
    }
    //将一个实现了Writable接口的对象序列化成字节流
    public static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();

        return out.toByteArray();
    }
    //反序列化
    public static Writable deserialize(Writable writable, byte[] bytes)    throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(in);
        writable.readFields(dataIn);
        dataIn.close();

        return writable;
    }
}

  Byte array per IntWritable(1,000,000,000) is:      3b9aca00          with length:   4

  Byte array per LongWritable(1,000,000,000) is:     000000003b9aca00  with length:   8

  Byte array per VIntWritable(1,000,000,000) is:     8c3b9aca00       with length:   5

  Byte array per VLongWritable(1,000,000,000) is:    8c3b9aca00        with length:   5

从上面的输出我们可以看出:

  • l对1,000,000,000的表示不同Writable占用了不同字节长度
  • 变长类型并不总比定长更加节省空间,因为变长需要一个额外的字节来存放正负信息和字节长度。

Text的字节序列

  1. 可以简单的认为Text类是java.lang.String的Writable类型,要注意的是Text类对于Unicode字符采用UTF-8编码,使用变长的1~4个字节对字符进行编码。对于ASCII字符只使用1个字节,而对于High ASCII和多字节字符使用2~4个字节表示。而不是使用Java Character类的UTF-16编码。
  2. 对于原本GBK编码的数据使用Text读入后直接使用String line=value.toString();方法就会变成乱码。正确的方法是将输入的Text类型的value转换为字节数组,使用String的构造器String(byte[] bytes, int offset, int length, Charset charset),通过使用指定的charset解码指定的byte子数组,构造一个新的String。即 String line=new String(value.getBytes(),0,value.getLength(),”GBK”);
  3. Text类的字节序列表示为【一个VIntWritable + UTF-8字节流】。其中,VIntWritable表示Text类型的字符长度,UTF-8字节数组为真正的Text字节流。

下面以Text类中字节比较的代码进行说明:

/** A WritableComparator optimized for Text keys. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(Text.class);
    }
    @Override
    //b1代表字节数组;s1代表一个text类型的起始字节;l1代表一个text类型的字节长度
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
 //返回Text的字符长度
      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
      int n2 = WritableUtils.decodeVIntSize(b2[s2]);

 //比较器跳过 代表Text字符长度 的字节,直接比对UTF编码的真正的字符串部分的字节
 //compareBytes()方法是对字节进行逐个比较。一旦找到一个不同的,然后就返回结果,后面的不管
      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
    }
  }

时间: 2024-08-29 15:09:18

Hadoop序列化【源码】的相关文章

spark streaming task 序列化源码

spark streaming task 序列化源码 1.入口 val kafkaStreams = (1 to recerverNum).map { i => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) )} val unifiedStream = ssc.union(kafkaStreams) val u

大数据技术之_03_Hadoop学习_02_入门_Hadoop运行模式+【本地运行模式+伪分布式运行模式+完全分布式运行模式(开发重点)】+Hadoop编译源码(面试重点)+常见错误及解决方案

第4章 Hadoop运行模式4.1 本地运行模式4.1.1 官方Grep案例4.1.2 官方WordCount案例4.2 伪分布式运行模式4.2.1 启动HDFS并运行MapReduce程序4.2.2 启动YARN并运行MapReduce程序4.2.3 配置历史服务器4.2.4 配置日志的聚集4.2.5 配置文件说明4.3 完全分布式运行模式(开发重点)4.3.1 虚拟机准备4.3.2 编写集群分发脚本xsync4.3.3 集群配置4.3.4 集群单点启动4.3.5 SSH无密登录配置4.3.6

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

Hadoop InputFormat源码分析

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class)来保证输入文件按照我们想要的格式被读取.所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等. 不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的MapT

Hadoop TextInputFormat源码分析

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能: (1).数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split. (2).为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,共Mapper使用. InputFormat抽象类中只有两个方法,分别对应上面两个功能,源码

大数据-Hadoop生态(12)-Hadoop序列化和源码追踪

1.什么是序列化 2.为什么要序列化 3.为什么不用Java的序列化 4.自定义bean对象实现序列化接口(Writable) 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口. 具体实现bean对象序列化步骤如下7步: 1) 必须实现Writable接口 2) 反序列话时,需要反射调用无参构造方法,所以必须要有无参构造方法 3) 重写序列化方法write() 4) 重写反序列化方法readFields() 5)

Hadoop 修改源码以及将修改后的源码应用到部署好的Hadoop中

我的Hadoop版本是hadoop-2.7.3, 我们可以去hadoop官网下载源码hadoop-2.7.3-src,以及编译好的工程文件hadoop-2.7.3, 后者可以直接部署. 前者hadoop-2.7.3-src必须mvn之后才能部署. 我们修改代码必须是在hadoop-2.7.3-src源码中进行, 而源码mvn之后才能部署或使用. 所以我们要先了解Maven.     mvn hadoop-2.7.3-src的时候会出现各种问题. 其中hadoop-2.7.3-src源码文件中有个

Hadoop Yarn源码 - day1

Hadoop 2.6.0下面的关于Yarn工程,如下所示,主要有以下七个module: hadoop-yarn-api:和外部平台交互的接口 hadoop-yarn-applications hadoop-yarn-client hadoop-yarn-common:yarn client和server可以用到的一些实用工具 hadoop-yarn-registry hadoop-yarn-server:hadoop-yarn-api的具体实现 hadoop-yarn-server-applic

如何把hadoop相关源码关联到eclipse工程中

在eclipse中阅读源码非常方便,利于我们平时的学习,如下就供述如何把hadoop源码导入到eclpse的java工程中的 一:解压源码 首先,我们在windows下使用winrar把hadoop-1.1.2.tar.gz解压,如图1所示 我们关注文件夹src,浏览该文件夹,如图2所示 我们需要这三个文件夹,一会我们会把这三个文件夹复制到eclipse中. 二:创建java工程 在eclipse中通过菜单栏创建一个java工程,如图3, 图3 这里创建工程叫做"HadoopShanghai&q

[Hadoop] - TaskTracker源码分析(状态发送)

TaskTracker节点向JobTracker汇报当前节点的运行时信息时候,是将运行状态信息同心跳报告一起发送给JobTracker的,主要包括TaskTracker的基本信息.节点资源使用信息.各任务状态等.所以信息被序列化为TaskTrackerStatus实例对象.每次发送心跳报告的时候,会重新构造一个Status对象,并重置这些信息,而且需要主要的是每次发送的status对象的大小是不一定的,因为很多信息的发送是有时间间隔的.这些操作主要位于方法transmitHeartBeat的上半