[Spark]-RDD详解之变量&操作

RDD的操作

  1.1 概述

      RDD整体包含两大类操作

      transformation 从现有中创建一个新的数据集

      action 在对数据集做一定程度的计算后将结果返回

         以MapReduce来说,Map就是一个transformation ,它是从每个文件块上执行一个方法来抽取转换,最终形成一个新的数据集.而Reduce就是一个action,它在对数据集执行一个函数进行计算后返回一个结果

    对于所有的transformation,都是Lazy的,也就是说它不会立即执行,只是单纯的记住怎么样从原来的数据集进行转换的逻辑而已,它仅在某一个计算需要的情况下,才会被真正执行.

    因为transformation 的Lazy性,RDD支持在每次计算时都进行重新计算,当然你可以将这个RDD保存下来 (persist  or cache方法)避免每次重计算

      这种保存既可以是硬盘,也可以是内存,甚至可以选择同步多个副本到多个节点中

    1.2 集群环境下的操作

    集群环境,所有操作最终会交给executors去执行.而变量,会以数据副本的形式交给executors.很多时候,这与我们非集群环境下的开发思维有非常大的不同.

    1.2.1 集群下的闭包

        RDD是支持闭包操作的.但务必注意的是Spark不保证对闭包之外的对象引用进行的变化.

        原因是闭包的会被序列化发生给每一个executor,对于闭包的之外的对象引用会拷贝一个副本给executor.这时多个executor执行至少是跨JVM的

        这时对这个副本对象的变更没有任何意义,因为每个JVM(executor)的副本都是独立的.

    1.2.2 集群下的print

      集群环境下,print不会在driver端有任何输出.

      原因也是一样,print最终是在每个executor执行,其输出也是在每个executor的stdout上,在driver端,是不会有这些输出的.

      如果想在driver输出,一个比较简单的办法是调用collect()将结果发送到driver端在进行print,但这样可能会造成driver内存爆掉(所有executor的数据涌入).比较推荐的做法是rdd.take(100).foreach(println)

     1.2.3 共享变量

      因为集群下,变量只会以副本方式交给executor,而不会将变量的值变化传回driver,所以一个可读写共享的变量是非常有用的.

      Spark提供了两种共享变量 broadcast(广播变量) 和 accumulators(累加器)

      1.2.3.1 广播变量(broadcast)

        广播变量允许将一个只读变量的副本发送到每个机器上(executor机器),而不是对每一个任务发送一个副本.这样在同一机器上的多个任务,就可以反复使用这个变量了.      

        注意:

          广播变量只会对每个节点分发一次,所以一般来说,广播变量不应该再被修改了.以保证每个广播变量的副本的值都是一致的

          如果广播变量被修改,则需要将广播变量重新分发

        另:

          举个例子:Spark的action操作本身是通过一系列的stage来完成的,这些Stage是通过分布式的shuffle操作来进行切分的.而在每个Stage之间,Spark自动使用广播变量.

          这里用法说明,只有数据本身会在多个Stage的多个任务中反复使用,或者说缓存这个数据是非常重要且非常必要的情况下,使用广播变量才有意义.

        广播变量的使用如下:      

          // SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值
          val broadcastVar = sc.broadcast(Array(1, 2, 3))
          val v = broadcastVar.value

      1.2.3.2 累加器(accumulators) 

        累加器变量仅支持累加操作,因为可以在并行计算执行一些特殊的计算(比计数或者求和).并且累加器的变化是可以在UI的Task界面上看见的(注,不支持Python)

        累加器操作,依然遵循RDD的Lazy原则:

          累加器更新操作是在Action中,并且在每个任务中只会执行一次(如果任务失败重启了,累加器更新不会执行)

          而在transformation中,累加器依然不会立即执行更新,如果transformation被重新执行了,则累加器操作会重复执行

        对于累加器变量,Spark原生支持数值类型.一个使用例子如下        

          val accum = sc.longAccumulator("My Accumulator")
          sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
          println(accum.value)

         也可以创建继承AccumulatorV2的类型,来实现自定义类型的累加器,例子如下:          

          //两个泛型参数->累加的元素类型和结果类型可以不同的
          class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

            private val myVector: MyVector = MyVector.createZeroVector

            def reset(): Unit = {
              myVector.reset()
            }

            def add(v: MyVector): Unit = {
              myVector.add(v)
            }
            ...
          }

          // 创建一个自定义的累加器类型:
          val myVectorAcc = new VectorAccumulatorV2
          //将这个触发器注册到SparkContext:
          sc.register(myVectorAcc, "MyVectorAcc1")

  1.3 RDD的一些基本操作

    1.3.1 Transformations 操作

      map 将原来RDD中的每个项,用自定义的map函数进行映射转变为新的元素,并返回一个新的RDD

      filter 对原来RDD进行过滤,将过滤的结果返回为一个新的RDD

      flatMap 与map类似

    1.3.2 Action 操作

  1.4 Shuffle过程

    Spark的某些操作,会引起一个Shuffle过程.Shuffle是指不同节点上的不同分区数据整合重新分区分组的机制.

    所以Shuffle是一个代价很高的操作,因为它会导致executor和不同的机器节点之间进行数据复制.

    1.4.1 Shuffle简述

      以reduceByKey为例,将原始数据中key相同的记录聚合为一个组.这里挑战是原始数据很可能是存在不同分区不同机器的(参考MapReduce执行过程)

      Spark-Shuffle与MapReduce-Shuffle的区别

        MapReduce-Shuffle结果是分区有序,分区内再按Key排序

        Spark-Shuffle结果是分区有序,但分区内Key无序.

          要对Spark-Shuffle的分区内再排序,有以下方法:

           mapPartitions 在已有的每个分区上再使用.sort排序

           repartitionAndSortWithinPartitions  重建分区,并排序

           sortBy提前对RDD本身做一个全范围排序

    1.4.2 RDD中引起Shuffle的操作

       repartition类操作 例如:repartitioncoalesce

       _ByKey操作(除了counting相关操作)例如:groupByKeyreduceByKey

       join 例如:cogroupjoin

      1.4.3 Shuffle的性能影响

      Shuffle本身是同时高耗内存,高耗磁盘IO,高耗网络IO的昂贵操作.

        Spark会启动一系列的MapReduce(Hadoop MapReduce),产生大量的数据缓冲区与归并排序,大量的pill文件与归并Merge等等

