Pig用户自定义函数(UDF)

我们以气温统计和词频统计为例,讲解以下三种用户自定义函数。

用户自定义函数

什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择。

Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写。我们接下来以Java为例。

自定义过滤函数

我们仍然以先前的代码为例:

records = load ‘hdfs://localhost:9000/input/temperature1.txt‘as (year: chararray,temperature: int);

valid_records = filter records by temperature!=999;

第二个语句的作用就是筛选合法的数据。如果我们采用用户自定义函数,则第二个语句可以写成:

valid_records = filter records by isValid(temperature);

这种写法更容易理解,也更容易在多个地方重用。接下来的问题就是如何定义这个isValid函数。代码如下:

packagecom.oserp.pigudf;

importjava.io.IOException;

importorg.apache.pig.FilterFunc;

importorg.apache.pig.data.Tuple;

public class IsValidTemperature extends FilterFunc {

@Override

public Boolean exec(Tuple tuple) throws IOException {

Object object = tuple.get(0);

int temperature = (Integer)object;

return temperature != 999;

}

}

接下来,我们需要:

1)  编译代码并打包成jar文件,比如pigudf.jar。

2)  通过register命令将这个jar文件注册到pig环境:

register/home/user/hadoop_jar/pigudf.jar //参数为jar文件的本地路径

此时,我们就可以用以下语句调用这个函数:

valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

dump valid_records;

看起来这个函数名太长,不便输入。我们可以用定义别名的方式代替:

define isValid com.oserp.pigudf.IsValidTemperature();

valid_records = filter records by isValid(temperature);

dump valid_records;

回到代码,我们可发现:

1)  需要定义一个继承自FilterFunc的类。

2)  重写这个类的exec方法。这个方法的参数只有一个tuple,但是调用时可以传递多个参数,你可以通过索引号获得对应的参数值,比如tuple.get(1)表示取第二个参数。

3)  调用时,需要使用类的全名。(当然你可以自定义别名)

4)  更多的验证需要读者自行在函数中添加,比如判断是否为null等等。

备注:用Eclipse编写Pig自定义函数时,你可能需要引用到一些Hadoop的库文件。比较容易的方式是在新建项目时指定项目类型为MapReduce项目,这样Eclipse就会自动设置库引用的相关信息。

自定义运算函数(Eval function)

仍然以前面的数据文件为例:

1990 21

1990 18

1991 21

1992 30

1992 999

1990 23

假设我们希望通过温度值获得一个温度的分类信息,比如我们把温度大于划分为以下类型:

温度                            分类

x>=30                          hot

x>=10 and x<30        moderate

x<10                                      cool

则我们可以定义以下函数,代码如下:

packagecom.oserp.pigudf;

importjava.io.IOException;

importorg.apache.pig.EvalFunc;

importorg.apache.pig.data.Tuple;

public class GetClassification extends EvalFunc<String> {

@Override

public String exec(Tuple tuple) throws IOException {

Object object = tuple.get(0);

int temperature = (Integer)object;

if (temperature >= 30){

return "Hot";

}

else if(temperature >=10){

return "Moderate";

}

else {

return "Cool";

}

}

}

依次输入以下Pig语句:

records = load‘hdfs://localhost:9000/input/temperature1.txt‘ as (year: chararray,temperature:int);

register /home/user/hadoop_jar/pigudf.jar;

valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);

result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);

dump result;

输出结果如下:

(1990,Moderate)

(1990,Moderate)

(1991,Moderate)

(1992,Hot)

(1990,Moderate)

代码比较简单,该类继承自EvalFunc类,且我们要明确定义返回值类型。

有些时候其它类库可能包含有功能相近的Java函数,我们是否可以直接将这些库函数拿过来使用呢?可以。以下语句调用了trim函数,用于去掉name字段前后的空格:

DEFINE trim InvokeForString(‘org.apache.commons.lang.StringUtils.trim‘,‘String‘);

