Pig系统分析(8)-Pig可扩展性

本文是Pig系统分析系列中的最后一篇了,主要讨论怎样扩展Pig功能。不仅介绍Pig本身提供的UDFs扩展机制,还从架构上探讨Pig扩展可能性。

补充说明:前些天同事发现twitter推动的Pig On Spark项目:Spork,准备研究下。

UDFs

通过UDFs(用户自己定义函数),能够自己定义数据处理方法,扩展Pig功能。实际上,UDFS除了使用之前须要register/define外。和内置函数没什么不同。

主要的EvalFunc

以内置的ABS函数为例:

public class ABS extends EvalFunc<Double>{
    /**
     * java level API
     * @param input expectsa single numeric value
     * @return output returns a single numeric value, absolute value of the argument
     */
    public Double exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;

        Double d;
        try{
            d = DataType.toDouble(input.get(0));
        } catch (NumberFormatException nfe){
            System.err.println("Failed to process input; error -" + nfe.getMessage());
            return null;
        } catch (Exception e){
            throw new IOException("Caught exception processing input row", e);
        }
        return Math.abs(d);
    }
    ……
    public Schema outputSchema(Schema input) ;
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException;

}
  1. 函数都继承EvalFunc接口,泛型參数Double代表返回类型。
  2. exec方法:输入參数类型为元组,代表一行记录。
  3. outputSchema方法:用于处理输入和输出Schema
  4. getArgToFuncMapping:用于支持各种数据类型重载。

聚合函数

EvalFuc方法也能实现聚合函数,这是由于group操作对每一个分组都返回一条记录,每组中包括一个Bag,所以exec方法中迭代处理Bag中记录就可以。

以Count函数为例:

public Long exec(Tuple input) throws IOException {
    try {
        DataBag bag = (DataBag)input.get(0);
        if(bag==null)
            return null;
        Iterator it = bag.iterator();
        long cnt = 0;
        while (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null )
                cnt++;
        }
        return cnt;
    } catch (ExecException ee) {
        throw ee;
    } catch (Exception e) {
        int errCode = 2106;
        String msg = "Error while computing count in " + this.getClass().getSimpleName();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }
}

Algebraic 和Accumulator 接口

如前所述,具备algebraic性质的聚合函数在Map-Reduce过程中能被Combiner优化。直观来理解,具备algebraic性质的函数处理过程能被分为三部分:initial(初始化,处理部分输入数据)、intermediate(中间过程,处理初始化过程的结果)和final(收尾,处理中间过程的结果)。

比方COUNT函数,初始化过程为count计数操作。中间过程和收尾为sum求和操作。更进一步。假设函数在这三个阶段中都能进行同样的操作,那么函数具备distributive性质。比方SUM函数。

Pig提供了Algebraic 接口:

public interface Algebraic{
    /**
     * Get the initial function.
     * @return A function name of f_init. f_init shouldbe an eval func.
     * The return type off_init.exec() has to be Tuple
     */
    public String getInitial();

    /**
     * Get the intermediatefunction.
     * @return A function name of f_intermed. f_intermedshould be an eval func.
     * The return type off_intermed.exec() has to be Tuple
     */
    public String getIntermed();

    /**
     * Get the final function.
     * @return A function name of f_final. f_final shouldbe an eval func parametrized by
     * the same datum as the evalfunc implementing this interface.
     */
    public String getFinal();
}

当中每一个方法都返回EvalFunc实现类的名称。

继续以COUNT函数为例,COUNT实现了Algebraic接口。针对下面语句:

input= load ‘data‘ as (x, y);
grpd= group input by x;
cnt= foreach grpd generate group, COUNT(input);
storecnt into ‘result‘;

Pig会重写MR运行计划:

Map
load,foreach(group,COUNT.Initial)
Combine
foreach(group,COUNT.Intermediate)
Reduce
foreach(group,COUNT.Final),store

Algebraic 接口通过Combiner优化降低传输数据量,而Accumulator接口则关注的是内存使用量。UDF实现Accumulator接口后,Pig保证全部key相同的数据(通过Shuffle)以增量的形式传递给UDF(默认pig.accumulative.batchsize=20000)。相同。COUNT也实现了Accumulator接口。

/* Accumulator interface implementation */
    private long intermediateCount = 0L;
    @Override
    public void accumulate(Tuple b) throws IOException {
       try {
           DataBag bag = (DataBag)b.get(0);
           Iterator it = bag.iterator();
           while (it.hasNext()){
                Tuple t = (Tuple)it.next();
                if (t != null && t.size() > 0 && t.get(0) != null) {
                    intermediateCount += 1;
                }
            }
       } catch (ExecException ee) {
           throw ee;
       } catch (Exception e) {
           int errCode = 2106;
           String msg = "Error while computing min in " + this.getClass().getSimpleName();
           throw new ExecException(msg, errCode, PigException.BUG, e);
       }
    }

    @Override
    public void cleanup() {
       intermediateCount = 0L;
    }
    @Override
    /*
    *当前key都被处理完之后被调用
    */
    public Long getValue() {
       return intermediateCount;
    }

前后端数据传递

通过UDFs构造函数传递数据是最简单的方法。然后通过define语句定义UDF实例时指定构造方法參数。但有些情况下。比方数据在执行期才产生,或者数据不能用String格式表达,这时候就得使用UDFContext了。

UDF通过getUDFContext方法获取保存在ThreadLoacl中的UDFContext实例。

UDFContext包括下面信息:

  1. jconf:Hadoop Configuration。
  2. clientSysProps:系统属性。
  3. HashMap<UDFContextKey,Properties> udfConfs:用户自己保存的属性,当中UDFContextKey由UDF类名生成。

UDFs运行流程

Pig架构可扩展性

