Spark Core 的RDD

(1)RDD的介绍

???
??RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(RDD中的数据,不能增删改),可分区、元素可并行计算的集合。
??具有数据流的模型的特点,自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示的将工作集缓存在内存中。后续的查询能够重用工作集,这极大地提升了查询速度。
??RDD可以从 三方面理解:
??? - 数据集:RDD是数据集合的抽象,是复杂物理介质上存在数据的一种逻辑视图。从外部看RDD的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合。
??? - 分布式:RDD的数据可能存储在多个节点的磁盘上或者内存中,也就是所谓的多级存储。
??? - 弹性:虽然 RDD 内部存储的数据是只读的,但是,我们可以去修改(例如通 过 repartition 转换操作)并行计算计算单元的划分结构,也就是分区的数量。
??总之:RDD就是一个大集合,将所有的数据都加载到内存中,方便多次进行重用。它的数据可以在多个节点上,并且RDD可以保存在内存中,当如果某个阶段的RDD丢失,不需要重新计算,只需要提取上一次的RDD,在相应的计算即可。

(2)RDD的属性

??

?1)A list of partitions(一组分片,数据集的基本单位)

??一个分区通常与一个任务向关联,分区的个数决定了并行的粒度。分区的个数可以在创建RDD的时候指定,如果不指定,那么默认的由节点的cores个数决定。最终每一个分区会被映射成为BlockManager 中的一个Block,而这个Block会被下一个task使用进行计算。

?2)A function for computing each split(算子)

??每一个RDD都会实现compute,用于分区进行计算

?3)A list of dependencies on other RDDs(RDD之间的依赖)

??RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算。
宽依赖和窄依赖

窄依赖(完全依赖):一个父分区唯一对应一个子分区,例:map操作
宽依赖(部分依赖):一个父分区对应多个子分区,如:reduce、group操作
区分宽依赖和窄依赖:当前这个算子的执行过程中是否有shuffle操作。

?4)Optionally a Partitioner for key-value RDDs(分区函数)

??当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于 key-value 的 RDD,才会有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决 定了 parent RDD Shuffle 输出时的分片数量。

?5)Optionally a list of preferred locations to compute each split on

??一个列表,存储存取每个 Partition 的优先位置(preferred location)。按照”移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。而这个列表中就存放着每个分区的优先位置。

(3)RDD的API(相关算子)

??RDD编程中有两种中形式:Transformation(转换)和Action(行动)。
??Transformation:表示把一个RDD ---->RDD。
??Action:表示把RDD----?集合或者scala对象。

?1)RDD的创建:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext()
        //由一个已经存在的 Scala 数据集合创建
        val arr=Array(1,2,3,4)
        val arr1RDD: RDD[Int] = sc.parallelize(arr)
        val arr2RDD: RDD[Int] = sc.makeRDD(arr)

        //由外部存储系统的数据创建(HDFS、HBase...)
        val HDFSRDD: RDD[String] = sc.textFile("/data/input")
    }
}

?2)Transformation:

??官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
??注意:RDD中所有的转换(Transformation)都是延迟加载,也就是说,他们并不是直接计算结果,相反的,他们只是记住这些应用到基础数据集,上的一个转换动作,只有当发生一个要求返回一个Driver动作的时候,这些转换才真正运行。
?
map()算子

        val HDFSRDD: RDD[String] = sc.textFile("/data/input")
        /**
          * map 算子,返回一个新的RDD,该RDD由每一个输入元素经过function函数转换后组成
          */
        val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))

flatMap()算子:

val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs")
        val lineRDD: RDD[String] = sc.parallelize(arr)
        /**
          * flagMap:类似于map,但是每一个元素输入的元素可以被
          * 映射成为0个或者多个输出的元素(返回的是一个序列,而不是单一的元素)
          */
        //返回一个集合hive hbase hadoop spark hadoop yarn hdfs
        val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))

filter()算子:

        val arr=Array(1,2,3,4,5)
        val arrRDD: RDD[Int] = sc.parallelize(arr)
        /**
          * filter过滤:返回一个新的RDD,该RDD由经过func函数计算后返回
          * 值为true的输入元素组成
          */
        val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)

mapPartitions()算子:

        val hdfsRDD: RDD[String] = sc.textFile("/data/input")
        /**
          * mapPartitions与map的唯一区别就是,mapPartitions迭代的是一个分区,
          * 而map遍历的每一个元素,mapPartitions参数是一个迭代对象,返回的也是一个迭代对象
          */
        val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => {
            val temp = x.toList.map(line => line + "!")
            temp.toIterator
        })

mapPartitionsWithIndex()算子:

        val hdfsRDD: RDD[String] = sc.textFile("/data/input")
        /**
          * 第一个参数是分区编号:分区编号是从0开始的不间断的连续编号
          * 第二个参数和mapPartitions相同
          */
        val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => {
            println(parnum) //分区编号
            val temp = x.toList.map(line => line + "!")
            temp.toIterator
        })

sample()算子:

        val list=1 to 5000
        /**
          * sample方法有三个参数:
          * withReplacement:代表是否有放回的抽取(false 不放回,true:放回)
          * fraction:抽取样本空间占总体的比例,(以分数的形式) 0<=fraction <=1
          * seed:随机数生成器,new Random().nextInt(10),不传表示使用系统的
          * 注意:我们使用的sample算子,不能保证提供集合大小就恰巧是rdd.size()*fraction,结果大小会上下浮动
          * sample在做抽样调查的时候,特别受用
          */
        val listRDD: RDD[Int] = sc.parallelize(list)
        val sampleRDD: RDD[Int] = listRDD.sample(false,0.2)
        println(sampleRDD.count())  //大概是5000*0.2 上下浮动

groupByKey()算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * groupByKey,分组
          * 建议groupByKey在实践中,能不用就不用,主要因为groupByKey的效率低,
          * 因为有大量的数据在网络中传输,而且还没有进行本地的预处理
          * 可以使用reduceByKey或者aggregateByKey或者combineByKey代替这个groupByKey
          */

        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        //分组
        val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey()
        //求平均值
        val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => {
            val avg = score.sum.toDouble / (score.size)
            (name, avg)
        }
        }

reduceByKey算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * reduceByKey:在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据
          * 集,key 相同的值,都被使用指定的 reduce 函数聚合
          * 到一起。和 groupByKey 类似,任务的个数是可以通过
          * 第二个可选参数来配置的。
          */
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        //分组,求总分
        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
        sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)

sortByKey()算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * sortByKey:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,
          * 返回一个按照 key 进行排序的(K,V)的 RDD
          */

        //分组,求总分,排序
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
        sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
        val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1))
        sortRDD.foreach(println)

sortBy算子:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        /**
          * sortBy(func,[ascending], [numTasks])
          * 与 sortByKey 类似,但是更灵活
          * 第一个参数是根据什么排序
          * 第二个是怎么排序,true 正序,false 倒序
          * 第三个排序后分区数,默认与原 RDD 一样
          */
        //分组,求总分,排序
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
        val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
        sumRDD.foreach(println) //打印:(hbase,36)(math,18)(hbase,18)
        val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)

aggregateByKey()算子:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext()
        /**
          * aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
          * 先按分区聚合再总的聚合,每次要跟初始值交流
          * zeroValue:初始值
          * seqOp:迭代操作,拿RDD中的每一个元素跟初始值进行合并
          * combOp:分区结果的最终合并
          * numTasks:分区个数
          * aggregate+groupByKey=aggregateByKey
          * aggregate对单个值进行RDD,aggregateByKey对(K,V)值进行RDD
          */
        //aggregate
        val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val listRDD: RDD[Int] = sc.parallelize(list)
        //求平均值
        /**
          * seqOp: (U, T) => U
          * combOp: (U, U) => U
          * u:(Int,Int)   总和,总次数
          */
        val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => {
            (u._1 + x, u._2 + 1)
        }
            , (u1: (Int, Int), u2: (Int, Int)) => {
                (u1._1 + u2._1, u1._2 + u2._2)
            })
        println(result._1 / result._2)

        //aggregateByKey已经根据(k,v)k 进行分组,以下的操作,是对v进行操作
        //以下操作时求平均值
        val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val stuRDD: RDD[(String, Int)] = sc.parallelize(list1)
        val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => {
            (x._1 + y, x._2 + 1)
        }, (x: (Int, Int), y: (Int, Int)) => {
            (x._1 + y._1, x._2 + y._2)
        })
        reslutRDD2.foreach(kv=>{
            val name=kv._1
            val avg=kv._2._1.toDouble/kv._2._2
        })
    }
}

foldLeft()算子:(不是spark的算子,是scala的高级操作)

        /**
          *  foldLeft
          * (zeroValue: T)  初值值
          * (B, A) => B  B是一个元组,B._1 表示累加元素,B._2 表示个数, A 表示下一个元素
          */

        //aggregate
        val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)})
        println(result._1.toDouble/result._2)

combineByKey()算子:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        /**
          * combineByKey:
          * 合并相同的 key 的值 rdd1.combineByKey(x => x, (a: Int,
          * b: Int) => a + b, (m: Int, n: Int) => m + n)
          */
        //求平均值
        val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list1)
        /**
          * createCombiner: V => C,
          * mergeValue: (C, V) => C,
          * mergeCombiners: (C, C) => C): RDD[(K, C)]
          */
        val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => {
            (x, 1)
        },
            (x: (Int, Int), y: Int) => {
                (x._1 + y, x._2 + 1)
            },
            (x: (Int, Int), y: (Int, Int)) => {
                (x._1 + y._1, x._2 + y._2)
            })
        resultRDD.foreach{case (name,(sum,count))=>{
            val avg=sum.toDouble/count
            println(s"${name}:${avg}")
        }}
    }
}

连接操作

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val arr1 = Array(1, 2, 4, 5)
        val arr1RDD = sc.parallelize(arr1)
        val arr2 = Array(4, 5, 6, 7)
        val arr2RDD = sc.parallelize(arr2)
        //cartesian  笛卡尔积
        val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD)
        //union : 连接
        val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD)
        //subtract,求,差集
        val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD)

        //join
        val list1 = List(("a", 1), ("b", 2), ("c", 3))
        val list1RDD = sc.parallelize(list1)
        val list2 = List(("a", "zs"), ("b", "sl"))
        val list2RDD = sc.parallelize(list2)
        /**
          * 根据元组中的key进行join 操作,相同的key向连接
          * 返回的是RDD[(String, (Int, String))] (key,连接结果)
          */
        val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD)

        //cogroup
        /**
          * (String key   ,
          * (Iterable[Int] arr1中的相应的key所有value的集合
          * , Iterable[String]))  arr2中的相应的key所有value的集合
          */
        val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD)
    }
}

分区操作

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
        /**
          * 表示在执行了filter操作之后,由于大量的数据被过滤,导致之前设定的分区task个数,
          * 处理剩下的数据导致资源浪费,为了合理高效的利用资源,
          * 可以对task重新定义,在coalesce方法中的分区个数一定要小于之前设置的分区个数。
          */
        hdfsRDD.coalesce(2)
        //打乱数据,重新分区,分区规则为随机分区
        hdfsRDD.repartition(3)

        //自定义分区规则(注意,只在有key-value的RDD中可以使用)
        var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2)
            ("b", 2), ("e", 2)
            , ("b", 2)
            , ("f", 2), ("g", 2), ("h", 2))
        val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4)
        arrRDD.partitionBy(new MyPartitioner(3))

    }
}
class MyPartitioner(val numPTN:Int) extends Partitioner{
    //分区个数
    override def numPartitions: Int = numPTN
    //分区规则
    override def getPartition(key: Any): Int = {
        val num=key.hashCode()&Integer.MAX_VALUE%numPTN
        return num
    }
}

总结
   - Transformation返回的仍然是一个RDD
   - 它使用了链式调用的设计模式,对一个 RDD 进行计 算后,变换成另外一个 RDD,然后这个 RDD 又可以进行另外一次转换。这个过程是分布式的。

?3)Action:

