spark Accumulator累加器使用示例

官网

http://spark.apache.org/docs/2.3.1/rdd-programming-guide.html#accumulators

http://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.util.AccumulatorV2

Accumulator是spark提供的累加器,累加器的一个常用用途是在调试时对作业执行过程中的事件进行计数,但是只要driver能获取Accumulator的值(调用value方法), Task只能对其做增加操作,也可以给Accumulator命名(不支持Python),这样就可以在spark web ui中查看, 可以帮助了解程序运行的情况。

数值累加器可通过调用SparkContext.longAccumulator()或SparkContext,doubleAccumulator()创建,分别用于累加Long或Double类型的值。运行在集群上的任务之后可使用add方法进行累加操作。但是,这些任务并不能读取累加器的值,只有驱动程序使用value方法能读取累加器的值。

spark内置累加器使用示例

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.HashSet;

/**
 * spark内置了数值类型的累加器,比如LongAccumulator、DoubleAccumulator
 */
public class TestAccumulator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession
                .builder()
                .appName("gxl")
                .master("local")
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        LongAccumulator accum = jsc.sc().longAccumulator();
        JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1, 2, 3));

//        onlyMapOperator(accum,javaRDD);
//        accumInActionOperator(accum,javaRDD);
//        accumExecuteRepeatedly(accum,javaRDD);
    }

    private static void accumInActionOperator(LongAccumulator accum,JavaRDD<Integer> javaRDD){
        javaRDD.foreach(x -> accum.add(x));
        System.out.println(accum.value());
    }

    private static void onlyMapOperator(LongAccumulator accum,JavaRDD<Integer> javaRDD){
        //累加器也是lazy的,只有map操作的算子,累加器不会执行
        javaRDD.map((Function<Integer, Integer>) v1 -> {
            accum.add(v1);
            return v1;
        });
        System.out.println(accum.value());
    }

    private static void accumExecuteRepeatedly(LongAccumulator accum,JavaRDD<Integer> javaRDD){
        JavaRDD<Integer> map = javaRDD.map((Function<Integer, Integer>) v1 -> {
            accum.add(v1);
            return v1;
        });
//        map.count();
//        System.out.println(accum.value());
//        map.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
//        System.out.println(accum.value());

        //将map后的rdd缓存起来
        JavaRDD<Integer> cache = map.cache();
        cache.count();
        System.out.println(accum.value());
        cache.reduce((Function2<Integer, Integer, Integer>) (v1, v2) -> v1+v2);
        System.out.println(accum.value());
    }

}

自定义spark累加器使用示例

import org.apache.spark.util.AccumulatorV2;

/**
 * spark 2.3
 * 自定义累计器需要继承AccumulatorV2,并且重写以下方法
 * 将符合条件的数据拼接在一起
 */
public class MyAccumulator extends AccumulatorV2<String,String> {

    private StringBuffer stringBuffer = new StringBuffer();
    /**
     * Returns if this accumulator is zero value or not.
     * 返回该累加器是否为零值。
     * @return
     */
    @Override
    public boolean isZero() {
        return stringBuffer.length() == 0;
    }

    /**
     * Creates a new copy of this accumulator.
     * @return
     */
    @Override
    public AccumulatorV2<String,String> copy() {
        MyAccumulator newMyAccu = new MyAccumulator();
        newMyAccu.stringBuffer.append(stringBuffer);
        return newMyAccu;
    }

    /**
     * Resets this accumulator, which is zero value.
     */
    @Override
    public void reset() {
        stringBuffer.setLength(0);
    }

    /**
     * Takes the inputs and accumulates.
     * @param input
     */
    @Override
    public void add(String input) {
        stringBuffer.append(input).append(",");
    }

    /**
     * Merges another same-type accumulator into this one and update its state, i.e.
     * @param other
     */
    @Override
    public void merge(AccumulatorV2 other) {
        stringBuffer.append(other.value());
    }

    /**
     * Defines the current value of this accumulator
     * @return
     */
    @Override
    public String value() {
        return stringBuffer.toString();
    }
}

测试示例

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.HashSet;

/**
 * spark内置了数值类型的累加器,比如LongAccumulator、DoubleAccumulator
 */
