理解Spark的RDD

RDD是个抽象类,定义了诸如map()、reduce()等方法,但实际上继承RDD的派生类一般只要实现两个方法:

  • def getPartitions: Array[Partition]
  • def compute(thePart: Partition, context: TaskContext): NextIterator[T]

getPartitions()用来告知怎么将input分片;

compute()用来输出每个Partition的所有行(行是我给出的一种不准确的说法,应该是被函数处理的一个单元);

以一个hdfs文件HadoopRDD为例:

  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    val inputFormat = getInputFormat(jobConf)
    if (inputFormat.isInstanceOf[Configurable]) {
      inputFormat.asInstanceOf[Configurable].setConf(jobConf)
    }
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  }

它直接将各个split包装成RDD了,再看compute():

  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {

      val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      var reader: RecordReader[K, V] = null
      val jobConf = getJobConf()
      val inputFormat = getInputFormat(jobConf)
      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

      // Register an on-task-completion callback to close the input stream.
      context.addTaskCompletionListener{ context => closeIfNeeded() }
      val key: K = reader.createKey()
      val value: V = reader.createValue()

      // Set the task input metrics.
      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
      try {
        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
         * always at record boundaries, so tasks may need to read into other splits to complete
         * a record. */
        inputMetrics.bytesRead = split.inputSplit.value.getLength()
      } catch {
        case e: java.io.IOException =>
          logWarning("Unable to get input size to set InputMetrics for task", e)
      }
      context.taskMetrics.inputMetrics = Some(inputMetrics)

      override def getNext() = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case eof: EOFException =>
            finished = true
        }
        (key, value)
      }

      override def close() {
        try {
          reader.close()
        } catch {
          case e: Exception => logWarning("Exception in RecordReader.close()", e)
        }
      }
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }

它调用reader返回一系列的K,V键值对。

再来看看数据库的JdbcRDD:

  override def getPartitions: Array[Partition] = {
    // bounds are inclusive, hence the + 1 here and - 1 on end
    val length = 1 + upperBound - lowerBound
    (0 until numPartitions).map(i => {
      val start = lowerBound + ((i * length) / numPartitions).toLong
      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
      new JdbcPartition(i, start, end)
    }).toArray
  }

它直接将结果集分成numPartitions份。其中很多参数都来自于构造函数:

class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)

