[Author]: kwu
UDTF(User-Defined Table-Generating Functions)用来解决"输入一行、输出多行"(One-to-many mapping)的需求。开发 Hive UDTF 自定义函数的具体步骤如下:
1、继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。
2、UDTF首先会调用initialize方法,此方法返回UDTF输出行的结构信息(列的个数、类型)。
3、初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
4、最后调用close()方法,对需要清理的资源进行清理。
5、代码实例,实现的功能比较简单,首先按 "\001" 切分,再处理字符串,其中涉及对JSON的处理
package com.hexun.udtf;

import java.util.ArrayList;

import net.sf.json.JSON;
import net.sf.json.JSONSerializer;

import org.apache.commons.beanutils.PropertyUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Hive UDTF that turns one delimited input value into one eight-column row.
 *
 * <p>The input is split on the {@code \001} character. The first segment is a
 * head of the form {@code userId_cookieId} and fills {@code col1}/{@code col2}.
 * Every following segment is parsed as JSON, and its {@code sex}, {@code age},
 * {@code ppt}, {@code degree}, {@code favor} and {@code commercial} properties
 * fill {@code col3}..{@code col8}. All output columns are strings; a column
 * whose value is unavailable is emitted as {@code null}.
 */
public class UDTFDratio extends GenericUDTF {

    /** Separator between the head segment and the JSON segments. */
    private static final String FIELD_DELIMITER = "\001";

    /** Separator between the user id and the cookie id inside the head segment. */
    private static final char HEAD_SEPARATOR = '_';

    /** Number of leading output columns taken from the head segment. */
    private static final int HEAD_COLUMNS = 2;

    /** JSON properties copied, in this order, into the columns after the head columns. */
    private static final String[] JSON_KEYS = {
        "sex", "age", "ppt", "degree", "favor", "commercial"
    };

    /** Total number of output columns (col1..col8). */
    private static final int COLUMN_COUNT = HEAD_COLUMNS + JSON_KEYS.length;

    /** Nothing to release: this UDTF holds no resources. */
    @Override
    public void close() throws HiveException {
    }

    /**
     * Validates the call signature and describes the output row.
     *
     * @param args object inspectors of the call arguments; exactly one primitive is expected
     * @return a struct inspector with string columns {@code col1}..{@code col8}
     * @throws UDFArgumentException if the argument count or category is wrong
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException(
                    "UDTFDratio takes exactly one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException(
                    "UDTFDratio takes a primitive (string) argument");
        }

        ArrayList<String> fieldNames = new ArrayList<String>(COLUMN_COUNT);
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(COLUMN_COUNT);
        for (int i = 1; i <= COLUMN_COUNT; i++) {
            fieldNames.add("col" + i);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    /**
     * Processes one input value and forwards one output row.
     *
     * <p>A NULL input produces no row. Malformed data never fails the query:
     * the affected columns are simply left {@code null}.
     *
     * @param args the call arguments; {@code args[0]} is the delimited input value
     * @throws HiveException if forwarding the row fails
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (args == null || args.length == 0 || args[0] == null) {
            // A NULL column value has nothing to explode.
            return;
        }

        String[] segments = args[0].toString().split(FIELD_DELIMITER);
        String[] result = new String[COLUMN_COUNT];

        // split() drops trailing empty strings, so an input made only of
        // delimiters yields an empty array.
        if (segments.length > 0) {
            fillHead(segments[0], result);
        }
        for (int i = 1; i < segments.length; i++) {
            fillProfile(segments[i], result);
        }
        forward(result);
    }

    /** Splits the head segment at its first '_' into user id (col1) and cookie id (col2). */
    private static void fillHead(String head, String[] result) {
        int sep = head.indexOf(HEAD_SEPARATOR);
        if (sep < 0) {
            // No separator: keep the whole head as the user id instead of
            // failing on substring(0, -1); the cookie id stays null.
            result[0] = head;
            return;
        }
        result[0] = head.substring(0, sep);
        result[1] = head.substring(sep + 1);
    }

    /** Parses one JSON segment and copies each available property into its column. */
    private static void fillProfile(String json, String[] result) {
        Object bean;
        try {
            JSON parsed = JSONSerializer.toJSON(json);
            bean = JSONSerializer.toJava(parsed);
        } catch (RuntimeException e) {
            // A single bad segment must not abort the whole query.
            System.err.println("UDTFDratio: skipping unparsable JSON segment: " + e);
            return;
        }
        if (bean == null) {
            return;
        }

        // Each property is read independently so that one missing key does
        // not leave all subsequent columns empty.
        for (int k = 0; k < JSON_KEYS.length; k++) {
            try {
                Object value = PropertyUtils.getProperty(bean, JSON_KEYS[k]);
                if (value != null) {
                    result[HEAD_COLUMNS + k] = value.toString();
                }
            } catch (Exception e) {
                // Broad catch on purpose: getProperty reports a missing key via
                // several checked and unchecked exception types, and all of
                // them mean "no value for this column".
                System.err.println("UDTFDratio: cannot read property '"
                        + JSON_KEYS[k] + "': " + e);
            }
        }
    }
}
示例代码涉及的JAR包
6、hive命令行操作,引入UDTF前,需要先加入JSON的依赖包
-- Register the jars needed for JSON handling before the UDTF itself.
ADD JAR /opt/softwares/lib/commons-beanutils-1.7.0.jar;
ADD JAR /opt/softwares/lib/commons-collections-3.2.jar;
ADD JAR /opt/softwares/lib/commons-lang-2.4.jar;
ADD JAR /opt/softwares/lib/commons-logging-1.1.3.jar;
ADD JAR /opt/softwares/lib/ezmorph-1.0.3.jar;
ADD JAR /opt/softwares/lib/json-lib-2.2.3-jdk15.jar;

-- Presumably the jar that contains com.hexun.udtf.UDTFDratio.
ADD JAR /opt/softwares/UDF.jar;

-- Expose the UDTF class under a session-scoped function name.
CREATE TEMPORARY FUNCTION explode_map3 AS 'com.hexun.udtf.UDTFDratio';

-- Expand each datadratio value into eight columns and load them into
-- the partition for ${yesterday}.
INSERT INTO TABLE stage.dratio PARTITION (day='${yesterday}')
SELECT explode_map3(datadratio) AS (col1,col2,col3,col4,col5,col6,col7,col8)
FROM stage.dratio_tmp;
时间: 2024-10-11 05:06:51