spark udf 初识初用

直接上代码,详见注释

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by zxh on 2016/6/10.
 */
object UDF_test {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    implicit val sc = new SparkContext(conf)
    implicit val sqlContext = new HiveContext(sc)

    import sqlContext.implicits._

    val data = sc.parallelize(Seq(("a", 1), ("bb", 5), ("cccc", 10), ("dddddd", 15))).toDF("a", "b")
    data.registerTempTable("data")

    {
      //函数体采用原生类型(非Column类型),使用udf包装函数体,将函数体注册到sqlContext.udf
      import org.apache.spark.sql.functions._

      //函数体
      val filter_length_f = (str: String, _length: Int) => {
        str.length > _length;
      }

      //注册函数体到当前sqlContext,注意,注册到sqlContext的函数体,参数不能为Column
      //注册后,可以在以下地方使用:1、df.selectExpr 2、df.filter ,3、将该df注册为temptable,之后在sql中使用
      sqlContext.udf.register("filter_length", filter_length_f)

      val filter_length = udf(filter_length_f) //为方便使用Column,我们对函数体进行包装,包装后的输入参数为Column

      data.select($"*", filter_length($"a", lit(2))).show //使用udf包装过的,必须传入Column,注意 lit(2)
      data.selectExpr("*", " filter_length(a,2) as ax").show //select 若写表达式调用函数,则需要使用selectExpr

      data.filter(filter_length($"a", lit(2))).show //同select
      data.filter("filter_length(a,2)").show //filter调用表达式,可以直接使用df.filter函数,

      sqlContext.sql("select *,filter_length(a,2) from data").show
      sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show
    }
    {
      //函数体使用Column类型,无法注册到sqlContext.udf
      //使用udf包装后,每列都必须输入column,能否我们自己定义呢,比如一个参数是Column,一个是其他类型
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.Column

      val filter_length_f2 = (str: Column, _length: Int) => {
        length(str) > _length
      }
      sqlContext.udf.register("filter_length", filter_length_f2) //todo:不好意思,这里注册不了,注册到sqlContext.udf的函数,入参不支持Column类型

      data.select($"*", filter_length_f2($"a", 2)).show //不用udf包装,我们就可以完全自定义,这时 length 就可以传入整型了
      data.selectExpr("*", " filter_length_f2(a,2) as ax").show //todo:不好意思,这里用不了了,

      data.filter(filter_length_f2($"a", 2)).show //同select
      data.filter("filter_length(a,2)").show //todo:不好意思,这里用不了了

    }
    //最后,我们写一个相对通用的吧
    {
      //定义两个函数体,入参一个使用column类型,一个使用原生类型,将原生类型函数注册到sqlContext.udf

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.Column

      //函数体
      val filter_length_f = (str: String, _length: Int) => {
        str.length > _length;
      }
      //主函数,下面df.select df.filter 等中使用
      val filter_length = (str: Column, _length: Int) => {
        length(str) > _length
      }
      //注册函数体到当前sqlContext,注意,注册到sqlContext的函数体,参数不能为Column
      //注册后,可以在以下地方使用:1、df.selectExpr 2、df.filter ,3、将该df注册为temptable,之后在sql中使用
      sqlContext.udf.register("filter_length", filter_length_f)

      //这里我们不使用udf了,直接使用自己定义的支持Column的函数
      //val filter_length = udf(filter_length_f) //为方便使用Column,我们对函数体进行包装,包装后的输入参数为Column

      data.select($"*", filter_length($"a", 2)).show //使用udf包装过的,必须传入Column,注意 lit(2)
      data.selectExpr("*", " filter_length(a,2) as ax").show //select 若写表达式调用函数,则需要使用selectExpr

      data.filter(filter_length($"a", 2)).show //同select
      data.filter("filter_length(a,2)").show //filter调用表达式,可以直接使用df.filter函数,

      sqlContext.sql("select *,filter_length(a,2) from data").show
      sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show
    }

  }

}
时间: 2024-08-07 12:27:18

spark udf 初识初用的相关文章

spark2.1注册内部函数spark.udf.register("xx", xxx _),运行时抛出异常:Task not serializable

函数代码: class MySparkJob{ def entry(spark:SparkSession):Unit={ def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double = { val innerRsrp: Double = outer_rsrp - wear_loss - (XX) * path_loss innerRsrp } spark.udf.register("getX

Spark UDF Java 示例

Spark UDF Java 示例 在这篇文章中提到了用Spark做用户昵称文本聚类分析,聚类需要选定K个中心点,然后迭代计算其他样本点到中心点的距离.由于中文文字分词之后(n-gram)再加上昵称允许各个特殊字符(数字.字母.各种符号--),如果直接在原来的文本数据上进行聚类,由于文本的"多样性",聚类效果并不一定好.因此准确对昵称先进行一个预分类的过程,这里的分类不是机器学习里面的分类算法(逻辑回归.线性回归),而是根据昵称文本的特征进行分类:给定一个文本昵称字符串,分类方法逐个地

在spark udf中读取hdfs上的文件

某些场景下,我们在写UDF实现业务逻辑时候,可能需要去读取某个配置文件. 大多时候我们都会将此文件上传个hdfs某个路径下,然后通过hdfs api读取该文件,但是需要注意: UDF中读取文件部分最好放在静态代码块中(只会在类加载时候读取一次),尤其在处理的数据量比较大的时候,否则会反反复复的读取,造成不必要的开销,甚至任务失败,示例代码如下: package cn.com.dtmobile.udf; import java.util.HashMap; import org.apache.spa

MySQL数据库初识——初窥MySQL

初步了解MySQL基本数据库语言 1.创建一个Mysql数据库 create database  database_name: 2.显示所有的Mysql数据库 show databases: 3.使用一个MySQL数据库 use database_name: 4.删除一个MySQL数据库 drop database database_name: 使用DOS命令登录mysql软件,首先要配置好环境变量 PATH : C:\Program Files\MySQL\MySQL Server 8.0\b

第八篇:Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp

Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp

Spark之UDF

1 package big.data.analyse.udfudaf 2 3 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} 4 import org.apache.spark.sql.{Row, SparkSession} 5 6 /** 7 * Created by zhen on 2018/11/25. 8 */ 9 object SparkUdfUdaf { 10 d

在Apache Spark中使用UDF

用户自定义函数(UDF)是大多数SQL环境的一个关键特性,其主要用于扩展系统的内置功能.UDF允许开发人员通过抽象其低级语言实现在更高级语言(如SQL)中应用的新函数.Apache Spark也不例外,其为UDF与Spark SQL工作流集成提供了各种选项. 在本篇博文中,我们将回顾Python.Java和Scala上的Apache Spark UDF和UDAF(用户自定义的聚合函数)实现的简单示例.我们还将讨论重要的UDF API功能和集成点,包括各发行版本之间的当前可用性.总而言之,我们将介

Spark 自定义函数(udf,udaf)

Spark 版本 2.3 文中测试数据(json) {"name":"lillcol", "age":24,"ip":"192.168.0.8"} {"name":"adson", "age":100,"ip":"192.168.255.1"} {"name":"wuli&quo