Hive中的ObjectInspector设计

ObjectInspector是Hive中一个咋一看比较令人困惑的概念,当初读Hive源代码时,花了很长时间才理解。 当读懂之后,发现ObjectInspector作用相当大,它解耦了数据使用和数据格式,从而提高了代码的复用程度。 简单的说,ObjectInspector接口使得Hive可以不拘泥于一种特定数据格式,使得数据流 1)在输入端和输出端切换不同的输入/输出格式 2)在不同的Operator上使用不同的数据格式。

这是ObjectInspector interface 
public interface ObjectInspector extends Cloneable { 
  public static enum Category { 
    PRIMITIVE, LIST, MAP, STRUCT, UNION 
  };

String getTypeName();

Category getCategory(); 
}

这个interface提供了最一般的方法 getTypeName 和 getCategory。 我们再来看它的子抽象类和interface: 
StructObjectInspector 
MapObjectInspector 
ListObjectInspector 
PrimitiveObjectInspector 
UnionObjectInspector

其中,PrimitiveObjectInspector用来完成对基本数据类型的解析,而StructObjectInspector用了完成对一行数据的解析,它本身有一组ObjectInspector组成。 由于Hive支持Nested Data Structure,所以,在StructObjectInspector中又可以(一层或多层的)嵌套任意的ObjectInspector。 Struct, Map, List, Union是Hive支持的4种集合数据类型,比如某一列的数据可以被声明为Struct类型,这样解析这一列的StructObjectInspector中就会嵌套了另一个StructObjectInspector。

现在我们可以从一个小例子看看ObjectInspector是如何工作的,这是一个Hive SerDe的测试用例代码:

/** 
   * Test the LazySimpleSerDe class. 
   */ 
  public void testLazySimpleSerDe() throws Throwable { 
    try { 
      // Create the SerDe 
      LazySimpleSerDe serDe = new LazySimpleSerDe(); 
      Configuration conf = new Configuration(); 
      Properties tbl = createProperties(); 
      //用Properties初始化serDe 
      serDe.initialize(conf, tbl);

// Data 
      Text t = new Text("123\t456\t789\t1000\t5.3\thive and hadoop\t1.\tNULL"); 
      String s = "123\t456\t789\t1000\t5.3\thive and hadoop\tNULL\tNULL"; 
      Object[] expectedFieldsData = {new ByteWritable((byte) 123), 
          new ShortWritable((short) 456), new IntWritable(789), 
          new LongWritable(1000), new DoubleWritable(5.3), 
          new Text("hive and hadoop"), null, null};

// Test 
      deserializeAndSerialize(serDe, t, s, expectedFieldsData); 
    } catch (Throwable e) { 
      e.printStackTrace(); 
      throw e; 
    } 
  }

private void deserializeAndSerialize(LazySimpleSerDe serDe, Text t, String s, 
      Object[] expectedFieldsData) throws SerDeException { 
    // Get the row ObjectInspector 
    StructObjectInspector oi = (StructObjectInspector) serDe 
        .getObjectInspector(); 
    // 获取列信息 
    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs(); 
    assertEquals(8, fieldRefs.size());

// Deserialize 
    Object row = serDe.deserialize(t); 
    for (int i = 0; i < fieldRefs.size(); i++) { 
      Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); 
      if (fieldData != null) { 
        fieldData = ((LazyPrimitive) fieldData).getWritableObject(); 
      } 
      assertEquals("Field " + i, expectedFieldsData[i], fieldData); 
    } 
    // Serialize 
    assertEquals(Text.class, serDe.getSerializedClass()); 
    Text serializedText = (Text) serDe.serialize(row, oi); 
    assertEquals("Serialized data", s, serializedText.toString()); 
  }

//创建schema,保存在Properties中 
  private Properties createProperties() { 
    Properties tbl = new Properties();

// Set the configuration parameters 
    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9"); 
    tbl.setProperty("columns", 
        "abyte,ashort,aint,along,adouble,astring,anullint,anullstring"); 
    tbl.setProperty("columns.types", 
        "tinyint:smallint:int:bigint:double:string:int:string"); 
    tbl.setProperty(Constants.SERIALIZATION_NULL_FORMAT, "NULL"); 
    return tbl; 
  }

从这个例子中,不难出,Hive将对行中列的读取和行的存储方式解耦和了,只有ObjectInspector清楚行和行中的列是怎样存取的,但使用者并不知道存储的细节。 对于数据的使用者来说,只需要行的Object和相应的ObjectInspector,就能读取出每一列的对象。