public class TestAccumulator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession
                .builder()
                .appName("gxl")
                .master("local")
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        testMyAccumulator(jsc);

    }

    private static void testMyAccumulator(JavaSparkContext jsc){
        MyAccumulator myAccumulator = new MyAccumulator();
        jsc.sc().register(myAccumulator,"myAccumulator");

        HashSet<String> blacklist = new HashSet<>();
        blacklist.add("jack");

        JavaRDD<String> stringJavaRDD = jsc.parallelize(Arrays.asList("jack", "kevin", "wade", "james"));
        JavaRDD<String> filter = stringJavaRDD.filter((Function<String, Boolean>) v1 -> {
            if (blacklist.contains(v1)) {
                return true;
            } else {
                myAccumulator.add(v1);
                return false;
            }
        });
        filter.count();
        System.out.println(myAccumulator.value());
    }

}

原文地址:https://www.cnblogs.com/zz-ksw/p/12443416.html

时间: 2024-07-28 12:53:40

spark Accumulator累加器使用示例的相关文章

Spark(Accumulator)陷阱及解决办法

Accumulator简介 Accumulator是spark提供的累加器,顾名思义,该变量只能够增加. 只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增加操作(使用 +=).你也可以在为Accumulator命名(不支持Python),这样就会在spark web ui中显示,可以帮助你了解程序运行的情况. Accumulator使用 使用示例 举个最简单的accumulator的使用例子: //在driver中定义 val accum = sc.

spark的累加器-SQL-Streaming

RDD持久化 --------------- memory disk off-heap serial replication Memory_ONLY(true , false ,false , true ,1) 广播变量 --------------- driver端切成小块,存放到blockmanager,executor广播变量 的小块,首先从自己的blockmgr中提取,如果提取不到,在从其他 节点(driver + executor)提取,一旦提取到存放在自己的blockmgr. RDD

十一、spark SQL的scala示例

简介 spark SQL官网:http://spark.apache.org/docs/latest/sql-programming-guide.html sparkSQL是构建在sparkCore之上的组件,用于处理结构化的数据.它将数据抽象为DataFrame并提供丰富的API,并且sparkSQL允许使用SQL脚本进行操作,使得数据查询变得非常的容易使用. 同时,sparkSQL除了操作简单,API丰富之外,对于数据源的支持也很强大.你可以从,如: 1)HDFS 2)Parguet文件 3

十二、spark MLlib的scala示例

简介 spark MLlib官网:http://spark.apache.org/docs/latest/ml-guide.html mllib是spark core之上的算法库,包含了丰富的机器学习的一系列算法.你可以通过简单的API来构建算法模型,然后利用模型来进行预测分析推荐之类的. 它包含了一些工具,如: 1)算法工具:分类.回归.聚类.协同等 2)特征化工具:特征提取.转换.降维.选择等 3)管道:用于构建.评估和调整机器学习管道的工具 4)持久性:保存和加载算法.模型.管道 5)实用

spark自定义分区及示例代码

有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写入到part-00001 . . . 19写入到part-00009 给读者提供一个自定义分区的思路 import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需继承Partitioner类 class Usrid

Spark的广播和累加器的使用

一.广播变量和累加器 1.1 广播变量: 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量.广播变量可被用于有效地给每个节点一个大输入数据集的副本.Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销. Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开.Spark自动地广播每个步骤每个任务需要的通用数据.这些广播数据被序列化地缓存,在运行任务之前被反序列化出来.这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存

pyspark中使用累加器Accumulator统计指标

评价分类模型的性能时需要用到以下四个指标 最开始使用以下代码计算,发现代码需要跑近一个小时,而且这一个小时都花在这四行代码上 # evaluate model TP = labelAndPreds.filter(lambda (v, p): (v == 1 and p == 1)).count() FP = labelAndPreds.filter(lambda (v, p): (v == 0 and p == 1)).count() TN = labelAndPreds.filter(lamb

【Spark篇】---Spark中广播变量和累加器

一.前述 Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量. 累机器相当于统筹大变量,常用于计数,统计. 二.具体原理 1.广播变量 广播变量理解图 注意事项 1.能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的.可以将RDD的结果广播出去. 2. 广播变量只能在Driver端定义,不能在Executor端定义. 3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量

spark变量使用broadcast、accumulator

broadcast 官方文档描述: Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. The variable will be sent to each cluster only once. 源码分析: 这里使用告警方式代替异常,为了是避免用户进程中断: