玩转大数据系列之Apache Pig如何通过自定义UDF查询数据库(五)

GMV(一定时间内的成交总额)是一个衡量电商网站营业收入的一项重要指标,例如淘宝,京东都有这样的衡量标准,感兴趣的朋友可以自己科普下这方面的概念知识。

当然散仙今天,并不是来解释概念的,而是记录下最近工作的一些东西,原来我们平台的GMV只有一个总的成交金额,并没有细分到各个系统的GMV的比重,比如搜索端,推荐端,移动端等等。

通过细粒度的分析各个系统所占的比重,对于指导各个系统完善和发展有一定的重要意义,这里不就深说了,下面先来看下散仙分析的搜索gmv的数据布局方式。

(1)Hadoop集群上,存储了一些非核心的数据,比如访问数据,点击数据,购物车数据,下单数据(这个是从数据库里每天同步到HDFS上的,算是备份吧)
(2)Oracle数据库中,存储了订单信息,交易信息,商品信息,支付信息等一些电商的核心数据

其实关于gmv的计算方式,在我们oracle库里,以及有一个存储过程封装了复杂的细节的处理,包括运费,折扣,不同国家,不同地域,信用用户,等等,在使用时候,只需要传入一个订单编号即可,计算出本单的gmv成交金额。

这样以来的,按照目前的数据情况,订单编号是从Hadoop集群上,一直是从搜索,点击,添加购物车,下单计算出来的,然后获取的对应的订单编号,注意这个过程中,是需要全程去爬虫数据的,因为还要算最终的GMV成交额,所以需要找到一定时期内的订单号,然后通过调用在oracle库的封装好的函数,计算出gmv,这样以来,就能够比较细跟踪各个阶段运行轨迹和成交额。

ok,业务上的分析大致如此,下面就看下,技术上如何实现,其实就是需要Pig的一个自定义UDF函数,在遍历每一行的recoder时,去查询oracle只读库,获取gmv的值,并将最终结果存储起来,以图形化方式展示。

Pig里面对UDF函数非常丰富,比较常用的是转化函数和加载存储函数,这一点在Hive里,也是如此,之前的文章中,散仙介绍过,通过自定义UDF将pig分析的结果直接存储到数据库或索引中,便于检索和发挥不同框架之间的组合优势。

核心代码如下:

Java代码  

package com.pig.dhgate.getgvmbyrfxno;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义Pig UDF实现查询db计算gmv
 * **/
public class GetGmvByRfxno extends EvalFunc<Double> {
	/**日志对象*/
	static Logger log =LoggerFactory.getLogger(GetGmvByRfxno.class);
	/**数据库工具类*/
	DBTools dbtools=new DBTools();

	@Override
	public Double exec(Tuple input) throws IOException {

		if(input!=null&&input.size()!=0){
			//获取传入的订单号
			String rfxno =(String)input.get(0);
			//通过db类,查询对应的gmv并返回
			double gmv=dbtools.getGmvByRfxno(rfxno);
			return gmv;
		}else{
			//对null,空值,一律按0处理
			return 0.00;
		}
	}
}

数据库封装类:

Java代码  

/***
 * 数据库工具类
 * */
public class DBTools {

	/**日志对象*/
	static Logger log =LoggerFactory.getLogger(DBTools.class);

	private  static  Connection conn;
	private  static PreparedStatement ps;
	private   ResultSet rs;
	//从虚拟表查询函数
	private static  String  sql="select datasql.GETGMV(?) as gmv  from dual ";
	static{
		try{
		Class.forName("oracle.jdbc.driver.OracleDriver");
		conn = DriverManager.getConnection("jdbc:oracle:thin:@ip地址:1521:数据库名", "用户名", "密码");
		System.out.println("数据库连接:"+conn);
		ps=conn.prepareStatement(sql);
		}catch(Exception e){
			log.error("初始化oracle驱动异常!", e);
		}
	}