这段代码再清晰不过了,ObjectInspector oi控制了对列的Access 
for (int i = 0; i < fieldRefs.size(); i++) { 
      Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); 
      if (fieldData != null) { 
        fieldData = ((LazyPrimitive) fieldData).getWritableObject(); 
      } 
      assertEquals("Field " + i, expectedFieldsData[i], fieldData); 
  }

这段代码的作用是把一行deserialize,然后再serialize 
    Object row = serDe.deserialize(t); 
    Text serializedText = (Text) serDe.serialize(row, oi); 
由此不难看出,只要有了不同的SerDe对象,可以很容易的将一条数据deserialize,然后再serialize成不同的格式,从而非常方便的实现数据格式的切换。

理解了上面的例子,就不难理解为什么所有的Hive ExprNodeEvaluator 和 UDF,UDAF, UDTF 都需要 (Object, ObjectInspector) pair了。 数据存储细节和使用的分离,使得Hive不需要针对不同的数据格式对同一个UDF, UDAF 或UDTF实现不同的版本,这些函数看到的只是WritableObject!

下面是表达式evaluator的interface: 
/** 
* ExprNodeEvaluator. 

*/ 
public abstract class ExprNodeEvaluator {

/** 
   * Initialize should be called once and only once. Return the ObjectInspector 
   * for the return value, given the rowInspector. 
   */ 
  public abstract ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException;

/** 
   * Evaluate the expression given the row. This method should use the 
   * rowInspector passed in from initialize to inspect the row object. The 
   * return value will be inspected by the return value of initialize. 
   */ 
  public abstract Object evaluate(Object row) throws HiveException;

}

initialize中需要初始化ObjectInspector,返回输出数据的ObjectInspector(它负责解析evaluate method返回的对象);而每次evaluate call传进来一条Object数据,它的解析由ObjectInspector负责。

接下来是GenericUDF抽象类: 
public abstract class GenericUDF {

/** 
   * A Defered Object allows us to do lazy-evaluation and short-circuiting. 
   * GenericUDF use DeferedObject to pass arguments. 
   */ 
  public static interface DeferredObject { 
    Object get() throws HiveException; 
  };

/** 
   * The constructor. 
   */ 
  public GenericUDF() { 
  }

/** 
   * Initialize this GenericUDF. This will be called once and only once per 
   * GenericUDF instance. 
   * 
   * @param arguments 
   *          The ObjectInspector for the arguments 
   * @throws UDFArgumentException 
   *           Thrown when arguments have wrong types, wrong length, etc. 
   * @return The ObjectInspector for the return value 
   */ 
  public abstract ObjectInspector initialize(ObjectInspector[] arguments) 
      throws UDFArgumentException;

/** 
   * Evaluate the GenericUDF with the arguments. 
   * 
   * @param arguments 
   *          The arguments as DeferedObject, use DeferedObject.get() to get the 
   *          actual argument Object. The Objects can be inspected by the 
   *          ObjectInspectors passed in the initialize call. 
   * @return The 
   */ 
  public abstract Object evaluate(DeferredObject[] arguments) 
      throws HiveException;

/** 
   * Get the String to be displayed in explain. 
   */ 
  public abstract String getDisplayString(String[] children);

}

它的机制与evaluator非常类似,初始化中敲定ObjectInspector数组,它们负责解析输入,返回output数据(即evaluator method返回的Object)的ObjectInspector;每次evaluate call传进一个Object数组,返回一条数据。

Hive支持LazySimple, LazyBinary,Thrift等不同的数据格式,同一个查询计划中,可以在operator上切换数据流的格式。比较常见的是在Mapper端使用LazySimpleSerDe,Mapper输出的数据使用LazyBinarySerDe,因为binary格式比较节省空间,从而减少repartition时的网络传输。 如果你想看查询计划的每一步到底使用了哪一种SerDe格式,只要用"Explain Extended"就可以查清楚了。

时间: 2024-07-30 04:00:47

Hive中的ObjectInspector设计的相关文章

hive中rcfile格式(收藏文)

首先声明,此文是属于纯粹收藏文,感觉讲的很不错. 本文介绍了Facebook公司数据分析系统中的RCFile存储结构,该结构集行存储和列存储的优点于一身,在MapReduce环境下的大规模数据分析中扮演重要角色. Facebook曾在2010 ICDE(IEEE International Conference on Data Engineering)会议上介绍了数据仓库Hive.Hive存储海量数据在Hadoop系统中,提供了一套类数据库的数据存储和处理机制.它采用类SQL语言对数据进行自动化

