开发HIVE的UDTF自定义函数

[Author]: kwu

UDTF(User-Defined Table-Generating Functions) 用来解决 输入一行输出多行(On-to-many maping) 的需求,开发HIVE的UDTF自定义函数具体步骤如下:

1、继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。

2、UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。

3、初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。

4、最后close()方法调用,对需要清理的方法进行清理。

5、代码实例,实现的功能比较简单,首先按 "\001" 切分,再处理字符串,其中涉及对JSON的处理

package com.hexun.udtf;

import java.util.ArrayList;

import net.sf.json.JSON;
import net.sf.json.JSONSerializer;

import org.apache.commons.beanutils.PropertyUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UDTFDratio extends GenericUDTF {

	public void close() throws HiveException {

	}

	// 返回UDTF的处理行的信息(个数,类型)。
	public StructObjectInspector initialize(ObjectInspector[] args)
			throws UDFArgumentException {
		if (args.length != 1) {
			throw new UDFArgumentLengthException(
					"ExplodeMap takes only one argument");
		}
		if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
			throw new UDFArgumentException(
					"ExplodeMap takes string as a parameter");
		}
		ArrayList<String> fieldNames = new ArrayList<String>();
		ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

		fieldNames.add("col1");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col2");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col3");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col4");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col5");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col6");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col7");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
		fieldNames.add("col8");
		fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

		return ObjectInspectorFactory.getStandardStructObjectInspector(
				fieldNames, fieldOIs);
	}

	// 对传入的参数进行处理,可以通过forword()方法返回结果
	public void process(Object[] args) throws HiveException {
		String input = args[0].toString();
		String[] splited = input.split("\001");

		String[] result = new String[8];

		for (int i = 0; i < splited.length; i++) {
			if (i == 0) {
				String head = splited[i];
				String userId = head.substring(0, head.indexOf("_"));
				String cookieId = head.substring(head.indexOf("_") + 1);

				result[0] = userId;
				result[1] = cookieId;
			} else {
				String json = splited[i];
				JSON jo = JSONSerializer.toJSON(json);
				Object o = JSONSerializer.toJava(jo);

				try{
					String sex = PropertyUtils.getProperty(o, "sex").toString();
					result[2] = sex;

					String age = PropertyUtils.getProperty(o, "age").toString();
					result[3] = age;

					String ppt = PropertyUtils.getProperty(o, "ppt").toString();
					result[4] = ppt;

					String degree = PropertyUtils.getProperty(o, "degree").toString();
					result[5] = degree;

					String favor = PropertyUtils.getProperty(o, "favor").toString();
					result[6] = favor;

					String commercial = PropertyUtils.getProperty(o, "commercial").toString();
					result[7] = commercial;

				}catch(Exception e){
					e.printStackTrace();
				}
			}
		}

		forward(result);
	}

}

示例代码涉及的JAR包

6、hive命令行操作,引入UDTF前,需要先加入JSON的依赖包

add jar /opt/softwares/lib/commons-beanutils-1.7.0.jar;
add jar /opt/softwares/lib/commons-collections-3.2.jar;
add jar /opt/softwares/lib/commons-lang-2.4.jar;
add jar /opt/softwares/lib/commons-logging-1.1.3.jar;
add jar /opt/softwares/lib/ezmorph-1.0.3.jar;
add jar /opt/softwares/lib/json-lib-2.2.3-jdk15.jar;
add jar /opt/softwares/UDF.jar;
create temporary function explode_map3 as 'com.hexun.udtf.UDTFDratio';

insert into table stage.dratio PARTITION (day='${yesterday}') select explode_map3(datadratio) as (col1,col2,col3,col4,col5,col6,col7,col8) from stage.dratio_tmp;
时间: 2024-12-11 01:34:07

开发HIVE的UDTF自定义函数的相关文章

hive添加永久自定义函数

永久自定义hive函数 1:做这件事的原因: 有一些函数是比较基础的,公用的,每次都要create temporary function麻烦了,这样的基础函数需要直接集成到hive中去,避免每次都要创建. 2:步骤 本人拥有一个账户zb_test 自定义的函数已经准备好 登陆linux账户,修改该账户的home目录下的.bashrc文件: 把CLASSPATH改成如下: export CLASSPATH=$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$SQOOP_HOME/

hadoop(十) - hive安装与自定义函数

一. Hive安装 Hive只在一个节点上安装即可 1. 上传tar包 2. 解压 tar -zxvf hive-0.9.0.tar.gz -C /cloud/ 3. 配置mysql metastore(切换到root用户) 3.1 配置HIVE_HOME环境变量 3.2 安装mysql 查询以前安装的mysql相关包: rpm -qa | grep mysql 暴力删除这个包: rpm -e mysql-libs-5.1.66-2.el6_3.i686 --nodeps 安装mysql: rp

Hive自定义函数的学习笔记(1)

前言: hive本身提供了丰富的函数集, 有普通函数(求平方sqrt), 聚合函数(求和sum), 以及表生成函数(explode, json_tuple)等等. 但不是所有的业务需求都能涉及和覆盖到, 因此hive提供了自定义函数的接口, 方便用户扩展. 自己好像很久没接触hadoop了, 也很久没博客了, 今天趁这个短期的项目, 对hive中涉及的自定义函数做个笔记. 准备: 编写hive自定义函数前, 需要了解下当前线上hive的版本. hive --vesion 比如作者使用到的hive

Spark(十八)SparkSQL的自定义函数UDF

在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap 自定

Spark学习之路 (十九)SparkSQL的自定义函数UDF

讨论QQ:1586558083 在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像str

Spark学习之路 (十九)SparkSQL的自定义函数UDF[转]

在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap 自定

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDTF UDAF 1.UDF:用户定义(普通)函数,只对单行数值产生作用: UDF只能实现一进一出的操作. 定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a

Hive自定义函数UDAF开发

Hive支持自定义函数,UDAF是接受多行,输出一行. 通常是group by时用到这种函数. 其实最好的学习资料就是官方自带的examples了. 我这里用的是0.10版本hive,所以对于的examples在 https://github.com/apache/hive/tree/branch-0.10/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example 我这里的功能需求是: actionCount(act_code,ac

Hive(9)-自定义函数

一. 自定义函数分类 当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. 根据用户自定义函数类别分为以下三种: 1. UDF(User-Defined-Function) 一进一出 2. UDAF(User-Defined Aggregation Function) 聚集函数,多进一出, 类似于:count/max/min 3. UDTF(User-Defined Table-Generating Functions) 一进多出 如lateral view e