	/**根据一个rfxno获取对应的产品的gmv
	 * **/
	public double getGmvByRfxno(String rfxno){
		try{
		ps.setString(1, rfxno);
		rs = ps.executeQuery();
		if(rs.next()){
			double gmv=rs.getDouble("gmv");
//			System.out.println("gmv是:  "+gmv);
			return gmv;
		}
		rs.close();
		}catch(Exception e){
			log.error("根据rfxno获取gmv出错!",e);
		}
		return 0.0;
	}
	}

其实,代码还是比较简单的,在这里,你可以从任何数据源获取需要的数据,而不仅仅是数据库,你也可以从redis,memcache,文件,xml,等等里获取需要组合用的数据。

遇到一个异常:在sql语句后面,不用加分号,类似下面的这样的语句,通过jdbc编译然后调用oracle是不通过的:

Sql代码  

  1. select datasql.GETGMV(?) as gmv  from dual;
select datasql.GETGMV(?) as gmv  from dual;

这一点需要注意下。

最后来看下如下在pig脚本里,使用自定义的函数:
(1)使用ant打包自定义的udf函数的jar
(2)在pig脚本里,注册相关的jar包,注意如果有依赖关系,依赖的jar包,也需要注册,例如本例中的oracle的jdbc的驱动包
(3)在对应的地方,通过类的全路径名,引用此函数,完成对应的查询转换,并将新得到的一个字段,作为原始一行记录的字段扩充。

脚本如下:

Java代码  

--注册依赖的jar包
register /home/search/dongliang/nsconvent/checklist/ojdbc.jar
register /home/search/dongliang/nsconvent/checklist/tools.jar

--加载原有数据
m = load ‘/tmp/mdm/VW_TD_RFX‘ using PigStorage(‘\\x07‘);
--加载原有数据
n = load ‘/tmp/mdm/TD_RFX_PRODUCT‘ using PigStorage(‘\\x07‘);

--过滤出符合时间的数据

m= filter m by ToMilliSeconds(ToDate($3,‘yyyy-MM-dd HH:mm:ss‘)) >= ToMilliSeconds(ToDate(‘$day 00:00:00‘,‘yyyy-MM-dd HH:mm:ss‘)) and ToMilliSeconds(ToDate($3
,‘yyyy-MM-dd HH:mm:ss‘)) <= ToMilliSeconds(ToDate(‘$day 23:59:59‘,‘yyyy-MM-dd HH:mm:ss‘))  ;

--提取相关字段,并完成计算
m = foreach m generate $0 as arfid, $1 as rfxno , com.pig.dhgate.getgvmbyrfxno.GetGmvByRfxno((chararray)$1) as gmv  , $4 as bid ;
--获取topN数据
m = limit m 10 ;
--打印输出
dump m;
时间: 2024-08-02 11:02:46

玩转大数据系列之Apache Pig如何通过自定义UDF查询数据库(五)的相关文章

玩转大数据系列之Apache Pig高级技能之函数编程(六)

原创不易,转载请务必注明,原创地址,谢谢配合! http://qindongliang.iteye.com/ Pig系列的学习文档,希望对大家有用,感谢关注散仙! Apache Pig的前世今生 Apache Pig如何自定义UDF函数? Apache Pig5行代码怎么实现Hadoop的WordCount? Apache Pig入门学习文档(一) Apache Pig学习笔记(二) Apache Pig学习笔记之内置函数(三) 玩转大数据系列之Apache Pig如何与Apache Lucen

玩转大数据系列之Apache Pig如何与Apache Solr集成(二)

