Spark之UDF

 1 package big.data.analyse.udfudaf
 2
 3 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 4 import org.apache.spark.sql.{Row, SparkSession}
 5
 6 /**
 7   * Created by zhen on 2018/11/25.
 8   */
 9 object SparkUdfUdaf {
10   def isAdult(age : Int) ={
11     if(age > 18){
12       true
13     }else{
14       false
15     }
16   }
17   def main(args: Array[String]) {
18     val spark = SparkSession
19       .builder()
20       .appName("UdfUdaf")
21       .master("local[2]")
22       .getOrCreate()
23     val userData = Array(
24       "2015,11,www.baidu.com",
25       "2016,14,www.google.com",
26       "2017,13,www.apache.com",
27       "2015,21,www.spark.com",
28       "2016,32,www.hadoop.com",
29       "2017,18,www.solr.com",
30       "2017,14,www.hive.com"
31     )
32     val sc = spark.sparkContext
33     val sqlContext = spark.sqlContext
34     val userDataRDD = sc.parallelize(userData) // 转化为RDD
35     val userDataType = userDataRDD.map(line => {
36         val Array(age, id, url) = line.split(",")
37         Row(
38           age, id.toInt, url
39         )
40       })
41     val structTypes = StructType(Array(
42       StructField("age", StringType, true),
43       StructField("id", IntegerType, true),
44       StructField("url", StringType, true)
45     ))
46     // RDD转化为DataFrame
47     val userDataFrame = sqlContext.createDataFrame(userDataType,structTypes)
48     // 注冊临时表
49     userDataFrame.createOrReplaceTempView("udf")
50     // 注册udf(方式一)
51     spark.udf.register("getLength", (str : String) => str.length)
52     // 注册udf(方式二)
53     spark.udf.register("isAdult", isAdult _)
54     //执行sql
55     val sql = "select * from udf where getLength(udf.url)=13 and isAdult(udf.id)"
56     val result = sqlContext.sql(sql)
57     result.foreach(println(_))
58   }
59 }

结果:

原文地址:https://www.cnblogs.com/yszd/p/10016235.html

时间: 2024-08-30 14:09:10

Spark之UDF的相关文章

Spark SQL UDF使用

Spark1.1推出了Uer Define Function功能,用户可以在Spark SQL 里自定义实际需要的UDF来处理数据. 因为目前Spark SQL本身支持的函数有限,一些常用的函数都没有,比如len, concat...etc 但是使用UDF来自己实现根据业务需要的功能是非常方便的. Spark SQL UDF其实是一个Scala函数,被catalyst封装成一个Expression结点,最后通过eval方法计根据当前Row计算UDF的结果,源码分析见:Spark SQL源码分析之

spark编写UDF和UDAF

UDF: 一.编写udf类,在其中定义udf函数 package spark._sql.UDF import org.apache.spark.sql.functions._ /** * AUTHOR Guozy * DATE 2019/7/18-9:41 **/ object udfs { def len(str: String): Int = str.length def ageThan(age: Int, small: Int): Boolean = age > small val age

Spark SQL UDF

目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst . 在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找 EmptyFunctionRegistry 中lookup 只是抛出一个异常. 所以自定义了一个 FunctionRegistry ,SqlContext @transient protected[sql]lazyva

Spark SQL UDF示例

UDF即用户自定函数,注册之后,在sql语句中使用. 基于scala-sdk-2.10.7,Spark2.0.0. package UDF_UDAF import java.util import org.apache.spark.sql.{RowFactory, SparkSession} import org.apache.spark.SparkConf import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql

spark自定义udf输入类型为array报错

定义udf如下 val list2string = udf { (style: Array[String], num: Array[Long]) => style.zip(num).map(t => t._1 + ":" + t._2).mkString("<br>") } 输入为两个数组,输出为string 报错如下 Caused by: java.lang.ClassCastException: scala.collection.muta

Homework 2: UDF Caching in Spark

为spark编写UDF cache: 作业介绍 https://github.com/cs186-spring15/course/tree/master/hw2 我花了点时间做了下,觉得是学习spark sql和scala的好材料.现在把我写的作业记录如下: Task #1: Implementing DiskPartition and GeneralDiskHashedRelation Task #2: Implementing object DiskHashedRelation DiskPa

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视

课程路线

---恢复内容开始--- 云计算&大数据实战课程列表 first.课程说明: 本系列课程适合有一点编程基础的人员学习(比如java,python,c/c++),最好是java编程人员,特别是从事过j2ee开发的人员.学习完本套课程,可以帮助你成为大型项目架构师,特别是数据量大,并发量高的大型项目架构师,当然也能很大程度上提高你的薪资待遇. second.课程路线 third.Linux大纲 这章是基础课程,帮大家进入大数据领域打好Linux基础,以便更好地学习Hadoop,NoSQL,Oracl

Spark(Hive) SQL中UDF的使用(Python)

相对于使用MapReduce或者Spark Application的方式进行数据分析,使用Hive SQL或Spark SQL能为我们省去不少的代码工作量,而Hive SQL或Spark SQL本身内置的各类UDF也为我们的数据处理提供了不少便利的工具,当这些内置的UDF不能满足于我们的需要时,Hive SQL或Spark SQL还为我们提供了自定义UDF的相关接口,方便我们根据自己的需求进行扩展. 在Hive的世界里使用自定义UDF的过程是比较复杂的.我们需要根据需求使用Java语言开发相应的