生产常用Spark累加器剖析之四

生产常用Spark累加器剖析之四

现象描述

val acc = sc.accumulator(0, “Error Accumulator”)
val data = sc.parallelize(1 to 10)
val newData = data.map(x => {
  if (x % 2 == 0) {
 accum += 1
}
})
newData.count
acc.value
newData.foreach(println)
acc.value

上述现象,会造成acc.value的最终值变为10

原因分析

Spark中的一系列transform操作都会构造成一长串的任务链,此时就需要通过一个action操作来触发(lazy的特性),accumulator也是如此。

  • 因此在一个action操作之后,调用value方法查看,是没有任何变化
  • 第一次action操作之后,调用value方法查看,变成了5
  • 第二次action操作之后,调用value方法查看,变成了10

原因就在于第二次action操作的时候,又执行了一次累加器的操作,同个累加器,在原有的基础上又加了5,从而变成了10

解决方案

通过上述的现象描述,我们可以很快知道解决的方法:只进行一次action操作。基于此,我们只要切断任务之间的依赖关系就可以了,即使用cache、persist。这样操作之后,那么后续的累加器操作就不会受前面的transform操作影响了

相关案例

  • 需求

    使用Accumulators统计emp表中NULL出现的次数以及正常数据的条数 & 打印正常数据的信息

  • 数据
    7369  SMITH   CLERK   7902    1980-12-17  800.00      20
    7499  ALLEN   SALESMAN    7698    1981-2-20   1600.00 300.00  30
    7521  WARD    SALESMAN    7698    1981-2-22   1250.00 500.00  30
    7566  JONES   MANAGER 7839    1981-4-2    2975.00     20
    7654  MARTIN  SALESMAN    7698    1981-9-28   1250.00 1400.00 30
    7698  BLAKE   MANAGER 7839    1981-5-1    2850.00     30
    7782  CLARK   MANAGER 7839    1981-6-9    2450.00     10
    7788  SCOTT   ANALYST 7566    1987-4-19   3000.00     20
    7839  KING    PRESIDENT       1981-11-17  5000.00     10
    7844  TURNER  SALESMAN    7698    1981-9-8    1500.00 0.00    30
    7876  ADAMS   CLERK   7788    1987-5-23   1100.00     20
    7900  JAMES   CLERK   7698    1981-12-3   950.00      30
    7902  FORD    ANALYST 7566    1981-12-3   3000.00     20
    7934  MILLER  CLERK   7782    1982-1-23   1300.00     10
  • 遇到的坑 & 解决方法

    现象描述 & 原因分析:

    我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时就需要通过一个action操作来触发; accumulator也是一样的,只有当action操作执行时,才会触发accumulator的执行; 因此在一个action操作之前,我们调用accumulator的value方法是无法查看其数值的,肯定是没有任何变化的; 所以在对normalData进行foreach操作之后,即action操作之后,我们会发现累加器的数值就变成了11; 之后,我们对normalData再进行一次count操作之后,即又一次的action操作之后,其实这时候,又去执行了一次前面的transform操作; 因此累加器的值又增加了11,变成了22

    解决办法:

    经过上面的分析,我们可以知道,使用累加器的时候,我们只有使用一次action操作才能够保证结果的准确性 因此,我们面对这种情况,是有办法的,做法就是切断它们相互之间的依赖关系即可 因此对normalData使用cache方法,当RDD第一次被计算出来时,就会被直接缓存起来 再调用时,相同的计算操作就不会再重新计算一遍

    import org.apache.spark.{SparkConf, SparkContext}
    /**
    * 使用Spark Accumulators完成Job的数据量处理
    * 统计emp表中NULL出现的次数以及正常数据的条数 & 打印正常数据的信息
    */
    object AccumulatorsApp {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorsApp")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/emp.txt")
    // long类型的累加器值
    val nullNum = sc.longAccumulator("NullNumber")
    val normalData = lines.filter(line => {
      var flag = true
      val splitLines = line.split("\t")
      for (splitLine <- splitLines){
        if ("".equals(splitLine)){
          flag = false
          nullNum.add(1)
        }
      }
      flag
    })
    // 使用cache方法,将RDD的第一次计算结果进行缓存;防止后面RDD进行重复计算,导致累加器的值不准确
    normalData.cache()
    // 打印每一条正常数据
    normalData.foreach(println)
    // 打印正常数据的条数
    println("NORMAL DATA NUMBER: " + normalData.count())
    // 打印emp表中NULL出现的次数
    println("NULL: " + nullNum.value)
    sc.stop()
    }
    }

