spark Task not serializable

Job aborted due to stage failure: Task not serializable:

If you see this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

The above error can be triggered when you intialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();

This will trigger that error. Here are some ideas to fix this error:

  • Serializable the class
  • Declare the instance only within the lambda function passed in map.
  • Make the NotSerializable object as a static and create it once per machine.
  • Call rdd.forEachPartition and create the NotSerializable object in there like this:
rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});

参考:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
时间: 2025-01-11 15:50:41

spark Task not serializable的相关文章

spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable

import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.common.math.MathUtils; // 自定义Partitioner class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner { protected var _numPartitions = -1; prote

Spark Task未序列化(Task not serializable)问题分析

问题描述及原因分析 在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,从而引发Task未序列化问题.然而,Spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换等.为了解决上述Task未序列化问题,这里对其进行了研究和总结. 出现"org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map.fil

spark 写入 redis 和 org.apache.spark.SparkException: Task not serializable

spark将数据写入redis时调用以下代码会报  org.apache.spark.SparkException: Task not serializable import com.redis.RedisClient val r = new RedisClient("192.168.1.101", 6379) val perhit = rdd.map(x => { val arr = x.split(" ") val k = arr(0).toInt val

spark2.1注册内部函数spark.udf.register(&quot;xx&quot;, xxx _),运行时抛出异常:Task not serializable

函数代码: class MySparkJob{ def entry(spark:SparkSession):Unit={ def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double = { val innerRsrp: Double = outer_rsrp - wear_loss - (XX) * path_loss innerRsrp } spark.udf.register("getX

【原创】问题定位分享(19)spark task在executors上分布不均

最近提交一个spark应用之后发现执行非常慢,点开spark web ui之后发现卡在一个job的一个stage上,这个stage有100000个task,但是绝大部分task都分配到两个executor上,其他executor非常空闲,what happened? 查看spark task分配逻辑发现,有一个data locality即数据本地性的特性,详见 https://www.cnblogs.com/barneywill/p/10152497.html即会按照locality级别的优先级

spark 插入数据到mysql时遇到的问题 org.apache.spark.SparkException: Task not serializable

报错问题:Exception in thread "main" org.apache.spark.SparkException: Task not serializableCaused by: java.io.NotSerializableException: org.apache.commons.dbcp2.PoolingDataSource$PoolGuardConnectionWrapper 出错的代码: def saveMonthToMysql(everymonth_avg:R

大数据:Spark Core(二)Driver上的Task的生成、分配、调度

1. 什么是Task? 在前面的章节里描述过几个角色,Driver(Client),Master,Worker(Executor),Driver会提交Application到Master进行Worker上的Executor上的调度,显然这些都不是Task. Spark上的几个关系可以这样理解: Application: Application是Driver在构建SparkContent的上下文的时候创建的,就像申报员,现在要构建一个能完成任务的集群,需要申报的是这次需要多少个Executor(可

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

Spark源代码分析之六:Task调度(二)

话说在<Spark源代码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这种方法针对接收到的ReviveOffers事件进行处理.代码例如以下: // Make fake resource offers on all executors     // 在全部的executors上提供假的资源(抽象的资源.也就是资源的对象信息,我是这么理解的)     private def makeOffers() {       /