B = FOREACH A GENERATE trim(name);

其中的InvokeForString是一个Invoker(不知道该如何翻译啊),其通过反射机制调用,返回值是String类型。其它类似的还有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。

自定义加载函数

我们以词频统计为例,讲解如何自定义加载函数。(统计各个单词出现的频率,由高到低排序)

一般情况下,load语句加载数据时,一行会被生成一个tuple。而统计词频时,我们希望每个单词生成一个tuple。我们的测试数据文件只有两行数据,如下:

Thisis a map a reduce program

mapreduce partition combiner

我们希望load后能得到如下形式的数据,每个单词一个tuple:

(This)

(is)

(a)

(map)

(a)

(reduce)

 

先看代码:

package com.oserp.pigudf;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.Text;

importorg.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.Job;

importorg.apache.hadoop.mapreduce.RecordReader;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.pig.LoadFunc;

importorg.apache.pig.backend.executionengine.ExecException;

importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

import org.apache.pig.data.BagFactory;

import org.apache.pig.data.DataBag;

import org.apache.pig.data.Tuple;

import org.apache.pig.data.TupleFactory;

public class WordCountLoadFunc extends LoadFunc {

private RecordReader reader;

TupleFactorytupleFactory = TupleFactory.getInstance();

BagFactorybagFactory = BagFactory.getInstance();

@Override

public InputFormatgetInputFormat() throws IOException {

return new TextInputFormat();

}

@Override

public Tuple getNext() throws IOException {

try {

// 当读取到分区数据块的末尾时,返回null表示数据已读取完

if (!reader.nextKeyValue()){

return null;

}

Textvalue = (Text)reader.getCurrentValue();

Stringline = value.toString();

String[]words =  line.split("\\s+"); // 断词

// 因为getNext函数只能返回一个tuple,

// 而我们希望每个单词一个单独的tuple,

// 所以我们将多个tuple放到一个bag里面,

// 然后返回一个包含一个bag的tuple。

// 注:这只是一个用于演示用法的示例,实际中这样使用不一定合理。

List<Tuple>tuples = new ArrayList<Tuple>();

Tupletuple = null;

for (String word : words) {

tuple= tupleFactory.newTuple();

tuple.append(word);

tuples.add(tuple);

}

DataBagbag = bagFactory.newDefaultBag(tuples);

Tupleresult = tupleFactory.newTuple(bag);

return result;

}

catch (InterruptedException e) {

throw new ExecException(e);

}

}

@Override

public void prepareToRead(RecordReader reader,PigSplit arg1)

throws IOException {

this.reader = reader;

}

@Override

public void setLocation(String location, Job job) throws IOException {

FileInputFormat.setInputPaths(job,location);

}

}

依次执行以下命令:

1)       records= load ‘hdfs://localhost:9000/input/sample_small.txt‘ usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});

2)       flatten_records= foreach records generate flatten($0);

3)       grouped_records= group flatten_records by words::w;

4)       result= foreach grouped_records generate group,COUNT(flatten_records);

5)       final_result= order result by $1 desc,$0;

6)       dumpfinal_result;

显示结果如下:

(a,2)

(map,2)

(reduce,2)

(This,1)

(combiner,1)

(is,1)

(partition,1)

(program,1)

注意schema的定义格式:(words:bag{word:(w:chararray)})

时间: 2024-10-06 12:18:27

Pig用户自定义函数(UDF)的相关文章

Pig用户自定义函数(UDF)转

原文地址:http://blog.csdn.net/zythy/article/details/18326693 我们以气温统计和词频统计为例,讲解以下三种用户自定义函数. 用户自定义函数 什么时候需要用户自定义函数呢?和其它语言一样,当你希望简化程序结构或者需要重用程序代码时,函数就是你不二选择. Pig的用户自定义函数可以用Java编写,但是也可以用Python或Javascript编写.我们接下来以Java为例. 自定义过滤函数 我们仍然以先前的代码为例: records = load '

详解Spark sql用户自定义函数:UDF与UDAF

UDAF = USER DEFINED AGGREGATION FUNCTION Spark sql提供了丰富的内置函数供猿友们使用,辣为何还要用户自定义函数呢?实际的业务场景可能很复杂,内置函数hold不住,所以Spark sql提供了可扩展的内置函数接口:哥们,你的业务太变态了,我满足不了你,自己按照我的规范去定义一个sql函数,该怎么折腾就怎么折腾! 例如,MySQL数据库中有一张task表,共两个字段taskid (任务ID)与taskParam(JSON格式的任务请求参数).简单起见,

SQL Server UDF用户自定义函数

UDF的定义 和存储过程很相似,用户自定义函数也是一组有序的T-SQL语句,UDF被预先优化和编译并且尅作为一个单元爱进行调用.UDF和存储过程的主要区别在于返回结果的方式. 使用UDF时可传入参数,但不可传出参数.输出参数的概念被更为健壮的返回值取代了.和系统函数一样,可以返回标量值,这个值的好处是它并不像在存储过程中那样只限于整形数据类型,而是可以返回大多数SQL Server数据类型. UDF有以下两种类型: 返回标量值的UDF. 返回表的UDF. 创建语法: CREATE FUNCTIO

Hive的UDF(用户自定义函数)开发

当 Hive 提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function). 测试各种内置函数的快捷方法: 创建一个 dual 表 create table dual(id string); load 一个文件(只有一行内容:内容为一个空格)到 dual 表 新建 JAVA maven 项目 添加依赖 <dependencies> <dependency> <groupId>org.apache.hiv

SQL Server用户自定义函数(UDF)

一.UDF的定义 和存储过程很相似,用户自定义函数也是一组有序的T-SQL语句,UDF被预先优化和编译并且可以作为一个单元来进行调用.UDF和存储过程的主要区别在于返回结果的方式. 使用UDF时可传入参数,但不可传出参数.输出参数的概念被更为健壮的返回值取代了. 和系统函数一样,可以返回标量值,这个值的好处是它并不像在存储过程中那样只限于整形数据类型,而是可以返回大多数SQL Server数据类型. UDF有以下两种类型: 返回标量值的UDF. 返回表的UDF. 创建语法: CREATE FUN

[Hive]Hive自定义函数UDF

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数 用户自定义函数(user defined function),针对单条记录. 编写一个UDF,需要继承UDF类,并实现evaluate()函数.在查询执行过程中,查询中对应的每个应用到这个函数的地方都会对这个类进行实例化.对于每行输入都会调用到evaluate()函数.而evaluate()函数处理的值会返回给Hive.同时用户是可以重载evaluate方法的.Hive会像Java的方法重载一样,自动选择匹配的

Hive自定义函数(UDF、UDAF)

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. UDF 用户自定义函数(user defined function)–针对单条记录. 创建函数流程 1.自定义一个Java类 2.继承UDF类 3.重写evaluate方法 4.打成jar包 6.在hive执行add jar方法 7.在hive执行创建模板函数 8.hql中使用 Demo01: 自定义一个Java类 package UDFDemo; import org.apache.hadoop.hive.

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDTF UDAF 1.UDF:用户定义(普通)函数,只对单行数值产生作用: UDF只能实现一进一出的操作. 定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a

SQL之用户自定义函数

关于SQL Server用户自定义的函数,有标量函数.表值函数(内联表值函数.多语句表值函数)两种. 题外话,可能有部分朋友不知道SQL Serve用户自定义的函数应该是写在哪里,这里简单提示一下,在Microsoft SQL Server Managerment Studio里面,展开具体需要创建SQL Server用户自定义函数的数据库(即每个用户自定义函数只针对具体的一个数据库有用),然后找到可编程性选项,再展开找到函数选项,在具体的函数选项里面可参照下图的方式鼠标右键选择来添加. 标量函