Spark累加器使用

Spark累加器使用

转贴请声明原文:http://blog.csdn.net/duck_genuine/article/details/41550019

使用spark累加器,解决视频平均播放数计算,以及视频播放数平方和平均值

val totalTimes=sc.accumulator(0l)
val totalVids=sc.accumulator(0)
val totalPow2Times=sc.accumulator(0d)
val timesFile=sc.textFile("/user/zhenyuan.yu/DumpIdTimesJob_tmp_out")

timesFile.foreach(f=>{
   val vid_times=f.split("\t")
   var times=vid_times(1).toInt

   if(times>10000000)times=10000000
   if(times>500){
   val times_d=times.toDouble
   totalTimes+=times
   totalPow2Times+=Math.pow(times_d,2)
   totalVids+=1
   }
   }
)
val avgTimes=totalTimes.value/totalVids.value
val avgPow2Times=totalPow2Times.value/totalVids.value
println("totalTimes:"+totalTimes+",totalVids:"+totalVids+",totalPow2Times:"+totalPow2Times)
println("avgTimes:"+avgTimes+",avgPow2Times:"+avgPow2Times)

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

计算视频播放数每个区间占用比例

val totalVids=sc.accumulator(0)
val timesFile=sc.textFile("/user/zhenyuan.yu/DumpIdTimesJob_tmp_out")
val keysList=List(100, 500, 1000, 2000, 5000, 10000, 20000, 40000, 80000, 100000, 200000, 300000, 500000, 1000000, 2000000, 5000000, 10000000)
val timesRDD=timesFile.map(f=>{
   val vid_times=f.split("\t")
   var times=vid_times(1).toInt
   times
   }).filter(_>50).map(times=>{
      totalVids+=1
      var key=0
      var end=false
      var i=0
      var size=keysList.size
      while(i<size && !end){
         key=keysList(i)
         if(times<key){
           end=true
         }
         i+=1
      }
      (key,1)
}).reduceByKey(_+_)

val rdd=timesRDD.collect()
println("totalVid:"+totalVids)
for(i<-0 to rdd.size-1){
  val times_times=rdd(i)
  val percent=times_times._2.toFloat/totalVids.value
  println("times:<"+times_times._1+",vid_num:"+times_times._2+",percent:"+percent)
}

时间: 2024-10-15 17:36:22

Spark累加器使用的相关文章

Spark 累加器

由于spark是分布式的计算,所以使得每个task间不存在共享的变量,而为了实现共享变量spark实现了两种类型 - 累加器与广播变量, 对于其概念与理解可以参考:共享变量(广播变量和累加器).可能需要注意:Spark累加器(Accumulator)陷阱及解决办法 因此,我们便可以利用累加器与广播变量来构造一些比较常用的关系,以Map的形式广播出去,提高效率. 如下通过累加器构造了一个DF数据间的映射关系, defgetMap(spark:SparkSession,data:DataFrame)

生产常用Spark累加器剖析之四

生产常用Spark累加器剖析之四 现象描述 val acc = sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) val newData = data.map(x => { if (x % 2 == 0) { accum += 1 } }) newData.count acc.value newData.foreach(println) acc.value 上述现象,会造成acc.v

Spark 累加器实验

以下代码用 Pyspark + IPython 完成 统计日志空行的数量: 读取日志,创建RDD: myrdd = sc.textFile("access.log") 不使用累加器: In [68]: s = 0 In [69]: def f(x):     ...:     global s     ...:     if len(x) == 0:     ...:         s += 1     ...: In [70]: myrdd.foreach(f) In [71]: 

Spark 累加器使用

1.使用foreach碰到了问题 没看过累加器的时候,写了这么个代码,发现map里头foreach完了还是0啊?咋回事啊? 1 def calNrOfEachDataMap(data:RDD[String],neededDataMap:Set[Map[Int,String]]): Map[Map[Int,String],Int] ={ 2 var ans:Map[Map[Int,String],Int] = Map[Map[Int,String],Int]() 3 neededDataMap.f

生产常用Spark累加器剖析之二

Driver端 Driver端初始化构建Accumulator并初始化,同时完成了Accumulator注册,Accumulators.register(this)时Accumulator会在序列化后发送到Executor端 Driver接收到ResultTask完成的状态更新后,会去更新Value的值 然后在Action操作执行后就可以获取到Accumulator的值了 Executor端 Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function.同时在反序

Spark之RDD持久化、广播、累加器

RDD持久化.广播.累加器实质上分别涉及了RDD的数据如何保存,RDD在构建高效算法的时候涉及了persist或者checkpoint,以及广播和累加器,通过spark-shell可以试验一些小功能,spark-shell本身是spark的发行包推出的一个程序,通过这个程序可以直接写代码,spark-shell会把代码直接进行运行. 1.1.   RDD持久化实战 从2个层面考虑持久化: 1)操作RDD的时候怎么保存结果(属于Action的部分) 下面使用Spark-shell进行实战: 1.1

spark Accumulator累加器使用示例

官网 http://spark.apache.org/docs/2.3.1/rdd-programming-guide.html#accumulators http://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.util.AccumulatorV2 Accumulator是spark提供的累加器,累加器的一个常用用途是在调试时对作业执行过程中的事件进行计数,但是只要driver能获取Accumulator的

RDD持久化、广播、累加器(DT大数据梦工厂)

内容: 1.RDD持久化实战: 2.Spark广播实战: 3.Spark累加器实战: 持久化实战几个方面: 1.怎么保存结果: 2.实现算法的时候cache.persist: 3.checkpoint 广播: 构建算法至关重要,降低网络传输数据量.提高内存的使用效率.加快程序的运行速度 累加器: 全局的指针部件的变量,在executor中只能修改累加器的内容,不能读累加器的内容,在driver中才能读取 ========== Action============ collect.count.sa

大数据精英实战项目班-Hadoop-Spark-真实企业项目

2018最新最全大数据技术视频,项目视频.整套视频,非那种杂七杂八自己拼凑的,内容如下,需要的联系QQ:3164282908(加Q注明大数据) 更有海量大数据技术视频.大数据项目视频,机器学习深度学习技术视频.项目视频.Python编程视频.Oracle数据库视频.Java培训视频高级架构师视频等等等. ├----------01-大数据Java基础------------- │├java第01天 ││├java第01天-01.类型转换.avi ││├java第01天-02.归档分析与实现.av