原文地址:https://blog.51cto.com/14309075/2414001

时间: 2024-08-27 12:16:59

生产常用Spark累加器剖析之四的相关文章

生产常用Spark累加器剖析之二

Driver端 Driver端初始化构建Accumulator并初始化,同时完成了Accumulator注册,Accumulators.register(this)时Accumulator会在序列化后发送到Executor端 Driver接收到ResultTask完成的状态更新后,会去更新Value的值 然后在Action操作执行后就可以获取到Accumulator的值了 Executor端 Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function.同时在反序

Spark 累加器

由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量, 对于其概念与理解可以参考:共享变量(广播变量和累加器).可能需要注意:Spark累加器(Accumulator)陷阱及解决办法 因此,我们便可以利用累加器与广播变量来构造一些比较常用的关系,以Map的形式广播出去,提高效率. 如下通过累加器构造了一个DF数据间的映射关系, defgetMap(spark:SparkSession,data:DataFrame)

精通Spark:Spark内核剖析、源码解读、性能优化和商业案例实战

这是世界上第一个Spark内核高端课程: 1, 该课程在对Spark的13个不同版本源码彻底研究基础之上提炼而成: 2, 课程涵盖Spark所有内核精髓的剖析: 3, 课程中有大量的核心源码解读: 4, 全景展示Spark商业案例下规划.部署.开发.管理技术: 5, 涵盖Spark核心优化技巧 该课程是Spark的高端课程,其前置课程是“18小时内掌握Spark:把云计算大数据速度提高100倍以上!”. 培训对象 1,  系统架构师.系统分析师.高级程序员.资深开发人员: 2, 牵涉到大数据处理

Spark累加器使用

Spark累加器使用 转贴请声明原文:http://blog.csdn.net/duck_genuine/article/details/41550019 使用spark累加器,解决视频平均播放数计算,以及视频播放数平方和平均值 val totalTimes=sc.accumulator(0l) val totalVids=sc.accumulator(0) val totalPow2Times=sc.accumulator(0d) val timesFile=sc.textFile("/use

WCF技术剖析之四:基于IIS的WCF服务寄宿(Hosting)实现揭秘

原文:WCF技术剖析之四:基于IIS的WCF服务寄宿(Hosting)实现揭秘 通过<再谈IIS与ASP.NET管道>的介绍,相信读者已经对IIS和ASP.NET的请求处理管道有了一个大致的了解,在此基础上去理解基于IIS服务寄宿的实现机制就显得相对容易了.概括地说,基于IIS的服务寄宿依赖于两个重要的对象:System.ServiceModel.Activation.HttpModule和System. ServiceModel.Activation.HttpHandler. 一.通过Htt

Spark 累加器实验

以下代码用 Pyspark + IPython 完成 统计日志空行的数量: 读取日志,创建RDD: myrdd = sc.textFile("access.log") 不使用累加器: In [68]: s = 0 In [69]: def f(x):     ...:     global s     ...:     if len(x) == 0:     ...:         s += 1     ...: In [70]: myrdd.foreach(f) In [71]: 

Spark 累加器使用

1.使用foreach碰到了问题 没看过累加器的时候,写了这么个代码,发现map里头foreach完了还是0啊?咋回事啊? 1 def calNrOfEachDataMap(data:RDD[String],neededDataMap:Set[Map[Int,String]]): Map[Map[Int,String],Int] ={ 2 var ans:Map[Map[Int,String],Int] = Map[Map[Int,String],Int]() 3 neededDataMap.f

spark streaming测试之四设置窗口大小接收数据

测试思路: 首先,使用网络数据发送程序发送数据: 然后,运行spark程序: 观察效果. 说明: 1. 这里也需要设置检查点目录 2. 这里有四个参数: 前两个分别是监听的端口和每隔多少毫秒接收一次数据: 第三个参数是接收前多少毫秒的数据:(详细请参见window具体含义) 第四个参数是每隔多少毫秒接收一次数据. sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.storage.Storage

[Spark源代码剖析] DAGScheduler提交stage

转载请标明出处:http://blog.csdn.net/bigbigdata/article/details/47310657 DAGScheduler通过调用submitStage来提交stage.实现例如以下: private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + &q