Registering UDFs
The contents of do.pig are as follows:
register /xx/yy.jar;
data = load 'data';
result = foreach data generate aa.bb.Upper($0);
dump result;
The path given to register can be a local path or an HDFS path:
register hdfs://pig/xx/yy.jar;
If Pig is started with pig -Dudf.import.list=aa.bb, the UDF can be referenced without its package path:
register /xx/yy.jar;
data = load 'data';
result = foreach data generate Upper($0);
dump result;
If Pig is started with pig -Dpig.additional.jars=/xx/yy.jar, the register statement can be omitted:
data = load 'data';
result = foreach data generate aa.bb.Upper($0);
dump result;
define can be used to give a UDF an alias:
register /xx/yy.jar;
define UPPER aa.bb.Upper();
data = load 'data';
result = foreach data generate UPPER($0);
dump result;
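If constructing the UDF requires arguments, pass them in the define statement. Several aliases can also be defined for the same class, one per overloaded constructor:
register /xx/yy.jar;
define UPPER1 aa.bb.Upper();
define UPPER2 aa.bb.Upper('abc');
data = load 'data';
result = foreach data generate UPPER1($0), UPPER2($1);
dump result;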
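Calling static Java functions
A function can be called this way only if it meets these conditions:
1) It is a static function
2) Its parameters are primitive types, String, or arrays
3) Its return value is a primitive type or String
When the function takes several parameters, their types are separated by spaces.
The available invokers are InvokeForInt, InvokeForLong, InvokeForFloat, InvokeForDouble, and InvokeForString.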
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
data = load 'data';
result = foreach data generate hex((int)$0);

define stdev InvokeForDouble('com.acme.stats.stdev', 'double[]');
a = load 'data' as (id:int, dp:double);
b = group a by id;
c = foreach b generate stdev(a.dp);
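As a rough illustration of the three conditions above, a static method of the kind the stdev example invokes might look like the following. This is only a sketch: the original does not show the source of com.acme.stats.stdev, so the class name Stats, the population (divide by n) formula, and the handling of empty input are all assumptions.

```java
// Hypothetical helper class, not taken from the original post.
// When packaged into a jar for Pig, declare it public in its own file and
// reference it by its fully qualified name in the invoker definition.
class Stats {
    // Static, takes an array of primitives, returns a primitive,
    // so it satisfies the three conditions listed above.
    public static double stdev(double[] values) {
        if (values == null || values.length == 0) {
            return 0.0; // assumption: empty input yields 0
        }
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        double mean = sum / values.length;

        double squaredDiffs = 0.0;
        for (double v : values) {
            squaredDiffs += (v - mean) * (v - mean);
        }
        // Population standard deviation (divides by n, not n - 1).
        return Math.sqrt(squaredDiffs / values.length);
    }
}
```

A method with this shape matches the 'double[]' parameter declaration passed to InvokeForDouble in the script above.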
Custom UDFs
Eval Function
package com.test.pig.udf;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class CustomReplaceFunc extends EvalFunc<String> {
    @Override
    public String exec(Tuple tuple) throws IOException {
        // Expect three non-null fields: the input string, the pattern, the replacement.
        if (tuple == null || tuple.size() < 3
                || tuple.isNull(0) || tuple.isNull(1) || tuple.isNull(2)) {
            return null;
        }
        String original = (String) tuple.get(0);
        String oldChar = (String) tuple.get(1);
        String newChar = (String) tuple.get(2);
        // Note: replaceAll interprets oldChar as a regular expression.
        return original.replaceAll(oldChar, newChar);
    }
}
register /home/pig/myfunc.jar;
define MyReplace com.test.pig.udf.CustomReplaceFunc();
users = load '/users.data' as (name:chararray, age, address);
result = foreach users generate MyReplace(name, 'l', 'L'), age, address;
dump result;
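Because exec delegates to String.replaceAll, the second argument is interpreted as a regular expression rather than as a literal string. The sketch below reproduces only that string logic outside Pig, so it can be tried without a cluster. The class ReplaceDemo and its method names are hypothetical and are not part of the original UDF.

```java
// Hypothetical demo class isolating the string logic of CustomReplaceFunc.
class ReplaceDemo {
    // Same call the UDF makes: oldChar is treated as a regular expression.
    static String replaceRegex(String original, String oldChar, String newChar) {
        return original.replaceAll(oldChar, newChar);
    }

    // Literal alternative: String.replace does no regex interpretation.
    static String replaceLiteral(String original, String oldChar, String newChar) {
        return original.replace(oldChar, newChar);
    }
}
```

For a plain character such as 'l' both methods behave the same, turning "hello" into "heLLo". They diverge for regex metacharacters: with "." as the second argument, the regex version replaces every character, while the literal version replaces only actual dots.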
Aggregate Function
Filter Function
Load Function
Store Function
Time: 2024-10-13 09:28:40