spark 学习(二) RDD及共享变量

声明:本文基于spark的programming guide,并融合自己的相关理解整理而成

Spark应用程序总是包含着一个driver program(驱动程序),它执行着用户的main方法,并且执行大量的并行操作(parallel operations)在集群上.


概述

Spark最主要的抽象就是RDD(resilient distributed dataset) 弹性分布式数据集,RDD  就是分割元素的集合,他被分发在集群的各个节点上,并且能够进行并行操作.

RDD的创建有三种方式:

  • HDFS,HBase 或者其他任何能够提供Hadoop输入格式的数据源;
  • 驱动程序中已存在的Scala集合;
  • 其他的RDD的转换

RDD能够持久化到内存中以重复使用加速计算速度, RDD能够自动从失败的节点中恢复(血统设计).

Spark中的另一个抽象就是能够被用于并行计算的共享变量. 默认的情况下, Spark并行运行一个函数是作为一组tasks在不同的节点上同时计算的, 这种情况下,他是通过分发每一个变量的拷贝到每个task中的. 有时候,我们需要某些变量在tasks之间进行共享. 这里Spark支持两种共享变量:

  1. broadcast variables, 被用于持久化变量在每个node的内存中;
  2. accumulators,  这个变量只能够被累加,

RDD操作

操作主要包括两种,分别是transformations 和 action

transformation : 将一个已经存在的RDD中转换成一个新的RDD,所有的转换操作都是lazy执行的,即只是记下了执行的动作,只有当driver程序需要结果的时候才会进行计算.

http://spark.apache.org/docs/latest/programming-guide.html

action:一般用于对RDD中的元素进行实际的计算,然后返回相应的值,例如reduce操作,collect操作,count操作等等.这中action之后返回的就不在是RDD了

RDD基本操作的几个例子以及自己的理解:

 val conf = new SparkConf().setAppName("BasicRDDApp").setMaster("local[4]")
    //spark://host:port
    val sc = new SparkContext(conf)

    /**
     *  parallelized collections
     *  将scala的集合数据,并行化成为能够并行计算的分布式数据集
     */
    val data = 1 to 1000 toArray
    val distData = sc.parallelize(data,10)
    //后面的数字是表示将集合切分成多少个块 ,通常是一个CPU 2-4块,通常spark可以自动帮你切分

    val sum =  distData.reduce((a, b) => a+b )
    //在reduce的时候才开始真正的执行,driver将任务分布到各个机器上,然后每个机器单独执行,将计算的结果返回到driver程序
    println("sum " + sum)

    /**
     * 读取外部的数据源
     * 1.Hadoop支持的数据源 ,例如HDFS,Cassandra,HBase ,Amazon S3
     * ##如果文件地址是本地地址的话,那么他应该在集群的每个节点上都能够被访问(即:每个节点上都应该有同样的文件)
     * ##textFile的第二个参数控制文件被切割的大小默认为64MB ,可以设置更大的但是不能设置更小的
     */

    val distFile = sc.textFile("file:///usr/local/spark/README.md")

    //接下来就可以进行相关的操作了
    distFile.persist()//持久化

    val len = distFile.map(s => 1).reduce((a, b) => a+b)
    println(len)

    val words = distFile.flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey((a,b) => a+b)
    //w => (v1+v2+v3+...)
    //map => 1->1 , flatMap => 1 -> 0..n

    print(words.count())
    words foreach println

    val twords = distFile.flatMap(l => l.split(" ")).map(w => (w,1)).groupByKey()
    //分组 w => (v1, v2, v3 ...)

    twords foreach println
    //.map(w => (w,1)).foreach(w => w._1);

RDD的持久化

  1. 用法: 使用persist()或者cache()方法,其中cache()方法默认持久化到内存,persist可以自己选择持久化的层次,在shuffle操作中,spark会自动保存中间计算结果,例如reduceBykey
  2. 作用:  RDD的持久化会将会使得每个节点保存相应的计算部分,以便再次使用该数据集时可以直接使用,加快计算速度
  3. 如何选择持久化层次: 如果RDDs 在MEMORY_ONLY下表现良好的话,就选这个层次,这样CPU效率最高

    其次MEMORY_ONLY_SER ,其他情况http://spark.apache.org/docs/latest/programming-guide.html


共享变量

1. broadcast 变量, 只读的共享变量 每个节点上都有一个拷贝, 用法

val broadcastVar = sc.broadcast("string test")

broadcastVar.value

2.accumulator 变量,做累加器用,类似与counter或者是sum

    val broadcastVar = sc.broadcast("string test")//broadcast variable is readonly

    val v = broadcastVar.value
    println(v)

    val accum = sc.accumulator(0, "My Accumulator")//value and name

    sc.parallelize(1 to 1000000).foreach(x => accum+= 1)

    println(accum.name + ":" + accum.value)
时间: 2024-11-08 09:28:37

spark 学习(二) RDD及共享变量的相关文章

spark学习二

SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,用于连接Spark集群.创建RDD.累加器和广播变量,是Spark程序的根本.编写不同类型的Spark程序,使用的SparkContext是不同的Scala 使用SparkContextJava   使用JavaSparkContext 开发一个简单的Spark程序:第一步:创建SparkConf对象,设置Spark应用的配置信息第二步:创建Spark Context对象第三步:针对输入源创建一个初始的RDD(

Spark学习之RDD的理解

转自:http://www.infoq.com/cn/articles/spark-core-rdd/ 感谢张逸老师的无私分享 RDD,全称为Resilient Distributed Datasets,是一个容错的.并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区.同时,RDD还提供了一组丰富的操作来操作这些数据.在这些操作中,诸如map.flatMap.filter等转换操作实现了monad模式,很好地契合了Scala的集合操作.除此之外,RDD还提供了诸如joi

Spark RDD与共享变量简介

hadoop有两个东东:HDFS(存储)和MapReduce(计算).MapReduce计算比较慢,于是Spark(速度是MR的10~100倍)出现了.Spark有两个核心的概念:弹性分布式数据集RDD与共享变量.下面进行一下简单的介绍. 弹性分布式数据集(RDD)获得方式:1 并行化驱动程序内的集合; 2 从外部数据集加载. 1 并行化驱动程序内的集合code demo val data = Array(1,2,3,4,5,6,7,8,9)   //普通数组 val distData = sc

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

Spark学习笔记总结-入门资料精化

Spark简介 spark 可以很容易和yarn结合,直接调用HDFS.Hbase上面的数据,和hadoop结合.配置很容易. spark发展迅猛,框架比hadoop更加灵活实用.减少了延时处理,提高性能效率实用灵活性.也可以与hadoop切实相互结合. spark核心部分分为RDD.Spark SQL.Spark Streaming.MLlib.GraphX.Spark R等核心组件解决了很多的大数据问题,其完美的框架日受欢迎.其相应的生态环境包括zepplin等可视化方面,正日益壮大.大型公

Spark学习笔记

Spark 阅读官方文档 Spark Quick Start Spark Programming Guide Spark SQL, DataFrames and Datasets Guide Cluster Mode Overview Spark Standalone Mode 重要的概念:resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluste

Spark学习四:网站日志分析案例

Spark学习四:网站日志分析案例 标签(空格分隔): Spark Spark学习四网站日志分析案例 一创建maven工程 二创建模板 三日志分析案例 一,创建maven工程 1,执行maven命令创建工程 mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scal