spark core之RDD编程

  spark提供了对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。RDD是一个分布式的数据集合,数据可以跨越集群中的多个机器节点,被分区并行执行。
  在spark中,对数据的所有操作不外乎创建RDD、转化已有RDD及调用RDD操作进行求值。spark会自动地将RDD中的数据分发到集群中并行执行。

五大特性

  • a list of partitions
      RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,一般会遵循数据的本地性(一般一个hdfs里的block会加载为一个partition)。
  • a function for computing each split
      RDD的每个partition中都会有function,即函数应用,其作用是实现RDD之间partition的转换。
  • a list of dependencies on other RDDs
      RDD会记录它的依赖,为了容错(重算,cache,checkpoint),即内存中的RDD操作出错或丢失时会进行重算。
  • Optionally,a Partitioner for Key-value RDDs
      可选项,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面。
  • Optionally, a list of preferred locations to compute each split on
      可选项,最优的位置去计算每个分片,即数据的本地性。

    创建RDD

      spark提供了两种创建RDD的方式:读取外部数据源、将驱动器程序中的集合进行并行化。

    并行化集合

      使用sparkContext的parallelize()方法将集合并行化。
      parallelize()方法第二个参数可指定分区数。spark会为每个分区创建一个task任务,通常每个cpu需要2-4个分区。spark会自动地根据集群大小设置分区数,也支持通过parallelize()方法的第二个参数手动指定。

    scala

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)

    java

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);

    python

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)

      注:除了开发和测试外,这种方式用得不多。这种方式需要把整个数据集先放到一台机器的内存中。

    读取外部数据源

      spark可接入多种hadoop支持的数据源来创建分布式数据集。包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。
      spark支持多种存储格式,包括textFiles、SequenceFiles及其他hadoop存储格式。

    scala

    scala> val distFile = sc.textFile("data.txt")
    distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

    java

    JavaRDD<String> distFile = sc.textFile("data.txt");

    python

    >>> distFile = sc.textFile("data.txt")

RDD操作

  RDD支持两种操作:转化操作和行动操作。

转化操作

  RDD的转化操作会返回一个新的RDD。转化操作是惰性求值的,只有行动操作用到转化操作生成的RDD时,才会真正进行转化。

  spark使用lineage(血统)来记录转化操作生成的不同RDD之间的依赖关系。依赖分为窄依赖(narrow dependencies)和宽依赖(wide dependencies)。

  • 窄依赖

    • 子RDD的每个分区依赖于常数个父分区
    • 输入输出一对一,结果RDD的分区结构不变,主要是map、flatMap
    • 输入输出一对一,但结果RDD的分区结构发生变化,如union、coalesce
    • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample
  • 宽依赖
    • 子RDD的每个分区依赖于所有父RDD分区
    • 对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
    • 对两个RDD基于key进行合并和重组,如join

      行动操作

        行动操作则会向驱动器程序返回结果或把结果写入外部系统,会触发实际的计算。

      缓存方式

        RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
        cache最终也是调用了persist方法,默认的存储级别是仅在内存存储一份。

        Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

        缓存有可能丢失,RDD的缓存容错机制保证即使缓存丢失也能保证计算正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

      容错机制

      • Lineage机制

        • RDD的Lineage记录的是粗粒度的特定数据Transformation操作行为。当RDD的部分分区数据丢失时,可以通过Lineage来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,所以Spark并不适用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能的提升。
        • Spark Lineage机制是通过RDD的依赖关系来执行的
          • 窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据。
          • 宽依赖则要等到父RDD所有数据都计算完成后,将父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。宽依赖要将祖先RDD中的所有数据块全部重新计算,所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。
      • Checkpoint机制
        • 简介

          • 当RDD的action算子触发计算结束后会执行checkpoint;Task计算失败的时候会从checkpoint读取数据进行计算。
        • 实现方式(checkpoint有两种实现方式,如果代码中没有设置checkpoint,则使用local的checkpoint模式,如果设置路径,则使用reliable的checkpoint模式。)
          • LocalRDDCheckpointData:临时存储在本地executor的磁盘和内存上。该实现的特点是比较快,适合lineage信息需要经常被删除的场景(如GraphX),可容忍executor挂掉。
          • ReliableRDDCheckpointData:存储在外部可靠存储(如hdfs),可以达到容忍driver 挂掉情况。虽然效率没有存储本地高,但是容错级别最好。

原文地址:http://blog.51cto.com/12967015/2163616

时间: 2024-08-25 18:32:52

spark core之RDD编程的相关文章

Spark Core 的RDD

(1)RDD的介绍 ?????RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变(RDD中的数据,不能增删改),可分区.元素可并行计算的集合.??具有数据流的模型的特点,自动容错.位置感知性调度和可伸缩性.RDD允许用户在执行多个查询时显示的将工作集缓存在内存中.后续的查询能够重用工作集,这极大地提升了查询速度.??RDD可以从 三方面理解:??? - 数据集:RDD是数据集合的抽象,是复杂物理介质上存在数据的一

spark 中的RDD编程 -以下基于Java api

1.RDD介绍:     RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化. Spark中的RDD就是一个不可变的分布式对象集合.每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上.RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象. 用户可以使用两种方法创建RDD:读取一个

spark记录(1)spark Core之RDD

Spark运行模式 Local 多用于本地测试,如在eclipse,idea中写程序测试等. Standalone Standalone是Spark自带的一个资源调度框架,它支持完全分布式. Yarn Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的. Mesos 资源调度框架. ¬  要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn. 1.概念 RDD(Resilient Distri

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

Spark RDD编程(二)

转载请注明出处:http://blog.csdn.net/gamer_gyt @高阳团 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer ============================================================ SparkRDD编程(一) Spark 的键值对(pair RDD)操作,Scala实现 RDD的分区函数 目前Spark中实现的分区函数包括两种 Ha

Learning Spark中文版--第三章--RDD编程(1)

? ?本章介绍了Spark用于数据处理的核心抽象概念,具有弹性的分布式数据集(RDD).一个RDD仅仅是一个分布式的元素集合.在Spark中,所有工作都表示为创建新的RDDs.转换现有的RDDs,或者调用RDDs上的操作来计算结果.在底层,Spark自动将数据中包含的数据分发到你的集群中,并将你对它们执行的操作进行并行化.数据科学家和工程师都应该阅读这一章,因为RDDs是Spark的核心概念.我们强烈建议你在这些例子中尝试一些 交互式shell(参见"Spark的Python和Scala she

Spark Core 的核心理论

1. Spark Core的核心功能  (1)SparkContext:    通常而言,DriverApplication 的执行与输出都是通过SparkContext完成的,在正式提交Application 之前,首先需要初始化SparkContext.SparkContext隐藏了网络通信.分布式部署.消息通信.存储能力.计算能力.缓存.测量系统.文件服务.web服务等内容.应用程序的开发者只需要使用SparkContext 提供的API完成功能开发.  一个application ---

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

第0章 预备知识0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD.DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD.DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dst

Spark Core应用解析

一.RDD概念 1.1.RDD概述 1.1.1.什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在 Spark 中,对数据的所有操作不外乎创建 RDD.转化已有RDD 以及调用 RDD 操作进行求值.每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上.RDD 可以包含 Python.Java.Scala 中任意类型的对象, 甚至可以包含用户自定义