Accumulator

Accumulator简介

Accumulator是spark提供的累加器,顾名思义,该变量只能够增加。

只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增加操作(使用 +=)。你也可以在为Accumulator命名(不支持Python),这样就会在spark web ui中显示,可以帮助你了解程序运行的情况。

Accumulator使用

使用示例

举个最简单的accumulator的使用例子:

//在driver中定义

val accum = sc.accumulator(0, "Example Accumulator")

//在task中进行累加

sc.parallelize(1 to 10).foreach(x=> accum += 1)

//在driver中输出

accum.value

//结果将返回10

res: 10

累加器的错误用法

val accum= sc.accumulator(0, "Error Accumulator")

val data = sc.parallelize(1 to 10)

//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1

val newData = data.map{x => {

if(x%2 == 0){

accum += 1

0

}else 1

}}

//使用action操作触发执行

newData.count

//此时accum的值为5,是我们要的结果

accum.value

//继续操作,查看刚才变动的数据,foreach也是action操作

newData.foreach(println)

//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了

//这并不是我们想要的结果

accum.value

原因分析

官方对这个问题的解释如下描述:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。

所以在第一次count(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。

之后又对新产生的的newData进行了一次foreach(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。

这里写图片描述

解决办法

看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。

事实上,还是有解决方案的,只要将任务之间的依赖关系切断就可以了。什么方法有这种功能呢?你们肯定都想到了,cache,persist。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。

这里写图片描述

val accum= sc.accumulator(0, "Error Accumulator")

val data = sc.parallelize(1 to 10)

//代码和上方相同

val newData = data.map{x => {...}}

//使用cache缓存数据,切断依赖。

newData.cache.count

//此时accum的值为5

accum.value

newData.foreach(println)

//此时的accum依旧是5

accum.value

总结

使用Accumulator时,为了保证准确性,只使用一次action操作。如果需要使用多次则使用cache或persist操作切断依赖。

https://stackoverflow.com/questions/29494452/when-are-accumulators-truly-reliable

链接:http://www.jianshu.com/p/1b7c9a63bc7c

时间: 2025-01-08 13:10:28

Accumulator的相关文章

pyspark中使用累加器Accumulator统计指标

评价分类模型的性能时需要用到以下四个指标 最开始使用以下代码计算,发现代码需要跑近一个小时,而且这一个小时都花在这四行代码上 # evaluate model TP = labelAndPreds.filter(lambda (v, p): (v == 1 and p == 1)).count() FP = labelAndPreds.filter(lambda (v, p): (v == 0 and p == 1)).count() TN = labelAndPreds.filter(lamb

TI C66x DSP 系统events及其应用 - 5.1(QM accumulator的配置)

以下解说在详细应用中,event与中断ISR的设置.以对QM的queue监控产生中断(不是EXCEP)为例,主要包含配置QM accumulator(用于监控QM queue)与配置ISR(ISR与event配置). 首先介绍QM accumulator的配置,QM模块中QMSS(包括QMSS Tx queue 800:831,Tx/Rx channel 0:31,RxChan,TxChan,Tx queue是一一相应的,如Tx queue是806,那么相应的TxChan与RxChan编号都是6

flink - accumulator

  读accumlator JobManager 在job finish的时候会汇总accumulator的值, newJobStatus match { case JobStatus.FINISHED => try { val accumulatorResults = executionGraph.getAccumulatorsSerialized() val result = new SerializedJobExecutionResult( jobID, jobInfo.duration,

08、共享变量(Broadcast Variable和Accumulator)

共享变量工作原理 Spark一个非常重要的特性就是共享变量. 默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中.此时每个task只能操作自己的那份变量副本.如果多个task想要共享某个变量,那么这种方式是做不到的. Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量).Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,

spark中的Broadcast variables和Accumulator

举个例子: val factor = 3 rdd.map( num => num*factor) 以上两行代码显示了rdd的一个map操作,其中factor是一个外部变量.默认情况下,算子的函数内,如果使用到了外部变量,那么会将这个变量拷贝到执行这个函数的每一个task中.如果该变量非常大的话,那么网络传输耗费的资源会特别大,而且在每个节点上占用的内存空间也特别大. Spark提供的Broadcast Variable,是只读的.并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本

Spark(Accumulator)陷阱及解决办法

Accumulator简介 Accumulator是spark提供的累加器,顾名思义,该变量只能够增加. 只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增加操作(使用 +=).你也可以在为Accumulator命名(不支持Python),这样就会在spark web ui中显示,可以帮助你了解程序运行的情况. Accumulator使用 使用示例 举个最简单的accumulator的使用例子: //在driver中定义 val accum = sc.

spark变量使用broadcast、accumulator

broadcast 官方文档描述: Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. The variable will be sent to each cluster only once. 源码分析: 这里使用告警方式代替异常,为了是避免用户进程中断:

spark Accumulator累加器使用示例

官网 http://spark.apache.org/docs/2.3.1/rdd-programming-guide.html#accumulators http://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.util.AccumulatorV2 Accumulator是spark提供的累加器,累加器的一个常用用途是在调试时对作业执行过程中的事件进行计数,但是只要driver能获取Accumulator的

underscore学习总结,献给晦涩的函数式编程之美

underscore.js 越看越美,如果在项目中不断尝试underscore的方法,将会事半功倍 underscore 体现出 functionial javascript的思想,采用函数式编程的思路来解决日常生活中的一些 util的小问题 javascript 属于弱语言,对象类型用得最多的就是 array和object,underscore是基于js 封装一些对象和数组方法的库,使用起来非常便捷 这里推荐一本函数式编程的书,Functionial Javascript ,这本书对于函数式编