spark2.1注册内部函数spark.udf.register("xx", xxx _),运行时抛出异常:Task not serializable

函数代码:

class MySparkJob{
    def entry(spark:SparkSession):Unit={
          def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double = {
          val innerRsrp: Double = outer_rsrp - wear_loss - (XX) * path_loss

          innerRsrp
        }
        spark.udf.register("getXXX", getXXX _)

        import spark.sql
        sql(s"""|select getInnerRsrp(t10.outer_rsrp,t10.wear_loss,t10.path_loss) as rsrp, xx from yy""".stripMargin)
    }
}

使用spark-submit提交函数时,抛出异常:

User class threw exception: org.apache.spark.SparkException: Task not serializable 

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2125)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:842)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:841)
    at com.dx.fpd_withscenetype.MySparkJob.entry(MySparkJob.scala:207)
    at com.dx.App$.main(App.scala:58)
    at com.dx.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:650)
Caused by: java.io.NotSerializableException: com.dx.fpd_withscenetype.MySparkJob
Serialization stack:
    - object not serializable (class: com.dx.fpd_withscenetype.MySparkJob, value: [email protected])
    - field (class: com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$1, name: $outer, type: class com.dx.fpd_withscenetype.MySparkJob)
    - object (class com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$1, <function2>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, name: func$3, type: interface scala.Function2)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:getInnerRsrp(cast(input[1, double, true] as int), cast(input[12, double, true] as int), cast(input[13, double, true] as int)))
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 147 more

解决方案:

把当前MySparkJob集成Serializable

class MySparkJob extends Serializable {
    xxx
}

原文地址:https://www.cnblogs.com/yy3b2007com/p/8570796.html

时间: 2024-08-04 20:55:11

spark2.1注册内部函数spark.udf.register("xx", xxx _),运行时抛出异常:Task not serializable的相关文章

Spark和Kafka在IDEA整合运行时提示&#39;&#39;com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.1&#39;&#39;

使用kafka和sparkstreaming整合时,IDEA运行程序报错"com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.1",初步看是版本不兼容,但是版本后面确认没有问题,参考博文,修改了依赖包的依赖顺序就解决了问题. 依赖包顺序 修改了依赖包顺序,前三个如图所示的顺序是ok的. ①spark安装包中的jar包 ②kafka安装包中的jar包 ③sparkst

Spark UDF Java 示例

Spark UDF Java 示例 在这篇文章中提到了用Spark做用户昵称文本聚类分析,聚类需要选定K个中心点,然后迭代计算其他样本点到中心点的距离.由于中文文字分词之后(n-gram)再加上昵称允许各个特殊字符(数字.字母.各种符号--),如果直接在原来的文本数据上进行聚类,由于文本的"多样性",聚类效果并不一定好.因此准确对昵称先进行一个预分类的过程,这里的分类不是机器学习里面的分类算法(逻辑回归.线性回归),而是根据昵称文本的特征进行分类:给定一个文本昵称字符串,分类方法逐个地

spark udf 初识初用

直接上代码,详见注释 import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkContext, SparkConf} /** * Created by zxh on 2016/6/10. */ object UDF_test { def main(args: Array[String]): Unit = { val conf = new SparkConf() implicit val sc = new

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

spark版本定制五:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 一.在线动态计算分类最热门商品案例回顾与演示 案例回顾: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

[Spark內核] 第42课:Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

本课主题 Broadcast 运行原理图 Broadcast 源码解析 Broadcast 运行原理图 Broadcast 就是将数据从一个节点发送到其他的节点上; 例如 Driver 上有一张表,而 Executor 中的每个并行执行的Task (100万个Task) 都要查询这张表的话,那我们通过 Broadcast 的方式就只需要往每个Executor 把这张表发送一次就行了,Executor 中的每个运行的 Task 查询这张唯一的表,而不是每次执行的时候都从 Driver 中获得这张表

TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解

TaskSchedulerBackend与SchedulerBackend FIFO与FAIR两种调度模式 Task数据本地性资源的分配 一.TaskScheduler运行过程(Spark-shell角度) 1.启动Spark-shell 当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend.这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkC