spark变量使用broadcast、accumulator

broadcast


官方文档描述:

Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. The variable will be sent to each cluster only once.

源码分析:

这里使用告警方式代替异常,为了是避免用户进程中断;可能有用户创建了广播变量但是没有使用他们;


  /**
   * Broadcast a read-only variable to the cluster, returning a
   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
   * The variable will be sent to each cluster only once.
   */
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

实例


List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
final Broadcast<List<Integer>> broadcast = javaSparkContext.broadcast(data);
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {
  List<Integer> iList = broadcast.value();
  @Override
  public Integer call(Integer v1) throws Exception {
    Integer isum = 0;
    for(Integer i : iList)
      isum += i;
    return v1 + isum;
  }
});
System.out.println(result.collect());

accumulator


源码分析:

// Methods for creating shared variables

  /**
   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
   * values to using the `+=` method. Only the driver can access the accumulator‘s `value`.
   */
  @deprecated("use AccumulatorV2", "2.0.0")
  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
    val acc = new Accumulator(initialValue, param)
    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
    acc
  }

  /**
   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
   * driver can access the accumulator‘s `value`.
   */
  @deprecated("use AccumulatorV2", "2.0.0")
  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
    : Accumulator[T] = {
    val acc = new Accumulator(initialValue, param, Some(name))
    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
    acc
  }

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和sum。Spark原生地只支持数字类型的累加器,开发者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程(对于Python还不支持) 。
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

class VectorAccumulatorParam implements AccumulatorParam<Vector> {
  @Override
  //合并两个累加器的值。
  //参数r1是一个累加数据集合
  //参数r2是另一个累加数据集合
  public Vector addInPlace(Vector r1, Vector r2) {
    r1.addAll(r2);
    return r1;
  }
  @Override
  //初始值
  public Vector zero(Vector initialValue) {
     return initialValue;
  }
  @Override
  //添加额外的数据到累加值中
  //参数t1是当前累加器的值
  //参数t2是被添加到累加器的值
  public Vector addAccumulator(Vector t1, Vector t2) {
      t1.addAll(t2);
      return t1;
  }
}
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);

final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
Vector initialValue = new Vector();
for(int i=6;i<9;i++)
  initialValue.add(i);
//自定义累加器
final Accumulator accumulator1 = javaSparkContext.accumulator(initialValue,new VectorAccumulatorParam());
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {
  @Override
  public Integer call(Integer v1) throws Exception {
    accumulator.add(1);
    Vector term = new Vector();
    term.add(v1);
    accumulator1.add(term);
    return v1;
  }
});
System.out.println(result.collect());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator1.value());

参考文章:


https://www.cnblogs.com/jinggangshan/p/8117155.html

原文地址:https://www.cnblogs.com/AlanWilliamWalker/p/10960858.html

时间: 2024-11-05 18:58:49

spark变量使用broadcast、accumulator的相关文章

spark中的Broadcast variables和Accumulator

举个例子: val factor = 3 rdd.map( num => num*factor) 以上两行代码显示了rdd的一个map操作,其中factor是一个外部变量.默认情况下,算子的函数内,如果使用到了外部变量,那么会将这个变量拷贝到执行这个函数的每一个task中.如果该变量非常大的话,那么网络传输耗费的资源会特别大,而且在每个节点上占用的内存空间也特别大. Spark提供的Broadcast Variable,是只读的.并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本

Spark大师之路:广播变量(Broadcast)源码分析

概述 最近工作上忙死了--广播变量这一块其实早就看过了,一直没有贴出来. 本文基于Spark 1.0源码分析,主要探讨广播变量的初始化.创建.读取以及清除. 类关系 BroadcastManager类中包含一个BroadcastFactory对象的引用.大部分操作通过调用BroadcastFactory中的方法来实现. BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory.HttpBroadcastFactory.这两个子类实现了对Htt

spark中的广播变量broadcast

Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPartitions(iter => { broadcastValues.getValue.foreach(println) }) 在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进

08、共享变量(Broadcast Variable和Accumulator)

共享变量工作原理 Spark一个非常重要的特性就是共享变量. 默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中.此时每个task只能操作自己的那份变量副本.如果多个task想要共享某个变量,那么这种方式是做不到的. Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量).Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,

Spark笔记整理(五):Spark RDD持久化、广播变量和累加器

[TOC] Spark RDD持久化 RDD持久化工作原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升1

Spark(四)Spark的广播变量和累加器

一.概述 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序.通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator) 二.广播变量broadcast variable 2.1 为什

Spark学习之路 (四)Spark的广播变量和累加器[转]

概述 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序.通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator) 广播变量broadcast variable 为什么要将变量定义成

[Spark內核] 第42课:Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

本课主题 Broadcast 运行原理图 Broadcast 源码解析 Broadcast 运行原理图 Broadcast 就是将数据从一个节点发送到其他的节点上; 例如 Driver 上有一张表,而 Executor 中的每个并行执行的Task (100万个Task) 都要查询这张表的话,那我们通过 Broadcast 的方式就只需要往每个Executor 把这张表发送一次就行了,Executor 中的每个运行的 Task 查询这张唯一的表,而不是每次执行的时候都从 Driver 中获得这张表

Spark 自定义累加变量(Accmulator)AccumulatorParam

1.创建一个累加变量 public <T> Accumulator<T> accumulator(T initialValue, AccumulatorParam<T> param) Create an Accumulator variable of a given type, which tasks can "add" values to using the += method. Only the driver can access the acc