走近RDD

  RDD(Resilient Distributed Datasets)弹性分布式数据集。RDD可以看成是一个简单的"数组",对其进行操作也只需要调用有限的"数组"中的方法即可,但它与一般数组的区别在于:RDD是分布式存储,可以跟好的利用现有的云数据平台,并在内存中进行。此处的弹性指的是数据的存储方式,及数据在节点中进行存储的时候,既可以使用内存也可以使用磁盘。此外,RDD还具有很强的容错性,在spark运行计算的过程中,不会因为某个节点错误而使得整个任务失败;不通节点中并发运行的数据,如果在某个节点发生错误时,RDD会自动将其在不同的节点中重试。

  RDD一大特性是延迟计算,即一个完整的RDD运行任务被分成2部分:Transformation和Action。

  Transformation用于对RDD的创建。在spark中,RDD只能使用Transformation来创建,同时Transformation还提供了大量的操作方法。RDD还可以利用Transformation来生成新的RDD,这样可以在有限的内存空间中生成竟可能多的数据对象。无论发生了多少次Transformation,此时,在RDD中真正数据计算运行的操作Action都没真正的开始运行。

  Action是数据的执行部分,其也提供了大量的方法去执行数据的计算操作部分。

  RDD可以将其看成一个分布在不同节点中的分布式数据集,并将数据以数据块(Block)的形式存储在各个节点的计算机中。每个BlockMaster管理着若干个BlockSlave,而每个BlockSlave又管理着若干个BlockNode。当BlockSlave获得了每个Node节点的地址,又会反向向BlockMaster注册每个Node的基本信息,这样就形成了分层管理。

  RDD依赖

  窄依赖 (narrowdependencies) 和宽依赖 (widedependencies) 。窄依赖是指 父 RDD 的每个分区都只被子 RDD 的一个分区所使用,例如map、filter。相应的,那么宽依赖就是指父 RDD 的分区被多个子 RDD 的分区所依赖,例如groupByKey、reduceByKey等操作。如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。

  这种划分有两个用处。首先,窄依赖支持在一个结点上管道化执行。例如基于一对一的关系,可以在 filter 之后执行 map 。其次,窄依赖支持更高效的故障还原。因为对于窄依赖,只有丢失的父 RDD 的分区需要重新计算。而对于宽依赖,一个结点的故障可能导致来自所有父 RDD 的分区丢失,因此就需要完全重新执行。因此对于宽依赖,Spark 会在持有各个父分区的结点上,将中间数据持久化来简化故障还原,就像 MapReduce 会持久化 map 的输出一样。对于join操作有两种情况,如果join操作的使用每个partition仅仅和已知的Partition进行join,此时的join操作就是窄依赖;其他情况的join操作就是宽依赖;因为是确定的Partition数量的依赖关系,所以就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的Partition的数量不会随着RDD数据规模的改变而改变)

                

  下面就是RDD API

  1、parallelize

  def parallelize[T](seq : scala.Seq[T], numSlices : scala.Int = { /* compiled code */ }) //第一个参数是数据,同时还有一个带有默认数值的参数,改参数为1,该参数表示的是将数据分布在多少个数据节点中存放。

  2、aggregate

  def aggregate[U](zeroValue : U)(seqOp : scala.Function2[U, T, U], combOp : scala.Function2[U, U, U]) //seqOp 是给定的计算方法,combOp 是合并方法,将第一个计算方法得出的结果与源码中的zeroValue进行合并。实例:

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr=sc.parallelize(Array(1,2,3,4,5,6,7,8))//parallelize将内存数据读入Spark系统中,作为整体数据集
    val result=arr.aggregate(0)(math.max(_,_),_+_)//_+_ 对传递的第一个方法的结果集进行进一步处理
    println(result)
  }
}

结果为8

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr=sc.parallelize(Array("abd","hello world","hello sb"))//parallelize将内存数据读入Spark系统中,作为整体数据集
    val result=arr.aggregate("")((value,word)=>value+word,_+_)//_+_ 对传递的第一个方法的结果集进行进一步处理
    println(result)
  }
}

结果为abdhello worldhello sb

  3、cache是将数据内容计算并保存在计算节点的内存中

  4、cartesion是用于对不同的数组进行笛卡尔操作,要求是数组的长度必须相同

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array(1,2,3,4))//parallelize将内存数据读入Spark系统中,作为整体数据集
    val arr2=sc.parallelize(Array(4,3,2,1))
    val res=arr1.cartesian(arr2)
    res.foreach(print)
  }
}

结果:(1,4)(1,3)(1,2)(1,1)(2,4)(2,3)(2,2)(2,1)(3,4)(3,3)(3,2)(3,1)(4,4)(4,3)(4,2)(4,1)

  5、Coalesce是将已经存储的数据重新分片后再进行存储(repartition与Coalesce类似)

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array(1,2,3,4,5,6))//parallelize将内存数据读入Spark系统中,作为整体数据集
    val arr2=arr1.coalesce(2,true)
    val res1=arr1.aggregate(0)(math.max(_,_),_+_)
    println(res1)
    val res2=arr2.aggregate(0)(math.max(_,_),_+_)
    println(res2)
  }
}

结果为6    11

  6、countByValue是计算数据集中某个数据出现的个数,并将其以map的形式返回

  7、countByKey是计算数据集中元数据键值对key出现的个数

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array((1,"a"),(2,‘b‘),(1,‘c‘),(1,‘d‘),(2,‘a‘)))//parallelize将内存数据读入Spark系统中,作为整体数据集
    val res1=arr1.countByValue()
    res1.foreach(println)
    val res2=arr1.countByKey()
    res2.foreach(println)
  }
}
//结果:((1,c),1)
((2,a),1)
((1,a),1)
((1,d),1)
((2,b),1)
(1,3)
(2,2)

  8、filter是对数据集进行过滤

  9、flatMap是对RDD中的数据进行整体操作的一个特殊方法,其在定义时就是针对数据集进行操作

  10、map可以对RDD中的数据集进行逐个操作,其与flatmap不同得是,flatmap是将数据集中的数据作为一个整体去处理,之后再对其中的数据做计算,而map则直接对数据集中的数据做单独的处理

  11、groupBy是将传入的数据进行分组

  12、keyBy是为数据集中的每个个体数据添加一个key,从而形成键值对

  13、reduce同时对2个数据进行处理,主要是对传入的数据进行合并处理

  14、sortBy是对已有的RDD进行重新排序

import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setMaster("local").setAppName("test")
    val sc=new SparkContext(conf)
    val arr1=sc.parallelize(Array((1,"a"),(2,"c"),(3,"b"),(4,"x"),(5,"f")))//parallelize将内存数据读入Spark系统中,作为整体数据集
    val res1=arr1.sortBy(word=>word._1,true)
    val res2=arr1.sortBy(word=>word._2,true)
    res1.foreach(println)
    res2.foreach(println)
  }
}

  15、zip可以将若干个RDD压缩成一个新的RDD

时间: 2024-10-05 04:27:39

走近RDD的相关文章

(3)RDD编程

1.RDD基础 弹性分布式数据集,简称RDD,是一个不可变的分布式对象集合.在Spark中,对数据的所有操作不外乎创建RDD,转化已有RDD以及调用RDD操作进行求值. 每一个RDD都被分为多个分区,这些分区运行在集群中的不同节点上,RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义对象. 用户可以使用两种方法创建RDD:读取一个外部数据集,或者在驱动程序里分发驱动器程序中的对象集合. 创建出来以后,RDD支持两种类型的操作:转化操作和行动操作.转化操作和行

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark 本文对join相关的api做了一个解释 SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Integer, Integer> javaPa

spark的数据结构 RDD——DataFrame——DataSet区别

转载自:http://blog.csdn.net/wo334499/article/details/51689549 RDD 优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据 缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化. GC的性能开销 频繁的创建和销毁对象, 势必会增加GC import org.apache.spark.sql.SQLContext import org

Spark弹性分布式数据集RDD

RDD(Resilient Distributed Dataset)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作.这对于迭代运算比

RDD Join中宽依赖与窄依赖的判断

1.规律 如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle), 而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖 除此之外的,rdd 的join api是宽依赖 2.测试程序 1 package com.ibeifeng.senior.join 2 3 import org.apache.spark.{SparkConf, SparkContext} 4 5 /** 6 * RDD数据Join相关API讲解 7

RDD Join相关API,以及程序

1.数据集 A表数据: 1 a 2 b 3 c B表数据: 1 aa1 1 aa2 2 bb1 2 bb2 2 bb3 4 dd1 2.join的分类 inner join left outer join right outer join full outer join left semi join 3.集中join的结果 A inner join B: 1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 A left outer join B:

解析spark RDD

RDD是spark抽象的基石,可以说整个spark编程就是对RDD进行的操作 RDD是弹性的分布式数据集,它是只读的,可分区的,这个数据集的全部或者部分数据可以缓存在内存中,在多次计算间重用.所谓的弹性意思是:内存不够时可以与磁盘进行交换.这是RDD另一个特性:内存计算.就是将数据保存到内存中,同时为了解决内存容量大小的问题,他允许所有的数据我们可以自由的设置cache,和 是否cache RDD的特征: (1)有一个分片列表,就是这个RDD可以被切分,和hadoop一样,能被切分的数据才能并行

RDD之三:RDD创建方式

RDD创建方式 1)从Hadoop文件系统(如HDFS.Hive.HBase)输入创建.2)从父RDD转换得到新RDD.3)通过parallelize或makeRDD将单机数据创建为分布式RDD. 4)基于DB(Mysql).NoSQL(HBase).S3(SC3).数据流创建. 从集合创建RDD parallelize def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: Clas