散仙,在上篇文章中介绍了,如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程. 在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点: (一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响 (二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的

玩转大数据系列之Apache Pig如何与MySQL集成(三)

上篇介绍了如何把Pig的结果存储到Solr中,那么可能就会有朋友问了,为什么不存到数据库呢? 不支持还是? 其实只要我们愿意,我们可以存储它的结果集到任何地方,只需要重写我们自己的StoreFunc类即可. 关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的工具函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以

大数据系列(2)——Hadoop集群坏境CentOS安装

前言 前面我们主要分析了搭建Hadoop集群所需要准备的内容和一些提前规划好的项,本篇我们主要来分析如何安装CentOS操作系统,以及一些基础的设置,闲言少叙,我们进入本篇的正题. 技术准备 VMware虚拟机.CentOS 6.8 64 bit 安装流程 因为我的笔记本是Window7操作系统,然后内存配置,只有8G,内存配置太低了,当然为了演示,我会将Hadoop集群中的主节点分配2GB内存,然后剩余的三个节点都是1GB配置. 所有的节点存储我都设置为50GB. 在安装操作系统之前,我们需要

大数据系列之分布式数据库HBase-1.2.4+Zookeeper 安装及增删改查实践

之前介绍过关于HBase 0.9.8版本的部署及使用,本篇介绍下最新版本HBase1.2.4的部署及使用,有部分区别,详见如下: 1. 环境准备: 1.需要在Hadoop[hadoop-2.7.3] 启动正常情况下安装,hadoop安装可参考LZ的文章 大数据系列之Hadoop分布式集群部署 2. 资料包  zookeeper-3.4.9.tar.gz,hbase-1.2.4-bin.tar.gz 2. 安装步骤: 1.安装zookeeper 1.解压zookeeper-3.4.9.tar.gz

大数据系列之数据仓库Hive原理

Hive系列博文,持续更新~~~ 大数据系列之数据仓库Hive原理 大数据系列之数据仓库Hive安装 大数据系列之数据仓库Hive中分区Partition如何使用 大数据系列之数据仓库Hive命令使用及JDBC连接 Hive的工作原理简单来说就是一个查询引擎 先来一张Hive的架构图: Hive的工作原理如下: 接收到一个sql,后面做的事情包括:1.词法分析/语法分析 使用antlr将SQL语句解析成抽象语法树-AST2.语义分析 从Megastore获取模式信息,验证SQL语句中队表名,列名

大数据系列之分布式计算批处理引擎MapReduce实践

关于MR的工作原理不做过多叙述,本文将对MapReduce的实例WordCount(单词计数程序)做实践,从而理解MapReduce的工作机制. WordCount: 1.应用场景,在大量文件中存储了单词,单词之间用空格分隔 2.类似场景:搜索引擎中,统计最流行的N个搜索词,统计搜索词频率,帮助优化搜索词提示. 3.采用MapReduce执行过程如图 3.1MapReduce将作业的整个运行过程分为两个阶段 3.1.1Map阶段和Reduce阶段 Map阶段由一定数量的Map Task组成 输入

大数据系列之分布式计算批处理引擎MapReduce实践-排序

清明刚过,该来学习点新的知识点了. 上次说到关于MapReduce对于文本中词频的统计使用WordCount.如果还有同学不熟悉的可以参考博文大数据系列之分布式计算批处理引擎MapReduce实践. 博文发表后很多同学私下反映对于MapReduce的处理原理没有了解到.在这篇博文中楼主与大家交流下MapReduce的数据处理原理及MR中各角色的职责. 文末还有示例代码讲解.. 1.MapReduce中的数据流动 最简单的过程: map - reduce 定制了partitioner以将map的结

大数据系列之三:大数据体系架构的重要里程碑

欧凯惯例:引子 世界上唯一不变的就是变化,大数据的架构也不例外. 这次变化的推动者,多是一些大的商业公司! 首发地址 --- Teradata 美国天睿 Teradata这家公司其实挺陌生的,但这并不能让我们忽视其在大数据方面做出的贡献.简单一句描述这家公司的贡献就是: 2008年之前,这家公司以关系型为基础,硬刚大数据,之后意识到数据实在太大大复杂了,终究实现了对非关系型数据的支持. 具体它拿关系型作为对大数据的解决方案硬刚到什么程度呢?拿一个数据说来说明白了,直到2017年,它可以基于其关系