Spark(Hive) SQL中UDF的使用(Python)

相对于使用MapReduce或者Spark Application的方式进行数据分析,使用Hive SQL或Spark SQL能为我们省去不少的代码工作量,而Hive SQL或Spark SQL本身内置的各类UDF也为我们的数据处理提供了不少便利的工具,当这些内置的UDF不能满足于我们的需要时,Hive SQL或Spark SQL还为我们提供了自定义UDF的相关接口,方便我们根据自己的需求进行扩展。

在Hive的世界里使用自定义UDF的过程是比较复杂的。我们需要根据需求使用Java语言开发相应的UDF(UDAF、UDTF),然后将UDF的代码及其依赖编译打包为Jar,使用方法有两种:

(1)临时函数

在一次会话(Session)中使用如下语句创建临时函数:

ADD JAR /run/jar/udf_test.jar;

CREATE TEMPORARY FUNCTION my_add AS ‘com.hive.udf.Add‘;

这种方式有一个缺点:每一次会话过程中使用函数时都需要创建,而且仅在当前会话中有效。

(2)永久函数

这个特性需要高版本的Hive支持,它的好处是可以将UDF Jar存放至HDFS,函数仅需要创建一次即可以永久使用,如下:

CREATE FUNCTION func.ipToLocationBySina AS ‘com.sina.dip.hive.function.IPToLocationBySina‘ USING JAR ‘hdfs://dip.cdh5.dev:8020/user/hdfs/func/location.jar‘;

虽然永久函数相对于临时函数有一定优势,但Java语言的开发门槛很大程度上妨碍了UDF在实际数据分析过程中使用,毕竟我们的数据分析师多数是以Python、SQL为主要分析工具的,每一次UDF的开发都需要工程师的参与,开发效率与应用效果都是不是很好(可能需要频繁更新UDF的问题),PySpark的出现确很好地解决了这个问题:它可以非常方便地将一个普通的Python函数注册为一个UDF。

为了说明如何在Spark(Hive) SQL中的使用Python UDF,我们首先模拟一张数据表,为了简单起见,该表仅有一行一列数据:

我们模拟了一张数据表temp_table,该表仅有一列,其中列名称为col,列类型为字符串且不允许包含Null,输出结果:

我们在表temp_table的基础之上演示UDF的使用方法:

首先我们定义一个普通的Python函数:func_string,为了简单起见它没有任何参数,仅仅返回一个简单的字符串;

然后我们通过HiveContext registerFunction即可以将函数func_string注册为UDF,registerFunction接收两个参数:UDF名称、UDF关联的Python函数;

最后我们可以在Spark(Hive) SQL中使用这个UDF,输出结果:

我们需要注意的是,HiveContext registerFunction实际上有三个参数:

name:UDF名称;

f:UDF关联的Python函数;

returnType:UDF(Python函数)返回值类型,默认为StringType()。

上述示例中因为我们的UDF函数的返回值类型为字符串,因此使用Hive registerFunction注册UDF时省略了参数returnType,即returnType默认值为StringType(),如果UDF(Python函数)的返回值类型不为字符串,则需要显式为其指定returnType。

我们以类型IntegerType、ArrayType、StructType、MapType为例演示需要显式指定returnType的情况。

(1)IntegerType

(2)ArrayType

注意:ArrayType(数组)必须确保元素类型的一致性,如指定UDF返回值类型为ArrayType(IntegerType()),则函数func_array的返回值类型必须为list或tuple,其中的元素类型必须为int。

(3)StructType

注意:StructType必须确保函数的返回值类型为tuple,而且使用HiveContext registerFunction注册UDF时需要依次为其中的元素指定名称各类型,如上述示例中每一个元素的名称为first,类型为IntegerType;第二个元素的名称为second,类型为FloatType;第三个元素的名称为third,类型为StringType。

(4)MapType

注意:MapType必须确保函数的返回值类型为dict,而且所有的“key”应保持类型一致,“value”也就保持类型一致。

时间: 2024-10-10 04:19:36

Spark(Hive) SQL中UDF的使用(Python)的相关文章

Spark(Hive) SQL中UDF的使用(Python)【转】

