Spark闭包与序列化

本文原文出处: http://blog.csdn.net/bluishglc/article/details/50945032 严禁任何形式的转载,否则将委托CSDN官方维护权益!

在Spark的官方文档再三强调那些将要作用到RDD上的操作,都会被分发到各个worker节点上去执行,我们都知道,这些操作实际上就是一些函数和涉及的变量组成的闭包,这里显然涉及到一个容易被忽视的问题:闭包的“序列化”。显然,闭包是有状态的,这主要是指它牵涉到的那些自由变量以及自由变量的依赖到的其他变量,所以,在将一个简单的函数或者一段简短的操作(就是闭包)传递给类似RDD.map等函数时,Spark需要检索闭包内所有的涉及到的变量(包括传递依赖到的变量),正确地把这些变量序列化之后才能传递到worker节点并反序列化去执行。如果在涉及到的所有的变量中有任何不支持序列化或没有指明如何序列化自己时,你就会遇到这样的错误:

org.apache.spark.SparkException: Task not serializable

在下面的例子中,我们从kafka中持续地接收json消息,并在spark-streaming中将字符串解析成对应的实体:

object App {
    private val config = ConfigFactory.load("my-streaming.conf")
    case class Person (firstName: String,lastName: String)
    def main(args: Array[String]) {
        val zkQuorum = config.getString("kafka.zkQuorum")
        val myTopic = config.getString("kafka.myTopic")
        val myGroup = config.getString("kafka.myGroup")
        val conf = new SparkConf().setAppName("my-streaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
        //this val is a part of closure, and it‘s not serializable!
        implicit val formats = DefaultFormats
        def parser(json: String) = parse(json).extract[Person].firstName
        lines.map(_._2).map(parser).print
        ....
        ssc.start()
        ssc.awaitTerminationOrTimeout(2)
        ssc.stop()
    }

}

这段代码在执行时就会报如下错误:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$

问题的症结就是闭包没有办法序列化引起的。在这个例子里,闭包的范围是:函数parser以及它所依赖的一个隐式参数: formats , 而问题就出在这个隐式参数上, 它的类型是DefaultFormats,这个类没有提供序列化和反序列自身的说明,所以Spark无法序列化formats,进而无法将task推送到远端执行。

隐式参数formats是为extract准备的,它的参数列表如下:

org.json4s.ExtractableJsonAstNode#extract[A](implicit formats: Formats, mf: scala.reflect.Manifest[A]): A = ...

找到问题的根源之后就好解决了。实际上我们根本不需要序列化formats, 对我们来说,它是无状态的。所以,我们只需要把它声明为一个全局静态的变量就可以绕过序列化。所以改动的方法就是简单地把implicit val formats = DefaultFormats的声明从方法内部迁移到App Object的字段位置上即可。

object App {
    private val config = ConfigFactory.load("my-streaming.conf")
    case class Person (firstName: String,lastName: String)
    //As Object field, global, static, no need to serialize
    implicit val formats = DefaultFormats

    def main(args: Array[String]) {
        val zkQuorum = config.getString("kafka.zkQuorum")
        val myTopic = config.getString("kafka.myTopic")
        val myGroup = config.getString("kafka.myGroup")
        val conf = new SparkConf().setAppName("my-streaming")
        val ssc = new StreamingContext(conf, Seconds(1))
        val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
        def parser(json: String) = parse(json).extract[Person].firstName
        lines..map(_._2).map(parser).print
        ....
        ssc.start()
        ssc.awaitTerminationOrTimeout(2)
        ssc.stop()
    }

}

最后我们来总结一下应该如何正确的处理Spark Task闭包的序列化问题。首先你需要对Task涉及的闭包的边界要有一个清晰的认识,要尽量地控制闭包的范围,和牵涉到的自由变量,一个非常值得警惕的地方是:尽量不要在闭包中直接引用一个类的成员变量和函数,这样会导致整个类实例被序列化。这样例子在Spark文档中也有提及,如下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

然后,一个好的组织代码的方式是:除了那些很短小的函数,尽量把复杂的操作封装到全局单一的函数体:全局静态方法或者函数对象

如果确实需要某个类的实例参与到计算过程中,则要作好相关的序列化工作。

时间: 2025-01-08 22:32:02

Spark闭包与序列化的相关文章

spark streaming task 序列化源码

spark streaming task 序列化源码 1.入口 val kafkaStreams = (1 to recerverNum).map { i => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) )} val unifiedStream = ssc.union(kafkaStreams) val u