原文地址:https://www.cnblogs.com/NightPxy/p/9245707.html

时间: 2024-11-09 01:40:50

[Spark]-RDD详解之变量&操作的相关文章

深入探究Spark -- RDD详解

Spark最基本.最根本的数据抽象 RDD基于内存,提高了迭代式.交互式操作的性能 RDD是只读的,只能通过其他RDD批量操作来创建,提高容错性    另外RDD还具有位置感知性调度和可伸缩性 RDD只支持粗粒度转换,记录Lineage,用于恢复丢失的分区,从物理存储的数据计算出相应的RDD分区 RDD的5个主要属性: 1.一组分片,默认的分片个数等于core数.BlockManager进行分配. 2.一个compute计算分区函数,对迭代器进行复合,以分片为单位 3.RDD之间的依赖关系,使数

Spark RDD详解

1.RDD是什么 RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用. RDD的描述 Internally, each RDD is characterized by five main properties: A list of partitions A function for computing each split A list of depe

Apache Spark源码走读之13 -- hiveql on spark实现详解

欢迎转载,转载请注明出处,徽沪一郎 概要 在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情. Hive简介 Hive的由来 以下部分摘自Hadoop definite guide中的Hive一章 "Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询. Hive大大

详解js变量、作用域及内存

详解js变量.作用域及内存 来源:伯乐在线 作者:trigkit4 原文出处: trigkit4 基本类型值有:undefined,NUll,Boolean,Number和String,这些类型分别在内存中占有固定的大小空间,他们的值保存在栈空间,我们通过按值来访问的. JavaScript 1 2 (1)值类型:数值.布尔值.null.undefined. (2)引用类型:对象.数组.函数. 如果赋值的是引用类型的值,则必须在堆内存中为这个值分配空间.由于这种值的大小不固定(对象有很多属性和方

Python3字典操作详解 Python3字典操作大全

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author:sking 4 #Python3字典操作详解 Python3字典操作大全 5 6 #字典是无序的(因为它没有下标),key必须是唯一的 7 #字典是可变的,可以修改和添加 8 #创建字典 9 info = {'haha':12,'heihei':13,'wowo':45} 10 #查找字典的值value 11 #方法1 12 print(info['heihei']) #13 此

python3集合操作方法详解 python3集合操作大全

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author:sking 4 #python3集合操作方法详解 python3集合操作大全 5 6 #集合是无序的 7 #创建集合 8 a = {1,2,3} #正确 9 b = set([1,2,3]) #正确 10 c = set((1,2,3)) #正确 11 d = set({1:3,2:4}) #结果{1, 2} 只能取字典的key 12 13 #add添加集合中的元素(添加一项)

Spark(六) -- RDD详解

What is RDD? A Resilient Distributed Dataset(RDD),分布式弹性数据集,是Spark上的一个核心抽象 表示用于并行计算的,不可修改的,对数据集合进行分片的数据结构 在Spark上,针对各种各样的计算场景存在着各种各种的RDD,这些RDD拥有一些共同的操作,例如map,filter,persist等,就好像RDDs都是一个总RDD的子类一样,拥有所有RDD的共同特性 同时RDDs也拥有能表达自己特性的操作,例如PairRDDR有groupByKey,j

spark的rdd详解1

1,rdd的转换和行动操作 2,创建rdd的2种方式 1,通过hdfs支持的文件系统,没有真正把数据放rdd,只记录了一下元数据 2,通过scala的集合或者数组并行化的创建rdd 3,rdd的5大特点 1,rdd是分区的,可以指定分区数 2,算子的方法都会作用在每个分区 3,rdd之前有一系列的依赖,所有依赖形成DAG图,DAG计算单位是阶段 4,k-v的rdd可以选择分区器,默认的是hash-partitioned 5,会选择最优的位置计算每个分区,避免跨网络传输数据 原文地址:https:

spark配置详解

对付看把 到这里格式变化太大了,懒得调整了,这是大概spark1.5版本时候的一些参数默认值,现在2.x会有变化 这些皆可在 spark-default.conf配置,或者部分可在 sparkconf().set设置 应用程序属性 |--------------------------------------------------------------------------------------------| 属性名称