Spark Task未序列化(Task not serializable)问题分析

问题描述及原因分析

在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,从而引发Task未序列化问题。然而,Spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换等。为了解决上述Task未序列化问题,这里对其进行了研究和总结。

  出现“org.apache.spark.SparkException: Task not serializable”这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化(不是说不可以引用外部变量,只是要做好序列化工作,具体后面详述)。其中最普遍的情形是:当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。虽然许多情形下,当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现Task未序列化问题。

引用成员变量的实例分析

  如上所述,由于Spark程序中的map、filter等算子内部引用了类成员函数或变量导致需要该类所有成员都需要支持序列化,又由于该类某些成员变量不支持序列化,最终引发Task无法序列化问题。为了验证上述原因,我们编写了一个实例程序,如下所示。该类的功能是从域名列表中(rdd)过滤得到特定顶级域名(rootDomain,如.com,.cn,.org)的域名列表,而该特定顶级域名需要函数调用时指定。

class MyTest1(conf:String) extends Serializable{
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  private val sparkConf = new SparkConf().setAppName("AppName");
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  private val rootDomain = conf

  def getResult(): Array[(String)] = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.take(result.count().toInt)
  }
}

  依据上述分析的原因,由于依赖了当前类的成员变量,所以导致当前类全部需要序列化,由于当前类某些字段未做好序列化,导致出错。实际情况与分析的原因一致,运行过程中出现错误,如下所示。分析下面的错误报告得到错误是由于sc(SparkContext)引起的。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(**SparkContext**.scala:1435)
……
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    - field (class "com.ntci.test.MyTest1", name: "sc", type: "class org.apache.spark.SparkContext")
    - object (class "com.ntci.test.MyTest1", com.ntci.test.MyTest1@63700353)
    - field (class "com.ntci.test.MyTest1$$anonfun$1", name: "$outer", type: "class com.ntci.test.MyTest1")

  为了验证上述结论,将不需要序列化的的成员变量使用关键字“@transent”标注,表示不序列化当前类中的这两个成员变量,再次执行函数,同样报错。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
……
 Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
    - field (class "com.ntci.test.MyTest1", name: "sparkConf", type: "class org.apache.spark.**SparkConf**")
    - object (class "com.ntci.test.MyTest1", com.ntci.test.MyTest1@6107799e)

  虽然错误原因相同,但是这次导致错误的字段是sparkConf(SparkConf)。使用同样的“@transent”标注方式,将sc(SparkContext)和sparkConf(SparkConf)都标注为不需序列化,再次执行时,程序正常执行。

class MyTest1(conf:String) extends Serializable{
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  @transient
  private val sparkConf = new SparkConf().setAppName("AppName");
  @transient
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  private val rootDomain = conf

  def getResult(): Array[(String)] = {

    val result = rdd.filter(item => item.contains(rootDomain))
    result.take(result.count().toInt)
  }
}

  所以,通过上面的例子我们可以得到结论:由于Spark程序中的map、filter等算子内部引用了类成员函数或变量导致该类所有成员都需要支持序列化,又由于该类某些成员变量不支持序列化,最终引发Task无法序列化问题。相反地,对类中那些不支持序列化问题的成员变量标注后,使得整个类能够正常序列化,最终消除Task未序列化问题。

引用成员函数的实例分析

  成员变量与成员函数的对序列化的影响相同,即引用了某类的成员函数,会导致该类所有成员都支持序列化。为了验证这个假设,我们在map中使用了当前类的一个成员函数,作用是如果当前域名没有以“www.”开头,那么就在域名头添加“www.”前缀(注:由于rootDomain是在getResult函数内部定义的,就不存在引用类成员变量的问题,也就不存在和排除了上一个例子所讨论和引发的问题,因此这个例子主要讨论成员函数引用的影响;此外,不直接引用类成员变量也是解决这类问题的一个手段,如本例中为了消除成员变量的影响而在函数内部定义变量的这种做法,这类问题具体的规避做法此处略提,在下一节作详细阐述)。下面的代码同样会报错,同上面的例子一样,由于当前类中的sc(SparkContext)和sparkConf(SparkConf)两个成员变量没有做好序列化处理,导致当前类的序列化出现问题。

