Spark编程进阶

1.累加器     

通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件是,可以使用驱动器程序中定义的变量,但是集群中运行的每个人物都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

第一种共享变量,累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

下面的例子是计算输入的文本中有多少空行。

SparkConf conf =new SparkConf().setMaster("local").setAppName("analysis");JavaSparkContext sc =new JavaSparkContext(conf);JavaRDD<String> rdd=sc.textFile("c:\\tests.txt");final Accumulator<Integer> blankLines =sc.accumulator(0);JavaRDD<String> callSigns = rdd.flatMap(        new FlatMapFunction<String, String>() {

public Iterable<String> call(String s) throws Exception {                if (s.equals(""))                    blankLines.add(1);                return Arrays.asList(s.split(" "));            }

});  callSigns.saveAsTextFile("output.txt");System.out.println("Blank Lines: "+blankLines.value());

总结,累加器的用法如下:

1.通过在驱动器中调用SparkContext。accumulator(initialValue)方法,创建出存有初始值得累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值intialValue的类型。

2.Spark闭包李的执行器代码可以调用累加器的value属性(在Java中使用value()或者setValue())来访问累加器的值。

3.驱动器程序可以调用累加器的value属性(在Java中使用value()或者setValue())来访问累加器的值

注意:

  工作节点上的任务不能够访问累加器的值。从这些任务的角度来看,累计器只是一个只写变量。在这种模式下,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。

累加器与容错性:

Spark会自动重新执行失败或较慢的任务来应对有错误或者比较慢的机器。例如,如果对某分区执行map()操作的节点失败了,Spark会在另一个节点上重新运行该任务。即使该节点没有崩溃,而只是处理速度比别的节点慢很多,Spark也可以抢占式地在另一个节点上启动一个"投机"(speculative)型的任务副本,如果该任务更早结束就可以直接获取结果。即使没有节点失败,spark有时也需要重新运行任务来获取缓存中被移除出内存的数据。因此最终结果就是同一个函数可能对同一个数据运行了多次,这取决于集群中发生了什么。

这种情况下累加器要怎么处理呢?实际结果是,对于要在行动操作中使用的累加器,Spark只会把每个人物对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。

对于在RDD转化操作中使用的累加器,就不能保证有这种情况了。转化操作中累计器可能会发生不止一次更新。举个例子,当一个被缓存下来但是没有经常使用的RDD在第一次从LRU缓存中被移除并又被重新用到时,这种非预期的多次更新就会发生。这会强制RDD根据其谱系进行重算,而副作用就是这也会是的谱系中的转化操作里的累加器进行更新,并在此发送到驱动器中。在转化操作中,累计器通常只用于调试目的。

Spark中的累加器支持的类型有Double Integer Long Float,Spark还引入了自定义累计器和聚合操作的API,可以实现找到累计的值得最大值,而不是把这些值加起来等功能。

2.广播变量

Spark的第二种共享变量类型是广播变量,它可以让程序高效地想所有工作节点发送一个较大的只读值,以供一个或者多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征变量,广播变量用起来都很顺手。

前面提过,Spark会自动把闭包中所有引用到的变量发送到工作节点上。虽然这很方便,但是也很低效。原因如下:首先,默认的任务发射机制是专门为小任务进行优化的;其次,事实上你可能会在多个并行操作中使用同一个变量,但是Spark会为每个操作分别发送