Spark Task未序列化(Task not serializable)问题分析

问题描述及原因分析 在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,从而引发Task未序列化问题.然而,Spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换等.为了解决上述Task未序列化问题,这里对其进行了研究和总结. 出现"org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map.fil

Spark设置Kryo序列化缓冲区大小

背景 今天在开发SparkRDD的过程中出现Buffer Overflow错误,查看具体Yarn日志后发现是因为Kryo序列化缓冲区溢出了,日志建议调大spark.kryoserializer.buffer.max的value,搜索了一下设置keyo序列化缓冲区的方法,特此整理记录下来. 20/01/08 17:12:55 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 4, s015.test.com, execut

spark性能优化之使用高性能序列化类库

如果使用序列化技术,在执行序列化操作的时候很慢或者是序列化之后的数据量还是很大,那么会让分布式应用程序性能下降很多,spark自身就会在一些地方对数据进行序列化,比如shuffle写磁盘,还有就是如果我们算子函数使用了外部数据,(比如Java内置类型,或者自定义类型)那么也需要让其可序列化. 默认,spark使用了Java自身提供的序列化机制,基于objectoutputStream和objectinputstream,因为这种方式是Java原生提供的,很方便使用.但是Java序列化机制性能并不

Spark 性能相关参数配置详解-压缩与序列化篇

随着Spark的逐渐成熟完善, 越来越多的可配置参数被添加到Spark中来, 本文试图通过阐述这其中部分参数的工作原理和配置思路, 和大家一起探讨一下如何根据实际场合对Spark进行配置优化. 由于篇幅较长,所以在这里分篇组织,如果要看最新完整的网页版内容,可以戳这里:http://spark-config.readthedocs.org/,主要是便于更新内容 压缩和序列化相关 spark.serializer 默认为org.apache.spark.serializer.JavaSeriali

spark总结——转载

转载自:http://smallx.me/2016/06/07/spark%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93/ 第一个Spark程序 /** * 功能:用spark实现的单词计数程序 * 环境:spark 1.6.1, scala 2.10.4 */ // 导入相关类库import org.apache.spark._ object WordCount { def main(args: Array[String]) { // 建立spark运行上下文 val

(6)Spark编程进阶

6.1 简介 累加器:用来对信息进行聚合: 广播变量:用来高效分发较大的对象 6.2 累加器 通常在向Spark传递函数时,可以使用驱动器程序中定义的变量,但是集群中运行的每个人物都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量.Spark的两个共享变量,累加器和广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制. 累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法.累加器的一个常见的用途是在调试时对作业执行过程中的事件进行计数. 在Python

Spark编程进阶

1.累加器      通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件是,可以使用驱动器程序中定义的变量,但是集群中运行的每个人物都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量.Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制. 第一种共享变量,累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法.累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数. 下面的例子是计算输

Spark使用总结与分享【转】

背景 使用spark开发已有几个月.相比于python/hive,scala/spark学习门槛较高.尤其记得刚开时,举步维艰,进展十分缓慢.不过谢天谢地,这段苦涩(bi)的日子过去了.忆苦思甜,为了避免项目组的其他同学走弯路,决定总结和梳理spark的使用经验. Spark基础 基石RDD spark的核心是RDD(弹性分布式数据集),一种通用的数据抽象,封装了基础的数据操作,如map,filter,reduce等.RDD提供数据共享的抽象,相比其他大数据处理框架,如MapReduce,Peg