常见操作

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        //action  rdd ---map
        listRDD.reduceByKeyLocally((x,y)=>x+y)

        //调用collect的目的是:触发所有的计算,最终收集当前这个调用者RDD的所有数据,返回到客户端,如果数据量比较大,谨慎使用
        listRDD.collect()

        //统计RDD中有多少记录
        listRDD.count()
        //取出RDD中的第一条记录
        listRDD.first()
        //取出RDD前几条记录
        listRDD.take(5)
        //随机采样
        listRDD.takeSample(false,20)
        //按照某种格式,排序后的前几条
        listRDD.top(50)
        //按照升序或者降序,取相应的条数的记录(其中的元素必须继承Ordered)
        listRDD.takeOrdered(3)
        //统计每一个key中的value有多少个
        listRDD.countByKey()
        //统计有多少个元素
        listRDD.countByValue()
        //遍历RDD中每一个元素
        listRDD.foreach(kv=>{})
        //分区遍历RDD中的元素
        listRDD.foreachPartition(kv=>{})
        //将RDD的结果,保存到相应的文件系统中(注意这个目录一定是不存在的目录)
        listRDD.saveAsTextFile("/data/output")
    }
}

总结:Action返回值不是一个RDD。它要么是一个scala的集合,要么是一个值,要么是空。最终返回到Driver程序,或者把RDD写入到文件系统中。

原文地址:http://blog.51cto.com/14048416/2338156

时间: 2024-08-29 12:01:39

Spark Core 的RDD的相关文章

spark core之RDD编程

  spark提供了对数据的核心抽象--弹性分布式数据集(Resilient Distributed Dataset,简称RDD).RDD是一个分布式的数据集合,数据可以跨越集群中的多个机器节点,被分区并行执行.  在spark中,对数据的所有操作不外乎创建RDD.转化已有RDD及调用RDD操作进行求值.spark会自动地将RDD中的数据分发到集群中并行执行. 五大特性 a list of partitions  RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的lis

spark记录(1)spark Core之RDD

Spark运行模式 Local 多用于本地测试,如在eclipse,idea中写程序测试等. Standalone Standalone是Spark自带的一个资源调度框架,它支持完全分布式. Yarn Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的. Mesos 资源调度框架. ¬  要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn. 1.概念 RDD(Resilient Distri

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

【Spark Core】任务运行机制和Task源代码浅析1

引言 上一小节<TaskScheduler源代码与任务提交原理浅析2>介绍了Driver側将Stage进行划分.依据Executor闲置情况分发任务,终于通过DriverActor向executorActor发送任务消息. 我们要了解Executor的运行机制首先要了解Executor在Driver側的注冊过程.这篇文章先了解一下Application和Executor的注冊过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor运行的Task分为ShuffleMa

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

【Spark Core】任务执行机制和Task源码浅析1

引言 上一小节<TaskScheduler源码与任务提交原理浅析2>介绍了Driver侧将Stage进行划分,根据Executor闲置情况分发任务,最终通过DriverActor向executorActor发送任务消息. 我们要了解Executor的执行机制首先要了解Executor在Driver侧的注册过程,这篇文章先了解一下Application和Executor的注册过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor执行的Task分为ShuffleMap

《OD学spark》20160925 Spark Core

一.引言 Spark内存计算框架 中国Spark技术峰会 十二场演讲 大数据改变世界,Spark改变大数据 大数据: 以Hadoop 2.x为主的生态系统框架(MapReduce并行计算框架) 存储数据.处理数据 分布式 Spark: 类似于MapReduce的另外一种分布式计算框架 核心: 数据结构:RDD,集合List[T] MapReduce 最大的痛点: IO性能瓶颈,也是所有分布式计算框架的痛点 (1)磁盘IO, input(disk) -> map -> DISK(local)-&

Spark Core源代码分析: Spark任务模型

概述 一个Spark的Job分为多个stage,最后一个stage会包含一个或多个ResultTask,前面的stages会包含一个或多个ShuffleMapTasks. ResultTask运行并将结果返回给driver application. ShuffleMapTask将task的output依据task的partition分离到多个buckets里.一个ShuffleMapTask相应一个ShuffleDependency的partition,而总partition数同并行度.redu

急中生智~利用Spark core完成&quot;ETL&quot;!

背景介绍:今天接到老板分配的一个小任务:开发一个程序,实现从数据库中抽取数据并生成报表的功能(这是我们数据库审计平台准备上线的一个功能).既然是要生成报表,那么首先得有数据,于是便想到从该业务系统的测试环境抽取业务表的数据,然后装载至自己云主机上的Mysql中.本来以为只要"select ...into outfile"和"load data infile..."两个命令就可以搞定的,可是还是出了意外.测试环境导出的txt文件在云主机load时,报了"Ro