下面是一个使用广播变量的例子:使用广播变量广播各个国家的区号前缀,通过每个城市的区号,利用广播的这个变量,查找其对应的国家,然后将RDD针对同一国家名称进行reduce,相同国家的值相加,这样就能够得到每个国家的通信的次数之和了。

  1. //广播变量,每个的国家区号表
  2. final Broadcast<String[]> signPrefixes =sc.broadcast(loadCallSignTable());
  3. JavaPairRDD<String,Integer> countryContactCounts =contactCounts.mapToPair(
  4. new PairFunction<Tuple2<String,Integer>,String,Integer>(){
  5. public Tuple2<String,Integer> call<Tuple2<String,Integer>callSignCount){
  6. //获取区号前缀
  7. String sign=callSignCount._1();
  8. //获取该前缀,和广播变量比较,得到其对应的国家
  9. String country=lookupCountry(sign,signPrefixed.value());
  10. //将(电话,通信次数)转换为(国家,通信次数)PairRDD
  11. return new Tuple2(country,callSignCount._2());
  12. }
  13. }//针对同一个国家做归并,求每一个国家的通信次数之和
  14. ).reduceByKey(new SumInts());
  15. //保存结果
  16. contryContactCounts.saveAsTextFile(outputDir+"/countrys.txt")

总结:广播变量的使用如下:

(1)通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象;

(2)通过value()方法访问该对象的值;

(3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到其他结点)。

*广播的优化

当广播一个比较大的值是,选择既快又好的序列化格式是很重要的,因为序列化对象的时间很长或者传送话费了很长的时间,这段时间很容易成为性能的瓶颈。你可以使用spark.serializer属性选择另一个序列化库来优化序列化的过程,也可以为你的数据类型实现自己的序列化方式。

3.基于分区的操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或者创建随机数生成器等操作,都是我们应当尽量避免为每个元素都配置一次的工作。Spark提供基于分区的map和foreach,让你的部分代码只对每个分区进行一次,这样可以帮助降低这些操作的代价。

示例程序如下:我们有一个在线的业余电台呼号数据库,可以用这个数据库查询日志记录过的联系人呼号列表。通过基于分区的操作,可以在每个分区内共享一个数据库连接池,来避免建立太多的连接,同时还可以重用JSON解析器。

  1. /*我们有一个在线的业余呼号数据库,可以用这个数据库查询日志中级路过的联系人呼号列表。可以通过基于分区的操作,在每个分区中共享一个数据库连接池*/
  2. JavaPairRDD<String,CallLog[]>contactsContactLists =
  3. validCallSigns.mapPartitionsToPair(
  4. new PairFlatMapFunction<Iterator<String>,String,CallLog[]>(){
  5. public Iterable<Tuple2<String,CallLog[]>> call(Iterator<String> input){
  6. ArrayList<Tuple2<String,CallLog[]>> callSignLogs =new ArrayList<Tuple2<String, CallLog[]>>();
  7. Arraylist<Tuple2<String,ContentExchange>> requests =new ArrayList<Tuple2<String,ContentExchange>>();
  8. ObjectMapper mapper=createMapper();
  9. HttpClient client =new HttpClient();
  10. try{
  11. client.start();
  12. while(input.hasNext()){
  13. requests.add(createRequestForSign(input.next(),client));
  14. }
  15. for(Tuple2<String,ContentExchange> signExchange :requests){
  16. callSignLogs.add(fetchResultFromRequest(mapper,signExchange));
  17. }
  18. }catch(Exception e){
  19. e.printStackTrace();
  20. }
  21. return callSignLogs;
  22. }
  23. }
  24. );

表格:按分区执行的操作符

函数名 调用所提供的 返回的 对于RDD[T]的函数签名
mapPartitions() 该分区中元素的迭代器 返回的元素的迭代器 f:(Iteraotr[T]) -> Iterator[U]
mapPartitionsWithIndex() 分区序号,以及每个分区中的元素迭代器 返回的元素的迭代器 f:(Int,Iterator[T]) -> Iterator[U]
foreachPartitions() 元素迭代器 f:(Iterator[T]) ->Unit

为了避免重复的配置工作,也可以使用mapPartitions()避免创建对象的开销。有事需要创建一个对象来将不同类型的数据聚合起来。对于计算平均值,一种方法是将数值RDD转化为二元组RDD,以及在规约过程中追踪处理的元素个数,现在可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作,如下示例程序可以说明这一点。

JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));JavaRDD<Tuple2<Integer, Integer>> middle = rdd.mapPartitions(        new FlatMapFunction<Iterator<Integer>, Tuple2<Integer, Integer>>() {            public Iterable<Tuple2<Integer, Integer>> call(Iterator<Integer> integerIterator) throws Exception {               List<Tuple2<Integer,Integer>> list =new ArrayList<Tuple2<Integer, Integer>>();                int sum=0;                int num=0;                while(integerIterator.hasNext()){                    sum+=integerIterator.next();                    num++;                }                list.add(new Tuple2<Integer, Integer>(sum,num));//每个分区只创建了一次Tuple2对象                return list;            }        });Tuple2<Integer, Integer> reduce = middle.reduce(        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {                return new Tuple2<Integer, Integer>(v1._1()+v2._1(),v1._2()+v2._2());            }        });System.out.println("the average of the numbers is "+(double) reduce._1()/reduce._2());

4.数值RDD的操作

数值RDD中可用的汇总统计数据

方法 含义
count() RDD中元素的个数
mean() 元素的平均值
sum() 总和
max() 最大值
min() 最小值
variance() 元素的方差
sampleVariance() 从采样中计算出的方差
stdev() 标准差
sampleStdev() 采样的标准差

示例程序

List<Double> list =new ArrayList<Double>();for(int i=0;i<10;i++){    list.add(i*1.0);}JavaDoubleRDD rdd =sc.parallelizeDoubles(list);System.out.println("元素个数: "+rdd.count());System.out.println("平均值: "+rdd.mean());System.out.println("和: "+rdd.sum());System.out.println("方差: "+rdd.variance());System.out.println("标准差: "+rdd.stdev());System.out.println("采样标准差: "+rdd.sampleStdev());

*spark 工具类StatCounter 简单使用

最近因为项目需要,需要计算一组数据的标准差,平均值,本来需要自己写一个工具类,今天偶然发现了Spark自带的工具类,现在简单说一下这个类的用法,主要是为了记忆。

(1)counter.merge(double )

添加double元素

(2) double counter.mean()

返回平均值

(3)double counter.stdev()

返回标准差

(4)double counter.variance()

返回方差

项目需要,还要添加一个小功能,计算异常参数。即标准差除以平均值,直接stdev()/mean()即可。需要注意的是,java类可以直接继承scala类,以下是继承类的定义:

public class stat extends StatCounter {    public double cv(){        if(count()==0)            return Double.NaN;        return stdev()/mean();    }    public double cv_format_percent(){        //标明是无效数据,这种情况cv的值一定大于5%        if(count()==0)            return Double.POSITIVE_INFINITY;        return                Math.abs(100*cv());    }}

请注意原始StatCounter类的实现,非常精妙,计算其实主要附加在merge()中。

  1. /** Add a value into this StatCounter, updating the internal statistics. */
  2. def merge(value: Double): StatCounter = {//此处mu是平均值,m2是每个数与平均数的差的平方和
  3. val delta = value - mu
  4. n += 1
  5. mu += delta / n
  6. m2 += delta * (value - mu)
  7. maxValue = math.max(maxValue, value)
  8. minValue = math.min(minValue, value)
  9. this
  10. }

来自为知笔记(Wiz)

时间: 2024-11-03 22:52:48

Spark编程进阶的相关文章

(6)Spark编程进阶

6.1 简介 累加器:用来对信息进行聚合: 广播变量:用来高效分发较大的对象 6.2 累加器 通常在向Spark传递函数时,可以使用驱动器程序中定义的变量,但是集群中运行的每个人物都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量.Spark的两个共享变量,累加器和广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制. 累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法.累加器的一个常见的用途是在调试时对作业执行过程中的事件进行计数. 在Python

Spark函数式编程进阶

函数式编程进阶 1.函数和变量一样作为Scala语言的一等公民,函数可以直接复制给变量: 2.函数更长用的方式是匿名函数,定义的时候只需要说明输入参数的类型和函数体即可,不需要名称,但是匿名函数赋值给一个变量(其实是val常量),Spark源码中大量存在这种语法: 3.函数可以作为参数直接传递给函数,这极大地简化的编程语法: 4.函数式编程一个非常强大的地方之一在于函数的返回值可以是函数,当函数的返回类型是函数的时候,这个时候就是表明Scala的函数是实现了闭包! Scala壁报的内幕是:Sca

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

Scala函数式编程进阶

1 package com.dtspark.scala.basics 2 3 /** 4 * 函数式编程进阶: 5 * 1,函数和变量一样作为Scala语言的一等公民,函数可以直接赋值给变量: 6 * 2, 函数更长用的方式是匿名函数,定义的时候只需要说明输入参数的类型和函数体即可,不需要名称,但是如果你要使用的话,一般会把这个匿名函数赋值给一个变量(其实是val常量),Spark源码中大量存在这种语法,必须掌握: 7 * 3, 函数可以作为参数直接传递给函数,这极大的简化的编程的语法,为什么这

Spark编程实现SQL查询的实例

1.Oracle中的SQL select count(1) from a_V_PWYZL_CUSTACCT_PSMIS t where not exists (select 1 from tb_show_multi_question q WHERE q.dqmp_rule_code = '仅比对系统有' and q.dqmp_role_id = '105754659' and q.DQMP_target_id = t.dqmp_mrid) AND NOT EXISTS (select /*+ i

Python基础-第七天-面向对象编程进阶和Socket编程简介

本篇内容: 1.面向对象编程进阶-静态方法 2.面向对象编程进阶-类方法 3.面向对象编程进阶-属性方法 4.面向对象编程进阶-特殊成员(内置方法) 5.面向对象编程进阶-反射 6.异常处理.断言 7.Socket编程简介 一.面向对象编程进阶-静态方法 1.静态方法的实现 通过@staticmethod装饰器可以把其装饰的方法变为一个静态方法: 变成静态方法后,形参中可以不用写self了.如果写了self,默认是不会把对象本身传递给self,需要手动传递: class Dog(object):

shell脚本编程进阶练习题

这两天学习了shell脚本编程进阶,作为一枚文科生,小编觉得...恩..脚本很烧脑.....,不过小编还是做了些题,稍作总结后,呈给各位看官,内容如下: 一.条件选择if语句 选择执行: 注意:if语句可嵌套 单分支 if 判断条件;then 条件为真的分支代码 fi 双分支 if 判断条件; then 条件为真的分支代码 else 条件为假的分支代码 fi 多分支 if 判断条件1; then 条件为真的分支代码 elif 判断条件2; then 条件为真的分支代码 elif 判断条件3; t

进击的Python【第七章】:Python的高级应用(四)面向对象编程进阶

Python的高级应用(三)面向对象编程进阶 本章学习要点: 面向对象高级语法部分 静态方法.类方法.属性方法 类的特殊方法 反射 异常处理 Socket开发基础 一.面向对象高级语法部分 静态方法 要在类中使用静态方法,需在类成员函数前面加上@staticmethod标记符,以表示下面的成员函数是静态函数.使用静态方法的好处是,不需要定义实例即可使用这个方法.另外,多个实例共享此静态方法. 类方法 类方法与普通的成员函数和静态函数有不同之处,在接触的语言中好像也没见过这种语义,看它的定义: 

第51讲:Scala中链式调用风格的实现代码实战及其在Spark编程中的广泛运用

今天学习了下scala中的链式调用风格的实现,在spark编程中,我们经常会看到如下一段代码: sc.textFile("hdfs://......").flatMap(_.split(" ")).map(_,1).reduceByKey(_ + _)........ 这种风格的编程方法叫做链式调用,它的实现方法见下面的代码: class Animal {def breathe : this.type = this}class Cat extends Animal