相对于使用MapReduce或者Spark Application的方式进行数据分析,使用Hive SQL或Spark SQL能为我们省去不少的代码工作量,而Hive SQL或Spark SQL本身内置的各类UDF也为我们的数据处理提供了不少便利的工具,当这些内置的UDF不能满足于我们的需要时,Hive SQL或Spark SQL还为我们提供了自定义UDF的相关接口,方便我们根据自己的需求进行扩展. 在Hive的世界里使用自定义UDF的过程是比较复杂的.我们需要根据需求使用Java语言开发相应的

Spark(Hive) SQL数据类型使用详解(Python)

Spark SQL使用时需要有若干“表”的存在,这些“表”可以来自于Hive,也可以来自“临时表”.如果“表”来自于Hive,它的模式(列名.列类型等)在创建时已经确定,一般情况下我们直接通过Spark SQL分析表中的数据即可:如果“表”来自“临时表”,我们就需要考虑两个问题: (1)“临时表”的数据是哪来的? (2)“临时表”的模式是什么? 通过Spark的官方文档可以了解到,生成一张“临时表”需要两个要素: (1)关联着数据的RDD: (2)数据模式: 也就是说,我们需要将数据模式应用于关

Hive SQL的编译过程

Hive是基于Hadoop的一个数据仓库系统,在各大公司都有广泛的应用.美团数据仓库也是基于Hive搭建,每天执行近万次的Hive ETL计算流程,负责每天数百GB的数据存储和分析.Hive的稳定性和性能对我们的数据分析非常关键. 在几次升级Hive的过程中,我们遇到了一些大大小小的问题.通过向社区的咨询和自己的努力,在解决这些问题的同时我们对Hive将SQL编译为MapReduce的过程有了比较深入的理解.对这一过程的理解不仅帮助我们解决了一些Hive的bug,也有利于我们优化Hive SQL

Hive SQL 编译过程

转自:http://www.open-open.com/lib/view/open1400644430159.html Hive跟Impala貌似都是公司或者研究所常用的系统,前者更稳定点,实现方式是MapReduce,因为用Hue的时候,在groupby中文的时候,出现了点问题,并且看到写很长的SQL语句,经常会看到起很多个Job,因此想了解下Hive怎么将SQL转化成MapReduce的Job.以后写SQL的时候,大概就了解怎么去做优化了.下面是看到的一片优秀的文章(美团的技术博客),我粘过

(转)Hive SQL的编译过程

本文来着美团 :http://tech.meituan.com/hive-sql-to-mapreduce.html Hive是基于Hadoop的一个数据仓库系统,在各大公司都有广泛的应用.美团数据仓库也是基于Hive搭建,每天执行近万次的Hive ETL计算流程,负责每天数百GB的数据存储和分析.Hive的稳定性和性能对我们的数据分析非常关键. 在几次升级Hive的过程中,我们遇到了一些大大小小的问题.通过向社区的咨询和自己的努力,在解决这些问题的同时我们对Hive将SQL编译为MapRedu

shell 脚本运行 hive sql

#!/b START=$(date +%s); datebegin=`date -d "$1" "+%Y%m%d"` dateend=`date -d "$2" "+%Y%m%d"` sdate=`date -d "$datebegin -1 days" "+%Y%m%d"` while [ "$datebegin" -le "$dateend"

使用spark对hive表中的多列数据判重

本文处理的场景如下,hive表中的数据,对其中的多列进行判重deduplicate. 1.先解决依赖,spark相关的所有包,pom.xml spark-hive是我们进行hive表spark处理的关键. <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version

Spark SQL中的broadcast join分析

在Spark-1.6.2中,执行相同join查询语句,broadcast join模式下,DAG和执行时间如下图所示: 1.broadcast join (1)DAG (2)执行时间 122 rows selected (22.709 seconds) 2.非broadcast join (1)DAG (2)执行时间 122 rows selected (55.512 seconds) 对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThres

0011-如何在Hive &amp; Impala中使用UDF

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看. 1.文档编写目的 本文档讲述如何开发Hive自定义函数(UDF),以及如何在Impala中使用Hive的自定义函数,通过本文档,您将学习到以下知识: 1.如何使用Java开发Hive的自定义函数 2.如何在Hive中创建自定义函数及使用 3.如何在Impala中使用Hive的自定义函数 这篇文档将重点介绍UDF在Hive和Impala的使用,并基于以下假设: 1.集群环境正常运行 2.集群安装Hive和Impala服务 以下是本次测