再看看compute()函数:

  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
    context.addTaskCompletionListener{ context => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

    // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
    // rather than pulling entire resultset into memory.
    // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
    if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
      stmt.setFetchSize(Integer.MIN_VALUE)
      logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
    }

    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
    val rs = stmt.executeQuery()

    override def getNext: T = {
      if (rs.next()) {
        mapRow(rs)
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }

    override def close() {
      try {
        if (null != rs && ! rs.isClosed()) {
          rs.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing resultset", e)
      }
      try {
        if (null != stmt && ! stmt.isClosed()) {
          stmt.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing statement", e)
      }
      try {
        if (null != conn && ! conn.isClosed()) {
          conn.close()
        }
        logInfo("closed connection")
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
    }
  }

这段代码就是一段sql分页查询执行情况(顺便吐槽一下,这段代码写得确实比较渣。。。确定sql里面不会在limit前面出现整形变量?有兴趣的同仁们,赶紧操起MyBatis或者Hibernate去投稿吧!)

以上内容为本人原创,转载请注明博客地址:http://blog.csdn.net/bluejoe2000/article/details/41415087

以下内容为转载,来自:http://developer.51cto.com/art/201309/410276_1.htm

◆ RDD的特点:

  1. 它是在集群节点上的不可变的、已分区的集合对象。
  2. 通过并行转换的方式来创建如(map, filter, join, etc)。
  3. 失败自动重建。
  4. 可以控制存储级别(内存、磁盘等)来进行重用。
  5. 必须是可序列化的。
  6. 是静态类型的。

◆ RDD的好处

  1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
  2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
  3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
  4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。

◆ RDD的存储与分区

  1. 用户可以选择不同的存储级别存储RDD以便重用。
  2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
  3. RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。

◆ RDD的内部表示

在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

  1. 分区列表(数据块列表)
  2. 计算每个分片的函数(根据父RDD计算出此RDD)
  3. 对父RDD的依赖列表
  4. 对key-value RDD的Partitioner【可选】
  5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】

◆ RDD的存储级别

RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

  1. val NONE = new StorageLevel(false, false, false)
  2. val DISK_ONLY = new StorageLevel(true, false, false)
  3. val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
  4. val MEMORY_ONLY = new StorageLevel(false, true, true)
  5. val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
  6. val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
  7. val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
  8. val MEMORY_AND_DISK = new StorageLevel(true, true, true)
  9. val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
  10. val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
  11. val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

◆ RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

RDD的生成

◆ RDD有两种创建方式:

1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。

2、从父RDD转换得到新RDD。

◆ 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:

  1. // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像
  2. // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。
  3. def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  4. hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
  5. classOf[Text], minSplits) .map(pair => pair._2.toString) }
  6. // 根据Hadoop配置,及InputFormat等创建HadoopRDD
  7. new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

◆ 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:

RDD的转换与操作

◆ 对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。

◆ 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

◆ 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

时间: 2024-09-26 20:09:32

理解Spark的RDD的相关文章

深入理解spark-rdd详解

1.我们在使用spark计算的时候,操作数据集的感觉很方便是因为spark帮我们封装了一个rdd(弹性分布式数据集Resilient Distributed Dataset): 那么rdd数据本身是如何存储的呢,又是如何调度读取的? spark大部分时候都是在集群上上运行的,那么数据本身一定是也是分布式存储的,数据是由每个Excutor的去管理多个block的,而元数据本身是由driver的blockManageMaster来管理,当每个excutor创建的时候也会创建相对应的数据集管理服务bl

Spark的RDD检查点实现分析

概述 在<深入理解Spark:核心思想与源码分析>一书中只是简单介绍了下RDD的checkpoint,对本书是个遗憾.所以此文的目的旨在查漏补缺,完善本书的内容. Spark的RDD执行完成之后会保存检查点,便于当整个作业运行失败重新运行时候,从检查点恢复之前已经运行成功的RDD结果,这样就会大大减少重新计算的成本,提高任务恢复效率和执行效率,节省Spark各个计算节点的资源.本文着重分析检查点的代码实现,更深入理解其原理.在<深入理解Spark:核心思想与源码分析>一书的第5章中

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

(版本定制)第3课:从作业和容错的角度来理解Spark Streaming

本节课内容: 1.Spark Streaming Job架构和运行机制 2.Spark Streaming Job容错架构和运行机制 理解Spark Streaming Job整个架构和运行机制对于精通Spark Streaming来说是至关重要的. 一.首先我们运行以下程序,然后通过这个程序的运行过程进一步加深对Spark Streaming流处理Job的执行过程的理解,代码如下: object OnlineForeachRDD2DB { def main(args: Array[String

轻松理解 Spark 的 aggregate 方法

2019-04-20 关键字: Spark 的 agrregate 作用.Scala 的 aggregate 是什么 Spark 编程中的 aggregate 方法还是比较常用的.本篇文章站在初学者的角度以大白话的形式来讲解一下 aggregate 方法. aggregate 方法是一个聚合函数,接受多个输入,并按照一定的规则运算以后输出一个结果值. aggregate 在哪 aggregate 方法是 Spark 编程模型 RDD 类( org.apache.spark.RDD ) 中定义的一

Spark之RDD的定义及五大特性

RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群所有节点并行计算,是一种基于工作集的应用抽象. RDD底层存储原理:其数据分布存储于多台机器上,事实上,每个RDD的数据都以Block的形式存储于多台机器上,每个Executor会启动一个BlockManagerSlave,并管理一部分Block:而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向Block

Spark学习之路 (三)Spark之RDD[转]

RDD的概述 什么是RDD? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模型的特点:自动容错.位置感知性调度和可伸缩性.RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度. RDD的属性 (1)一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处

Spark核心—RDD初探

本文目的 ? 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken).感觉需要记录点什么,才对得起自己.下面的内容主要是关于Spark核心-RDD的相关的使用经验和原理介绍,作为个人备忘,也希望对读者有用. ? 为什么选择Spark ? 原因如下 代码复用:使用Scala高级语言操作Spark,灵活方便,面向对象,函数编程的语言特性可以全部拿来.Scala基本上可以无缝集成java及其相关库.最重要的是,可以封装组件,沉淀工作,提高工作效率.之前用hi

Spark发行笔记8:解读Spark Streaming RDD的全生命周期

本节主要内容: 一.DStream与RDD关系的彻底的研究 二.StreamingRDD的生成彻底研究 Spark Streaming RDD思考三个关键的问题: RDD本身是基本对象,根据一定时间定时产生RDD的对象,随着时间的积累,不对其管理的话会导致内存会溢出,所以在BatchDuration时间内执行完RDD操作后,需对RDD进行管理. 1.DStream生成RDD的过程,DStream到底是怎么生成RDD的? 2.DStream和RDD到底什么关系? 3.运行之后怎么对RDD处理? 所