class MyTest1(conf:String)  extends Serializable{
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  private val sparkConf = new SparkConf().setAppName("AppName");
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  def getResult(): Array[(String)] = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain))
    .map(item => addWWW(item))
    result.take(result.count().toInt)
  }
  def addWWW(str:String):String = {
    if(str.startsWith("www."))
      str
    else
      "www."+str
  }
}

  如同前面的做法,将sc(SparkContext)和sparkConf(SparkConf)两个成员变量使用“@transent”标注后,使当前类不序列化这两个变量,则程序可以正常运行。此外,与成员变量稍有不同的是,由于该成员函数不依赖特定的成员变量,因此可以定义在scala的object中(类似于Java中的static函数),这样也取消了对特定类的依赖。如下面例子所示,将addWWW放到一个object对象(UtilTool)中去,filter操作中直接调用,这样处理以后,程序能够正常运行。

def getResult(): Array[(String)] = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain))
    .map(item => UtilTool.addWWW(item))
    result.take(result.count().toInt)
  }
object UtilTool {
  def addWWW(str:String):String = {
    if(str.startsWith("www."))
      str
    else
      "www."+str
  }
}

对全类序列化要求的验证

  如上所述,引用了某类的成员函数,会导致该类及所有成员都需要支持序列化。因此,对于使用了某类成员变量或函数的情形,首先该类需要序列化(extends Serializable),同时需要对某些不需要序列化的成员变量标记以避免为序列化造成影响。对于上面两个例子,由于引用了该类的成员变量或函数,导致该类以及所有成员支持序列化,为了消除某些成员变量对序列化的影响,使用“@transent”进行标注。

  为了进一步验证关于整个类需要序列化的假设,这里在上面例子使用“@transent”标注后并且能正常运行的代码基础上,将类序列化的相关代码删除(去掉extends Serializable),这样程序执行会报该类为序列化的错误,如下所示。所以通过这个实例说明了上面的假设。

Caused by: java.io.NotSerializableException: com.ntci.test.MyTest1
    - field (class "com.ntci.test.MyTest1$$anonfun$1", name: "$outer", type: "class com.ntci.test.MyTest1")

  所以通过以上例子可以说明:map等算子内部可以引用外部变量和某类的成员变量,但是要做好该类的序列化处理。首先是该类需要继承Serializable类,此外,对于类中某些序列化会出错的成员变量做好处理,这也是Task未序列化问题的主要原因。对于出现这类问题,首先查看未能序列化的成员变量是哪个,对于可以不需要序列化的成员变量可使用“@transent”标注。

此外,也不是map操作所在的类必须序列化不可(继承Serializable类),对于不需要引用某类成员变量或函数的情形,就不会要求相应的类必须实现序列化,如下面的例子所示,filter操作内部没有引用任何类的成员变量或函数,因此当前类不用序列化,程序可正常执行。

class MyTest1(conf:String) {
  val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
  private val sparkConf = new SparkConf().setAppName("AppName");
  private val sc = new SparkContext(sparkConf);
  val rdd = sc.parallelize(list);

  def getResult(): Array[(String)] = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain))
    result.take(result.count().toInt)
  }
}

解决办法与编程建议

  承上所述,这个问题主要是引用了某类的成员变量或函数,并且相应的类没有做好序列化处理导致的。因此解决这个问题无非以下两种方法:

  1. 不在(或不直接在)map等闭包内部直接引用某类(通常是当前类)的成员函数或成员变量
  2. 如果引用了某类的成员函数或变量,则需对相应的类做好序列化处理

(一)不在(或不直接在)map等闭包内部直接引用某类成员函数或成员变量

(1)对于依赖某类成员变量的情形

  • 如果程序依赖的值相对固定,可取固定的值,或定义在map、filter等操作内部,或定义在scala

    object对象中(类似于Java中的static变量)

  • 如果依赖值需要程序调用时动态指定(以函数参数形式),则在map、filter等操作时,可不直接引用该成员变量,而是在类似上面例子的getResult函数中根据成员变量的值重新定义一个局部变量,这样map等算子就无需引用类的成员变量。

(2)对于依赖某类成员函数的情形

  • 如果函数功能独立,可定义在scala object对象中(类似于Java中的static方法),这样就无需一来特定的类。

(二)如果引用了某类的成员函数或变量,则需对相应的类做好序列化处理

  对于这种情况,则需对该类做好序列化处理,首先该类继承序列化类,然后对于不能序列化的成员变量使用“@transent”标注,告诉编译器不需要序列化。

