生产常用Spark累加器剖析之二

Driver端

  1. Driver端初始化构建Accumulator并初始化,同时完成了Accumulator注册,Accumulators.register(this)时Accumulator会在序列化后发送到Executor端
  2. Driver接收到ResultTask完成的状态更新后,会去更新Value的值 然后在Action操作执行后就可以获取到Accumulator的值了

Executor端

  1. Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function。同时在反序列化的同时也去反序列化Accumulator(在readObject方法中完成),同时也会向TaskContext完成注册
  2. 完成任务计算之后,随着Task结果一起返回给Driver

结合源码分析

Driver端初始化

??Driver端主要经过以下步骤,完成初始化操作:

val accum = sparkContext.accumulator(0, “AccumulatorTest”)
val acc = new Accumulator(initialValue, param, Some(name))
Accumulators.register(this)

Executor端反序列化得到Accumulator

??反序列化是在调用ResultTask的runTask方式时候做的操作:

// 会反序列化出来RDD和自己定义的function
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
   ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

??在反序列化的过程中,会调用Accumulable中的readObject方法:

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    // value的初始值为zero;该值是会被序列化的
    value_ = zero
    deserialized = true
    // Automatically register the accumulator when it is deserialized with the task closure.
    //
    // Note internal accumulators sent with task are deserialized before the TaskContext is created
    // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
    // metrics, still need to register here.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      // 当前反序列化所得到的对象会被注册到TaskContext中
      // 这样TaskContext就可以获取到累加器
      // 任务运行结束之后,就可以通过context.collectAccumulators()返回给executor
      taskContext.registerAccumulator(this)
    }
  }

注意

Accumulable.scala中的value_,是不会被序列化的,@transient关键词修饰了

@volatile @transient private var value_ : R = initialValue // Current value on master

累加器在各个节点的累加操作

针对传入function中不同的操作,对应有不同的调用方法,以下列举几种(在Accumulator.scala中):

def += (term: T) { value_ = param.addAccumulator(value_, term) }
def add(term: T) { value_ = param.addAccumulator(value_, term) }
def ++= (term: R) { value_ = param.addInPlace(value_, term)}

根据不同的累加器参数,有不同实现的AccumulableParam(在Accumulator.scala中):

