Spark计算均值

作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处

用spark来快速计算分组的平均值,写法很便捷,话不多说上代码

object ColumnValueAvg extends App {
  /**
    * ID,Name,ADDRESS,AGE
    * 001,zhangsan,chaoyang,20
    * 002,zhangsa,chaoyang,27
    * 003,zhangjie,chaoyang,35
    * 004,lisi,haidian,24
    * 005,lier,haidian,40
    * 006,wangwu,chaoyang,90
    * 007,wangchao,haidian,80
    */
  val conf = new SparkConf().setAppName("test column value sum and avg").setMaster("local[1]")
  val sc = new SparkContext(conf)

  val textRdd = sc.textFile(args(0))

  //be careful the toInt here is necessary ,if no cast ,then it will be age string append
  val addressAgeMap = textRdd.map(x => (x.split(",")(2), x.split(",")(3).toInt))

  val sumAgeResult = addressAgeMap.reduceByKey(_ + _).collect().foreach(println)

  val avgAgeResult = addressAgeMap.combineByKey(
    (v) => (v, 1),
    (accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1),
    (accu1: (Int, Int), accu2: (Int, Int)) => (accu1._1 + accu2._1, accu1._2 + accu2._2)
  ).mapValues(x => (x._1 / x._2).toDouble).collect().foreach(println)

  println("Sum and Avg calculate successfuly")

  sc.stop()

}

用textFile读取数据后,以address进行分组来求age的平均值,这里用combineByKey来计算,这是一个抽象层次很高的函数.稍微总结一下自己的理解

查看源代码会发现combineByKey定义如下

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
    : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

combineByKey函数需要传递三个函数做为参数,分别为createCombiner、mergeValue、mergeCombiner,需要理解这三个函数的意义

结合数据来讲的话,combineByKey默认按照key来进行元素的combine,这里三个参数都是对value的一些操作

1>第一个参数createCombiner,如代码中定义的是 : (v) => (v, 1)

这里是创建了一个combiner,作用是当遍历rdd的分区时,遇到第一次出现的key值,那么生成一个(v,1)的combiner,比如这里key为address,当遇到第一个

chaoyang,20 的时候,(v,1)中的v就是age的值20,1是address出现的次数
 
2>第2个参数是mergeValue,顾名思义就是合并value,如代码中定义的是:(accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1)
这里的作用是当处理当前分区时,遇到已经出现过的key,那么合并combiner中的value,注意这里accu: (Int, Int)对应第一个参数中出现的combiner,即(v,1),注意类型要一致
那么(accu._1 + v, accu._2 + 1)就很好理解了,accu._1即使需要合并的age的值,而acc._2是需要合并的key值出现的次数,出现一次即加1
 
3>第三个参数是mergeCombiners,用来合并各个分区上的累加器,因为各个分区分别运行了前2个函数后需要最后合并分区结果.
 
ok,运行代码,结果如下,分别按照address来计算出age的平均值
 
(haidian,48.0)
(chaoyang,43.0)
 
由于combineByKey抽象程度很高,可以自己custom一些函数做为计算因子,因此可以灵活的完成更多的计算功能.
时间: 2024-08-04 16:13:39

Spark计算均值的相关文章

使用R语言计算均值,方差等

R语言对于数值计算很方便,最近用到了计算方差,标准差的功能,特记录. 数据准备 height <- c(6.00, 5.92, 5.58, 5.92) 1 计算均值 mean(height) [1] 5.855 2 计算中位数 median(height) [1] 5.92 3 计算标准差 sd(height) [1] 0.1871719 4 计算方差 var(height) [1] 0.03503333 5 计算两个变量之间的相关系数 cor(height,log(height)) [1] 0

【Spark深入学习 -13】Spark计算引擎剖析

----本节内容------- 1.遗留问题解答 2.Spark核心概念 2.1 RDD及RDD操作 2.2 Transformation和Action 2.3 Spark程序架构 2.4 Spark on Yarn运行流程 2.5 WordCount执行原理 3.Spark计算引擎原理 3.1 Spark内部原理 3.2 生成逻辑执行图 3.3 生成物理执行图 4.Spark Shuffle解析 4.1 Shuffle 简史 4.2  Spark Shuffle ·Shuffle Write

python计算均值方差

用Python求均值与方差,可以自己写,也可以借助于numpy,不过到底哪个快一点呢? 我做了个实验,首先生成9百万个样本: ? 1 2 3 nlist=range(0,9000000) nlist=[float(i)/1000000 for i in nlist] N=len(nlist) 第二行是为了让样本小一点,否则从1加到9百万会溢出的. 自己实现,遍历数组来求均值方差: ? 1 2 3 4 5 6 7 sum1=0.0 sum2=0.0 for i in range(N):     s

C++ - Vector 计算 均值(mean) 和 方差(variance)

Vector 计算 均值(mean) 和 方差(variance) 本文地址: http://blog.csdn.net/caroline_wendy/article/details/24623187 vector<>类型的数组, 计算均值和方差的最简方法. 代码: double sum = std::accumulate(std::begin(resultSet), std::end(resultSet), 0.0); double mean = sum / resultSet.size()

C语言之文件操作07——读取文件数据并计算均值方差标准差

//文件 /* =============================================================== 题目:从文本文件"high.txt"中取出运动员的身高数据,并计算平均值,方差和标准差! =============================================================== */ #include<stdio.h> #include <math.h> #define hh pr

使用Spark计算PV、UV

日志字段格式: id,ip,url,ref,cookie,time_stamp 把日志文件放到HDFS.仅取了1000行. hadoop fs -put 1000_log hdfs://localhost:9000/user/root/input 直接在Scala Shell中读取文件并计算PV. scala> val textFile = sc.textFile("hdfs://localhost:9000/user/root/input/1000_log") scala>

spark计算平均值

对于 Array(('a',1), ('a',2), ('b',3), ('a',4), ('a',15))如何计算平均值呢: 原来通过计算两边,第一遍计算总次数 val a = sc.parallelize(data).map(x=>1)val b = sc.parallelize(data).map(x=>x._2)val count = a.reduce(_+_)val value = b.reduce(_+_)print(value/count)但是这样需要对数据做两次处理,效率大大的

Spark计算模型

通过一个经典的程序来说明 //输入与构造 RDD val file=sc.textFile("***") //转换Transformation val errors=file.filter(line=>line.contains("ERRORS"))  //输出 Action  error.count() 从RDD的转换和存储角度看这个过程: 用户程序对RDD通过多个函数进行操作,将RDD进行转换. Block-Manager管理RDD的物理分区,每个Bloc

不同格式下计算图片的均值和caffe.proto

均值是所有训练样本的均值,减去之后再进行训练会提高其速度和精度. 1.caffe下的均值 数据格式是二进制的binaryproto,作者提供了计算均值的文件compute_image_mean, 计算均值时调用: sudo build/tools/compute_image_mean examples/mnist/mnist_train_lmdb examples/mnist/mean.binaryproto 生成的均值文件保存在mean_binaryproto. 2.python格式下的均值(