此外如果可以,可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。

时间: 2024-08-15 21:10:14

Spark Task未序列化(Task not serializable)问题分析的相关文章

MapReduce作业的map task和reduce task调度参数

MapReduce作业可以细分为map task和reduce task,而MRAppMaster又将map task和reduce task分为四种状态: 1.pending:刚启动但尚未向resourcemanager发送资源请求: 2.scheduled:已经向resourceManager发送资源请求,但尚未分配到资源: 3.assigned:已经分配到了资源且正在运行: 4.completed:已经运行完成. map task的生命周期为:scheduled -> assigned -

.Net4.0如何实现.NET4.5中的Task.Run及Task.Delay方法

前言 .NET4.0下是没有Task.Run及Task.Delay方法的,而.NET4.5已经实现,对于还在使用.NET4.0的同学来说,如何在.NET4.0下实现这两个方法呢? 在.NET4.0下,有一个泛型类,叫TaskCompletionSource<TReuslt>,它能控制Task的行为,如给Task设置结果.设置异常.设置取消等. MSDN是这样描述的(网址): 表示未绑定到委托的 Task<TResult> 的制造者方,并通过Task属性提供对使用者方的访问. 它有以

Task.Run vs Task.Factory.StartNew

Task.Run 和 Task.Factory.StartNew 都可以把一段要执行的代码放到ThreadPool thread中去执行.Task.Factory.StartNew是.Net 4.0中引入的,而Task.Run则是在.Net 4.5中引入,首要目的是为了简化Task.Factory.StartNew的使用.简言之, Task.Run(someAction) 与 Task.Factory.StartNew(someAction, CancellationToken.None, Ta

C# Task的使用---Task的启动

.NET 4.0包含的新名称空间System.Threading.Tasks,它包含的类抽象出了线程功能.任务表示应完成的某个单元的工作.这个单元的工作可以在单独的线程中运行,也可以以同步的方式启动一个任务,这需要等待主调线程.使用任务不仅可以获得一个抽象层,还可以对底层线程进行许多控制. 启动任务 1).使用TaskFactory类的实例,在其中把TaskMethod()方法传递给StartNew方法,就会立即启动任务. 1: using System; 2: using System.Col

Task.Run 和 Task.Factory.StartNew

在.Net 4中,Task.Factory.StartNew是启动一个新Task的首选方法.它有很多重载方法,使它在具体使用当中可以非常灵活,通过设置可选参数,可以传递任意状态,取消任务继续执行,甚至控制任务的调度行为.所有这些能力也带来了复杂性的提升,你必须知道何时应该使用何种重载方法,提供哪种调度方式等等.并且Task.Factory.StartNew这种写法也不够简洁明快,至少对它使用的主要场景不够快,一般它使用的主要场景只是将一个工作任务丢给一个后台线程执行而已. 于是,在.NET Fr

Task.Run 和Task.Factory.StartNew 区别

.Net Framework 4.0开始支持Task.Factory.StartNew,.Net Framework 4.5后开始支持Task.Run. Task.Factory.StartNew经过简化成了Task.Run,注意的是Factory.StartNew的方法参数种类更丰富,可以完成多样的需求. 在选择上,如果创建的线程需要长时间运行的话那就选择Task.Factory.StartNew. 一:使用 Task.Run(() =>{......}); Task.Factory.Star

Task.Run 和 Task.Factory.StartNew 区别

Task.Run 是在 dotnet framework 4.5 之后才可以使用, Task.Factory.StartNew 可以使用比 Task.Run 更多的参数,可以做到更多的定制. 可以认为 Task.Run 是简化的 Task.Factory.StartNew 的使用,除了需要指定一个线程是长时间占用的,否则就使用 Task.Run 创建新线程 下面来告诉大家使用两个函数创建新的线程 Task.Run(() => { var foo = 2; }); 这时 foo 的创建就在另一个线

Apache Spark源码走读之6 -- 存储子系统分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系. 存储子系统概览 上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下 CacheManager  RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果 BlockManager

spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable

import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.common.math.MathUtils; // 自定义Partitioner class ESShardPartitioner(settings: String) extends org.apache.spark.Partitioner { protected var _numPartitions = -1; prote