trait AccumulableParam[R, T] extends Serializable {
  /**
  def addAccumulator(r: R, t: T): R
  def addInPlace(r1: R, r2: R): R
  def zero(initialValue: R): R
}

不同的实现如下图所示:

以IntAccumulatorParam为例:

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
  def zero(initialValue: Int): Int = 0
}

我们发现IntAccumulatorParam实现的是trait AccumulatorParam[T]:

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}

在各个节点上的累加操作完成之后,就会紧跟着返回更新之后的Accumulators的value_值

聚合操作

在Task.scala中的run方法,会执行如下:

// 返回累加器,并运行task
// 调用TaskContextImpl的collectAccumulators,返回值的类型为一个Map
(runTask(context), context.collectAccumulators())

在Executor端已经完成了一系列操作,需要将它们的值返回到Driver端进行聚合汇总,整个顺序如图累加器执行流程:

根据执行流程,我们可以发现,在执行完collectAccumulators方法之后,最终会在DAGScheduler中调用updateAccumulators(event),而在该方法中会调用Accumulators的add方法,从而完成聚合操作:

def add(values: Map[Long, Any]): Unit = synchronized {
  // 遍历传进来的值
  for ((id, value) <- values) {
    if (originals.contains(id)) {
      // Since we are now storing weak references, we must check whether the underlying data
      // is valid.
      // 根据id从注册的Map中取出对应的累加器
      originals(id).get match {
        // 将值给累加起来,最终将结果加到value里面
       // ++=是被重载了
        case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
        case None =>
          throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
      }
    } else {
      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
    }
  }
}

获取累加器的值

通过accum.value方法可以获取到累加器的值

至此,累加器执行完毕。

原文地址:https://blog.51cto.com/14309075/2413995

时间: 2024-11-10 15:27:39

生产常用Spark累加器剖析之二的相关文章

生产常用Spark累加器剖析之四

生产常用Spark累加器剖析之四 现象描述 val acc = sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) val newData = data.map(x => { if (x % 2 == 0) { accum += 1 } }) newData.count acc.value newData.foreach(println) acc.value 上述现象,会造成acc.v

Spark 累加器

由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量, 对于其概念与理解可以参考:共享变量(广播变量和累加器).可能需要注意:Spark累加器(Accumulator)陷阱及解决办法 因此,我们便可以利用累加器与广播变量来构造一些比较常用的关系,以Map的形式广播出去,提高效率. 如下通过累加器构造了一个DF数据间的映射关系, defgetMap(spark:SparkSession,data:DataFrame)

WCF技术剖析之二:再谈IIS与ASP.NET管道

原文:WCF技术剖析之二:再谈IIS与ASP.NET管道 在2007年9月份,我曾经写了三篇详细介绍IIS架构和ASP.NET运行时管道的文章,深入介绍了IIS 5.x与IIS 6.0HTTP请求的监听与分发机制,以及ASP.NET运行时管道对HTTP请求的处理流程: [原创]ASP.NET Process Model之一:IIS 和 ASP.NET ISAPI [原创]ASP.NET Process Model之二:ASP.NET Http Runtime Pipeline - Part I

Spark机器学习实战 (十二) - 推荐系统实战

0 相关源码 将结合前述知识进行综合实战,以达到所学即所用.在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统. 1 推荐系统简介 1.1 什么是推荐系统 1.2 推荐系统的作用 1.2.1 帮助顾客快速定位需求,节省时间 1.2.2 大幅度提高销售量 1.3 推荐系统的技术思想 1.3.1 推荐系统是一种机器学习的工程应用 1.3.2 推荐系统基于知识发现原理 1.4 推荐系统的工业化实现 Apache Spa

Spark用Java实现二次排序的自定义key

本人在研究Spak,最近看了很多网上的对于SPARK用Java实现二次排序的方法,对于自定义key的做法 基本上都是实现Ordered<>接口,重写$greater.$greater$eq.$less.$less$eq.compare.compareTo方法,定义hashCode.equals····· 感觉好麻烦,其实我们自定义key只是用了里面的compareTo方法,其他的$greater.$greater$eq.$less.$less$eq.compare 不用做任何改动,hashCo

logback logback.xml常用配置详解(二)&lt;appender&gt;

logback 常用配置详解(二) <appender> <appender>: <appender>是<configuration>的子节点,是负责写日志的组件. <appender>有两个必要属性name和class.name指定appender名称,class指定appender的全限定名. 1.ConsoleAppender: 把日志添加到控制台,有以下子节点: <encoder>:对日志进行格式化.(具体参数稍后讲解 ) &

Tomcat剖析(二)

Tomcat剖析(二) 目录: Tomcat剖析(一):一个简单的Web服务器 Tomcat剖析(二):一个简单的Servlet服务器 这一节基于 <深度剖析Tomcat>第二章: 一个简单的Servlet服务器 总结而成. 上一节,我们了解了一个简单的Web服务器的总体处理流程是怎样的:这一节,我们开始搭建一个简单的Servlet容器,也就是增加实现了servlet的简单加载执行,而不仅仅是将文件内容输出到浏览器上.当然,这一节的servlet的实现是最简单的,用来了解整个Servlet的大

logback 常用配置详解(二) &lt;appender&gt;

logback 常用配置详解(二) <appender> <appender>: <appender>是<configuration>的子节点,是负责写日志的组件. <appender>有两个必要属性name和class.name指定appender名称,class指定appender的全限定名. 1.ConsoleAppender: 把日志添加到控制台,有以下子节点: <encoder>:对日志进行格式化.(具体参数稍后讲解 ) &

java中常用的工具类(二)

下面继续分享java中常用的一些工具类,希望给大家带来帮助! 1.FtpUtil Java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71