Pig哲学之三——Pigs Live Anywhere。

理论上。Pig并不被限定执行在Hadoop框架上,有几个能够參考的实现和提议。

  1. Pigen。Pig on Tez。https://github.com/achalsoni81/pigeon,架构图例如以下:
  2. Pig的后端抽象层:https://wiki.apache.org/pig/PigAbstractionLayer

    眼下已经实现了PigLatin执行在Galago上。

    http://www.galagosearch.org/

參考资料

Pig官网:http://pig.apache.org/

Pig paper at SIGMOD 2008:Building a High Level DataflowSystem on top of MapReduce:The Pig Experience

Programming.Pig:Dataflow.Scripting.with.Hadoop(2011.9).Alan.Gates

时间: 2024-12-08 11:33:27

Pig系统分析(8)-Pig可扩展性的相关文章

Pig系统分析(7)-Pig实用工具类

Explain Explain是Pig提供的调试工具,使用explain可以输出Pig Lation的执行计划.值得一提的是,explain支持-dot选项,将执行计划以DOT格式输出, (DOT是一种图形描述语言,请参考http://zh.wikipedia.org/zh/DOT%E8%AF%AD%E8%A8%80) 代码实现详见org.apache.pig.impl.plan.DotPlanDumper,这部分实现为我们设计执行计划可视化提供了参考. 下图部分截取了使用Graphviz打开物

Pig系统分析(7)-Pig有用工具类

Explain Explain是Pig提供的调试工具,使用explain能够输出Pig Lation的运行计划. 值得一提的是,explain支持-dot选项,将运行计划以DOT格式输出, (DOT是一种图形描写叙述语言,请參考http://zh.wikipedia.org/zh/DOT%E8%AF%AD%E8%A8%80) 代码实现详见org.apache.pig.impl.plan.DotPlanDumper,这部分实现为我们设计运行计划可视化提供了參考. 下图部分截取了使用Graphviz

Pig系统分析(6)-从Physical Plan到MR Plan再到Hadoop Job

从Physical Plan到Map-Reduce Plan 注:因为我们重点关注的是Pig On Spark针对RDD的执行计划,所以Pig物理执行计划之后的后端参考意义不大,这些部分主要分析流程,忽略实现细节. 入口类MRCompiler,MRCompilier按照拓扑顺序遍历物理执行计划中的节点,将其转换为MROperator,每个MROperator都代表一个map-reduce job,整个完整的计划存储在MROperPlan类中.其中针对Load和Store操作会做以下特殊处理: S

Pig系统分析(5)-从Logical Plan到Physical Plan

Physical Plan生成过程 优化后的逻辑运行计划被LogToPhyTranslationVisitor处理,生成物理运行计划. 这是一个经典的Vistor设计模式应用场景. 当中,LogToPhyTranslationVisitor的visit()为入口方法,通过DependencyOrderWalker遍历处理逻辑运行计划中的每个LogicalRelationalOperator.DependencyOrderWalker依照依赖顺序遍历DAG中节点,保证当且仅当节点的全部前驱都被訪问

Hive集成HBase;安装pig

Hive集成HBase 配置 将hive的lib/中的HBase.jar包用实际安装的Hbase的jar包替换掉 cd /opt/hive/lib/ ls hbase-0.94.2* rm -rf hbase-0.92* cp /opt/hbase/hbase-0.94.2* 将Hive的lib/中的zookeeper.jar包用HBase中lib/中的替换掉 步骤同上 在hive-site.xml中添加: <property> <name>hive.aux.jars.path&l

Pig和Hive的对比

Pig Pig是一种编程语言,它简化了Hadoop常见的工作任务.Pig可加载数据.表达转换数据以及存储最终结果.Pig内置的操作使得半结构化数据变得有意义(如日志文件).同时Pig可扩展使用Java中添加的自定义数据类型并支持数据转换. Hive Hive在Hadoop中扮演数据仓库的角色.Hive添加数据的结构在HDFS(hive superimposes structure on data in HDFS),并允许使用类似于SQL语法进行数据查询.与Pig一样,Hive的核心功能是可扩展的

Pig安装及简单使用(pig版本0.13.0,Hadoop版本2.5.0)

原文地址:http://www.linuxidc.com/Linux/2014-03/99055.htm 我们用MapReduce进行数据分析.当业务比较复杂的时候,使用MapReduce将会是一个很复杂的事情,比如你需要对数据进行很多预处理或转换,以便能够适应MapReduce的处理模式,另一方面,编写MapReduce程序,发布及运行作业都将是一个比较耗时的事情. Pig的出现很好的弥补了这一不足.Pig能够让你专心于数据及业务本身,而不是纠结于数据的格式转换以及MapReduce程序的编写

大数据之pig 命令

1.pig与hive的区别 pig和hive比较类似的,都是类sql的语言,底层都是依赖于hadoop    走的mapreduce任务.    pig和hive的区别就是,想要实现一个业务逻辑的话,使用pig需要一步一步操作    而使用hive的话一条SQL就可以搞定.    如果想在很短时间内获取一个比较复杂的业务逻辑处理结果的话,建议使用pig.    如果需要定时执行的一些任务,建议使用hive. 2:pig和mapreduce对比 pig优点:针对一些基本的处理逻辑,已经做好了封装,

Pig安装与配置

---------------------------------- 一.前言 二.环境 三.配置 1.本地模式 2.MapReduce模式 四.测试 ---------------------------------- 一.前言 Pig是一个用来处理大规模数据集的平台,和Google的Sawzall类似,由Yahoo!贡献给Apache.MapReduce的查询矿街虽然主要是Map和Reduce两个函数,但是用户从编程写程序到在集群中部署.运行,仍然要花费不少时间.使用Pig可以简化MapRe