Hive中的一种假NULL

Hive中有种假NULL,它看起来和NULL一摸一样,但是实际却不是NULL. 例如如下这个查询: hive> desc ljn004; OK a       string Time taken: 0.237 seconds hive> select a from ljn004; OK NULL Time taken: 46.232 seconds 看上去好像ljn004的a字段保存了一个 NULL, 但是换一个查询会发现它和NULL并不一样: hive> select a from l

【甘道夫】Sqoop1.4.4 实现将 Oracle10g 中的增量数据导入 Hive0.13.1 ,并更新Hive中的主表

需求 将Oracle中的业务基础表增量数据导入Hive中,与当前的全量表合并为最新的全量表. ***欢迎转载,请注明来源***    http://blog.csdn.net/u010967382/article/details/38735381 设计 涉及的三张表: 全量表:保存了截止上一次同步时间的全量基础数据表 增量表:增量临时表 更新后的全量表:更新后的全量数据表 步骤: 通过Sqoop将Oracle中的表导入Hive,模拟全量表和增量表 通过Hive将"全量表+增量表"合并为

hive中udf读写hbase

在大数据开发过程中经常会遇到,将hive中处理后的结果写入hbase中,每次都要写java程序会非常浪费时间,我们就想了一个办法 ,用hive的udf来实现. 只需要调用同一个udf,将表名字段名以及每一个字段的值作为udf的参数,就可以实现写hbase了. 这样大大的节省了开发时间,提升了开发效率. 大家可以按照这种思路写自己需要的功能.这里只简单的列举几个供大家参考,具体操作如下: 一.依赖的jar包 commons-codec-1.7.jar commons-collections-3.2

SQOOP增量抽取时,在HIVE中实现类似Oracle的merge操作

数据仓库建设中的数据抽取环节,常常需要增量抽取业务库数据.但业务库数据不是一层不变的,会根据时间发生状态变更,那么就需要同步更新变化数据到HIVE中.过去在Oracle上做数据仓库时,可以使用merge的方法合并新老数据.但hive中没有该功能,本文旨在通过sqoop抽取后,自动实现数据合并. 表设计 将抽取表分为三张, 一张_arc表,保存每日合并后的快照,根据pt字段分区 一张_inc表,用于保存当日抽取的增量数据,根据pt字段分区 一张不带后缀的表,指向最终表给后续ETL任务使用. 步骤

hive中order by,sort by, distribute by, cluster by作用以及用法

1. order by Hive中的order by跟传统的sql语言中的order by作用是一样的,会对查询的结果做一次全局排序,所以说,只有hive的sql中制定了order by所有的数据都会到同一个reducer进行处理(不管有多少map,也不管文件有多少的block只会启动一个reducer).但是对于大量数据这将会消耗很长的时间去执行. 这里跟传统的sql还有一点区别:如果指定了hive.mapred.mode=strict(默认值是nonstrict),这时就必须指定limit来

kettle连接Hive中数据导入导出(6)

1.hive往外写数据 http://wiki.pentaho.com/display/BAD/Extracting+Data+from+Hive+to+Load+an+RDBMS 连接hive 表输入 1)往excel中写数据 2)往文本文件中写数据 注:这里需要填上hive的表名.字段名,不然会报如下错误: 2016/07/27 15:43:01 - 表输入.0 - ERROR (version 3.2.0, build 1 from 2016-07-07 10.46.10 by xnren

hive中partition如何使用

1.背景 1.在Hive Select查询中一般会扫描整个表内容,会消耗很多时间做没必要的工作.有时候只需要扫描表中关心的一部分数据,因此建表时引入了partition概念. 2.分区表指的是在创建表时指定的partition的分区空间. 3.如果需要创建有分区的表,需要在create表的时候调用可选参数partitioned by,详见表创建的语法结构. 2.细节 1.一个表可以拥有一个或者多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下. show partitions stage_

看懂此文,不再困惑于 JS 中的事件设计

看懂此文,不再困惑于 JS 中的事件设计 今天刚在关注的微信公众号看到的文章,关于JS事件的,写的很详细也很容易理解,相关的知识点都有总结到,看完就有种很舒畅的感觉,该串起来的知识点都串起来了.反正一字节:爽. 作者:aitangyong 链接:blog.csdn.net/aitangyong/article/details/43231111 抽空学习了下javascript和jquery的事件设计,收获颇大,总结此贴,和大家分享. (一)事件绑定的几种方式 javascript给DOM绑定事件