Spark 自定义累加变量(Accmulator)AccumulatorParam

1.创建一个累加变量

public <T> Accumulator<T> accumulator(T initialValue,
                             AccumulatorParam<T> param)
Create an Accumulator variable of a given type, which tasks can "add" values to using the += method. Only the driver can access the accumulator‘s value.
Parameters:
initialValue - (undocumented)
param - (undocumented)
Returns:
(undocumented)

使用SparkContext的如上方法,可以创建一个累加变量。默认情况下,这里的T是int或者double,因此如果想要创建T为long的累加变量是不行的。

2.AccumulatorParam介绍

概念:

initialValue:Accumulator的初始值,也就是调用SparkContext.accululator时传递的initialValue

zeroValue:AccumulatorParam的初始值,也就是zero方法的返回值。

假设样本数据集合为simple={1,2,3,4}

执行顺序:

1.调用zero(initialValue),返回zeroValue

2.调用addAccumulator(zeroValue,1) 返回v1.

调用addAccumulator(v1,2)返回v2.

调用addAccumulator(v2,3)返回v3.

调用addAccumulator(v3,4)返回v4.

3.调用addInPlace(initialValue,v4)

因此最终结果是zeroValue+1+2+3+4+initialValue.

3.实现AccumulatorParam

import org.apache.spark.AccumulatorParam;

public class LongAccumulator implements AccumulatorParam<Long>{

        //执行完addAccumulator方法之后,最后会执行这个方法,将value加到init。
        @Override
        public Long addInPlace(Long init, Long value) {
            // TODO Auto-generated method stub
            // return arg0+arg1;
            System.out.println(init+":"+value);
            return init+value;
        }

        /*
         * init 就是SparkContext.accumulator(init)参数init。
         * 这里的返回值是累计的起始值。注意哦,他可以不等于init。
         *
         * 如果init=10,zero(init)=0,那么运算过程如下:
         * v1:=0+step
         * v1:=v1+step
         * ...
         * ...
         * 最后v1:=v1+init
         **/
        @Override
        public Long zero(Long init) {
            // TODO Auto-generated method stub
            System.out.println(init);
            return 0l;
        }

        @Override
        public Long addAccumulator(Long value, Long step) {
            // TODO Auto-generated method stub
            System.out.println(value+","+step);
            return value+step;
        }

    }

接下来使用它。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class AccumulatorDemo {
    public static void main(String[]args){
        SparkConf conf=new SparkConf().setAppName("AccumulatorDemo").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);

        Accumulator<Long> acc=sc.accumulator(0L,new LongAccumulator());

        List<Long> seq=Arrays.asList(1L,2L,3L,4L);
        JavaRDD<Long> rdd=sc.parallelize(seq);

        rdd.foreach(new VoidFunction<Long>(){

            @Override
            public void call(Long arg0) throws Exception {
                acc.add(arg0);
            }

        });

        System.out.println(acc.value());;
    }
时间: 2024-08-28 21:54:37

Spark 自定义累加变量(Accmulator)AccumulatorParam的相关文章

MVC路由探寻,涉及路由的惯例、自定义片段变量、约束、生成链接和URL等

引子 在了解MVC路由之前,必须了解的概念是"片段".片段是指除主机名和查询字符串以外的.以"/"分隔的各个部分.比如,在http://site.com/Home/Index中,包含2个片段,第一个片段是Home,第二个片段是Index. URL匹配的特点:● 保守的:URL中的片段数量必须和路由规则中的片段数量一致(路由规则没有设置默认值的前提下)● 宽松的:在满足片段数要求的前提下,URL中的片段内容是宽松的 本篇涉及的方面包括:1.路由惯例2.自定义片段变量3

Xcode 8:在 Active Compilation Conditions 中自定义环境变量

来源:没故事的卓同学 链接:http://www.jianshu.com/p/96b36360bb2d 在Xcode 7我们在 OTHER_SWIFT_FLAGS中配置环境变量.但是有一个不爽的地方就是需要在自定义的变量前增加“-D”后才能使用.比如: 然后才能正常使用: #if MYFLAG // 逻辑判断 #endif 现在在Xcode 8中新增了一个 SWIFT_ACTIVE_COMPILATION_CONDITIONS选项,现在直接在里面添加就可以啦! 在代码中的使用逻辑和之前一样. 相

nagios_自定义宏变量隐藏发送邮件账号的敏感信息

需求: Nagios在邮件告警的时候使用sendemail来发送邮件. 默认web管理界面下Configuration -->  Object Type: Commands --> notify-host-by-email  notify-service-by-email 会显示 发送邮件账户的登录密码 通过:nagios的宏变量来隐藏掉显示出来的密码(自定义宏变量) vi /usr/local/nagios/etc/resource.cfg 加上 [email protected] $USE

SSIS:使用自定义的变量 改变 原有数据库连接的相应参数的值

1.创建变量,并设置初始值 2.在Connection Manager 中创建数据库连接 3.打开新建数据库连接的属性(单击该新建链接,按F4,或右击,选项 属性),点击Expressions 后面的 ...按钮 4.在弹出的Property Expressions Editor 窗体中,Property列中,在DropDownList中选择InitialCatelog和ServerName参数,然后点击相对应的Expression 列的 ...按钮 5.在新弹出的Expression Buil

Python语言程序设计之一--for循环中累加变量是否要清零

最近学到了Pyhton中循环这一章.之前也断断续续学过,但都只是到了函数这一章就停下来了,写过的代码虽然保存了下来,但是当时的思路和总结都没有记录下来,很可惜.这次我开通了博客,就是要把这些珍贵的学习思考总结记录下来.从现在开始. 关于这一章始终有几个难点不懂.第一个就是每次循环过后,其中的累加变量是否要清零?这个问题困扰了我许久.很多次写完代码运行时发现,计算结果和我想要的结果根本不同.比如在<Python语言程序设计>这本书里,第5章习题第27题: 计算π的值:π =4 * (1 - 1/

Spark自定义排序与分区

Spark自定义排序与分区 前言: 随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中:由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代.作为时下最主流的计算引擎之一 Spark也是从各方面向时代展示自己的强大能力.Spark无论是在数据处理还是数据分析.挖掘方面都展现出了强大的主导能力.其分布式计算能力受到越来越多的青睐.本文将介绍spark的排序以及分区. 一.Spark自定义排序 在spark中定义了封装了很多高级的api,在我们的日常开发中使用这些ap

【Spark篇】---Spark中广播变量和累加器

一.前述 Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量. 累机器相当于统筹大变量,常用于计数,统计. 二.具体原理 1.广播变量 广播变量理解图 注意事项 1.能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的.可以将RDD的结果广播出去. 2. 广播变量只能在Driver端定义,不能在Executor端定义. 3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量

puppet 自定义fact变量

一.puppetmaster配置: 1. 增加环境变量,仅用于测试编写的变量是否生效. [[email protected] ~]# vi /etc/profile 增加以下内容: #puppet facter export FACTERLIB=/etc/puppet/modules/facts/lib/facter/        #自定义变量文件存放路径 [[email protected] ~]# source /etc/profile 2. 自定义变量文件路径在: /etc/puppet

自定义maven变量以及maven内置常量

本文转自:http://zheng12tian.iteye.com/blog/1770909 在创建Maven工程后,插件配置中通常会用到一些Maven变量,因此需要找个地方对这些变量进行统一定义,下面介绍如何定义自定义变量. 在根节点project下增加properties节点,所有自定义变量均可以定义在此节点内,如下所示: Java代码 <!-- 全局属性配置 --> <properties> <project.build.name>tools</projec