UDF(user defined functions) 用于处理单行数据,并生成单个数据行。
PS:
l 一个普通UDF必须继承自“org.apache.hadoop.hive.ql.exec.UDF”。
l 一个普通UDF必须至少实现一个evaluate()方法,evaluate函数支持重载。
主要步骤如下:
步骤1 把以上程序打包成AddDoublesUDF.jar,并上传到HDFS指定目录下(如“ /user/
hive_examples_jars/” )且创建函数的用户与使用函数的用户有该文件的可读权限。示例
语句:
hdfs dfs -put ./hive_examples_jars /user/hive_examples_jars
hdfs dfs -chmod 777 /user/hive_examples_jars
步骤2 执行如下命令。
beeline -n Hive业务用户
步骤3 在Hive Server中定义该函数,以下语句用于创建永久函数:
CREATE FUNCTION addDoubles AS
‘com.huawei.bigdata.hive.example.udf.AddDoublesUDF‘ using jar ‘hdfs :/user/
hive_examples_jars/AddDoublesUDF.jar‘;
其中addDoubles是该函数的别名,用于SELECT查询中使用。
以下语句用于创建临时函数:
CREATE TEMPORARY FUNCTION addDoubles AS
‘com.huawei.bigdata.hive.example.udf.AddDoublesUDF‘ using jar ‘hdfs :/user/
hive_examples_jars/AddDoublesUDF.jar‘;
l addDoubles是该函数的别名,用于SELECT查询中使用。
l 关键字TEMPORARY说明该函数只在当前这个Hive Server的会话过程中定义使
用。
步骤4 在Hive Server中使用该函数,执行SQL语句:
SELECT addDoubles(1,2,3);
说明
若重新连接客户端再使用函数出现[Error 10011]的错误,可执行reload function;命令后再使用该
函数。
步骤5 在Hive Server中删除该函数,执行SQL语句:
DROP FUNCTION addDoubles;
----End
例子:
import org.apache.hadoop.hive.ql.exec.UDF;import java.util.ArrayList; /** * Created by wulei on 2017/8/30. * 输入一个2016-03-01 10:09:08-360122000101这样的字符串数组, * 要拆分成2016-03-01 10:09:08和360122000101,分成两个字符串数组返回出来 */public class SubstrTimeUDF extends UDF{ public static ArrayList<String> evaluate(ArrayList<String> times,boolean flag) { Object obj = new Object(); ArrayList<String> al1 = new ArrayList<String>(); ArrayList<String> al2 = new ArrayList<String>(); for (String time:times ) { String str1 = time.substring(0,19); String str2 = time.substring(20); al1.add(str1); al2.add(str2); } if(flag){ return al1; }else{ return al2; } } ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~UDTF(user defined Table functions) 用于处理单行数据,并生成多个数据行。如上,差别在于需要继承的是GeneriUDTF,然后需要覆盖重写父类的三个抽象方法,输出后有几列,在initialize中定义,主要处理逻辑在process中实现,值得注意的是,forward输出需要集合形式,比如数组或者ArrayList。
例子:
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.ArrayList; public class SubstrTrackUdtf extends GenericUDTF { @Override public void close() throws HiveException { // TODO Auto-generated method stub } @Override public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { if (args.length != 1) { throw new UDFArgumentLengthException("ExplodeMap takes only one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("ExplodeMap takes string as a parameter"); } ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); fieldNames.add("col1"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] args) throws HiveException { String input = args[0].toString(); String[] test = input.split(";"); ArrayList<String> result = new ArrayList<String>(); DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); for (int i = 0; i < test.length; i++) { // 单独处理第一条数据 if (i == 0) { result.add(test[i]); } else { // 判断当前数据和前一条数据的时间差是否满足条件 int j = result.size(); try { if ((df.parse(test[i]).getTime() - df.parse(result.get(j - 1)).getTime()) < 30 * 60 * 1000) { result.add(test[i]); if (i + 1 == test.length) { forward(new String[]{result.toString()}); } } else { forward(new String[]{result.toString()}); result.clear(); result.add(test[i]); // 判断是否是最后一条数据 if (i + 1 == test.length) { forward(new String[]{result.toString()}); } } } catch (Exception e) { // e.printStackTrace(); continue; } } } } }
PS:如果Create function时报错,一般是你不小心,方法需要的类没有对应好。