Spark笔记:RDD基本操作(上)

  本文主要是讲解spark里RDD的基础操作。RDD是spark特有的数据模型,谈到RDD就会提到什么弹性分布式数据集,什么有向无环图,本文暂时不去展开这些高深概念,在阅读本文时候,大家可以就把RDD当作一个数组,这样的理解对我们学习RDD的API是非常有帮助的。本文所有示例代码都是使用scala语言编写的。

  Spark里的计算都是操作RDD进行,那么学习RDD的第一个问题就是如何构建RDD,构建RDD从数据来源角度分为两类:第一类是从内存里直接读取数据,第二类就是从文件系统里读取,当然这里的文件系统种类很多常见的就是HDFS以及本地文件系统了。

  第一类方式从内存里构造RDD,使用的方法:makeRDD和parallelize方法,如下代码所示:

    /* 使用makeRDD创建RDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println(r01.collect().mkString(","))
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println(r02.collect().mkString(","))

    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println(r03.collect().mkString(","))
    /* Array */
    val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println(r04.collect().mkString(","))

  大家看到了RDD本质就是一个数组,因此构造数据时候使用的是List(链表)和Array(数组)类型。

  第二类方式是通过文件系统构造RDD,代码如下所示:

    val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println(r.collect().mkString(","))

  这里例子使用的是本地文件系统,所以文件路径协议前缀是file://。

  构造了RDD对象了,接下来就是如何操作RDD对象了,RDD的操作分为转化操作(transformation)和行动操作(action),RDD之所以将操作分成这两类这是和RDD惰性运算有关,当RDD执行转化操作时候,实际计算并没有被执行,只有当RDD执行行动操作时候才会促发计算任务提交,执行相应的计算操作。区别转化操作和行动操作也非常简单,转化操作就是从一个RDD产生一个新的RDD操作,而行动操作就是进行实际的计算。

  下面是RDD的基础操作API介绍:


操作类型


函数名


作用


转化操作


map()


参数是函数,函数应用于RDD每一个元素,返回值是新的RDD


flatMap()


参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD


filter()


参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD


distinct()


没有参数,将RDD里的元素进行去重操作


union()


参数是RDD,生成包含两个RDD所有元素的新RDD


intersection()


参数是RDD,求出两个RDD的共同元素


subtract()


参数是RDD,将原RDD里和参数RDD里相同的元素去掉


cartesian()


参数是RDD,求两个RDD的笛卡儿积


行动操作


collect()


返回RDD所有元素


count()


RDD里元素个数


countByValue()


各元素在RDD中出现次数


reduce()


并行整合所有RDD数据,例如求和操作


fold(0)(func)


和reduce功能一样,不过fold带有初始值


aggregate(0)(seqOp,combop)


和reduce功能一样,但是返回的RDD数据类型和原RDD不一样


foreach(func)


对RDD每个元素都是使用特定函数

  下面是以上API操作的示例代码,如下:

  转化操作:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)

    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")
    /* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")
    /* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")
    /* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")
    /* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")
    /* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")
    /* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")
    /* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")

  行动操作代码如下:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)

    /* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")
    /* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")
    /* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x ,y) => x + y))
    println("======countByValue操作======")
    /* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x ,y) => x + y))
    println("======fold操作======")
    /* aggregate操作 */
    println("======aggregate操作======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")
    /* foeach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======")

  RDD操作暂时先学习到这里,剩下的内容在下一篇里再谈了,下面我要说说如何开发spark,安装spark的内容我后面会使用专门的文章进行讲解,这里我们假设已经安装好了spark,那么我们就可以在已经装好的spark服务器上使用spark-shell进行与spark交互的shell,这里我们直接可以敲打代码编写spark程序。但是spark-shell毕竟使用太麻烦,而且spark-shell一次只能使用一个用户,当另外一个用户要使用spark-shell就会把前一个用户踢掉,而且shell也没有IDE那种代码补全,代码校验的功能,使用起来很是痛苦。

  不过spark的确是一个神奇的框架,这里的神奇就是指spark本地开发调试非常简单,本地开发调试不需要任何已经装好的spark系统,我们只需要建立一个项目,这个项目可以是java的也可以是scala,然后我们将spark-assembly-1.6.1-hadoop2.6.0.jar这样的jar放入项目的环境里,这个时候我们就可以在本地开发调试spark程序了。

  大家请看我们装有scala插件的eclipse里的完整代码:

package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SparkTest {
  val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)

  /**
   * 创建数据的方式--从内存里构造数据(基础)
   */
  def createDataMethod():Unit = {
    /* 使用makeRDD创建RDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println("===================createDataMethod:makeRDD:List=====================")
    println(r01.collect().mkString(","))
    println("===================createDataMethod:makeRDD:List=====================")
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println("===================createDataMethod:makeRDD:Array=====================")
    println(r02.collect().mkString(","))
    println("===================createDataMethod:makeRDD:Array=====================")

    /* 使用parallelize创建RDD */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println("===================createDataMethod:parallelize:List=====================")
    println(r03.collect().mkString(","))
    println("===================createDataMethod:parallelize:List=====================")
    /* Array */
    val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println("===================createDataMethod:parallelize:Array=====================")
    println(r04.collect().mkString(","))
    println("===================createDataMethod:parallelize:Array=====================")
  }

  /**
   * 创建Pair Map
   */
  def createPairRDD():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
    val r:RDD[String] = rdd.keys
    println("===========================createPairRDD=================================")
    println(r.collect().mkString(","))
    println("===========================createPairRDD=================================")
  }

  /**
   * 通过文件创建RDD
   * 文件数据:
   * 	key01,1,2.3
		  key02,5,3.7
      key03,23,4.8
      key04,12,3.9
      key05,7,1.3
   */
  def createDataFromFile(path:String):Unit = {
    val rdd:RDD[String] = sc.textFile(path, 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println("=========================createDataFromFile==================================")
    println(r.collect().mkString(","))
    println("=========================createDataFromFile==================================")
  }

  /**
   * 基本的RDD操作
   */
  def basicTransformRDD(path:String):Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)

    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")
    /* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")
    /* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")
    /* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")
    /* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")
    /* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")
    /* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")
    /* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")
  }

  /**
   * 基本的RDD行动操作
   */
  def basicActionRDD():Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)

    /* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")
    /* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")
    /* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x ,y) => x + y))
    println("======countByValue操作======")
    /* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x ,y) => x + y))
    println("======fold操作======")
    /* aggregate操作 */
    println("======aggregate操作======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")
    /* foeach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======")
  }

  def main(args: Array[String]): Unit = {
    println(System.getenv("HADOOP_HOME"))
    createDataMethod()
    createPairRDD()
    createDataFromFile("file:///D:/sparkdata.txt")
    basicTransformRDD("file:///D:/sparkdata.txt")
    basicActionRDD()
    /*打印结果*/
    /*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
2,3,4,5,6,7,3,6,2
======map操作======
======filter操作======
5,6,5
======filter操作======
======flatMap操作======
key01
======flatMap操作======
======distinct去重======
4,6,2,1,3,5
======distinct去重======
======union操作======
1,3,5,3,2,4,5,1
======union操作======
======intersection操作======
1,5
======intersection操作======
======subtract操作======
3,3
======subtract操作======
======cartesian操作======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
======cartesian操作======
======count操作======
9
======count操作======
======countByValue操作======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
======countByValue操作======
======countByValue操作======
29
======countByValue操作======
======fold操作======
29
======fold操作======
======aggregate操作======
19,10
======aggregate操作======
======foeach操作======
a
b
c
d
b
a
======foeach操作======*/
  }
}

  Spark执行时候我们需要构造一个SparkContenxt的环境变量,构造环境变量时候需要构造一个SparkConf对象,例如代码:setAppName("xtq").setMaster("local[2]")

  appName就是spark任务名称,master为local[2]是指使用本地模式,启动2个线程完成spark任务。

  在eclipse里运行spark程序时候,会报出如下错误:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
	at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
	at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
	at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
	at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
	at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
	at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
	at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10)
	at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala)
	at cn.com.sparktest.SparkTest.main(SparkTest.scala)

  该错误不会影响程序的运算,但总是让人觉得不舒服,这个问题是因为spark运行依赖于hadoop,可是在window下其实是无法安装hadoop,只能使用cygwin模拟安装,而新版本的hadoop在windows下使用需要使用winutils.exe,解决这个问题很简单,就是下载一个winutils.exe,注意下自己操作系统是32位还是64位,找到对应版本,然后放置在这样的目录下:

  D:\hadoop\bin\winutils.exe

  然后再环境变量里定义HADOOP_HOME= D:\hadoop

  环境变量的改变要重启eclipse,这样环境变量才会生效,这个时候程序运行就不会报出错误了。

时间: 2024-12-18 07:04:30

Spark笔记:RDD基本操作(上)的相关文章

Spark笔记整理(五):Spark RDD持久化、广播变量和累加器

[TOC] Spark RDD持久化 RDD持久化工作原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升1

Spark发行笔记8:解读Spark Streaming RDD的全生命周期

本节主要内容: 一.DStream与RDD关系的彻底的研究 二.StreamingRDD的生成彻底研究 Spark Streaming RDD思考三个关键的问题: RDD本身是基本对象,根据一定时间定时产生RDD的对象,随着时间的积累,不对其管理的话会导致内存会溢出,所以在BatchDuration时间内执行完RDD操作后,需对RDD进行管理. 1.DStream生成RDD的过程,DStream到底是怎么生成RDD的? 2.DStream和RDD到底什么关系? 3.运行之后怎么对RDD处理? 所

Spark核心—RDD初探

本文目的 ? 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken).感觉需要记录点什么,才对得起自己.下面的内容主要是关于Spark核心-RDD的相关的使用经验和原理介绍,作为个人备忘,也希望对读者有用. ? 为什么选择Spark ? 原因如下 代码复用:使用Scala高级语言操作Spark,灵活方便,面向对象,函数编程的语言特性可以全部拿来.Scala基本上可以无缝集成java及其相关库.最重要的是,可以封装组件,沉淀工作,提高工作效率.之前用hi

Spark的RDD检查点实现分析

概述 在<深入理解Spark:核心思想与源码分析>一书中只是简单介绍了下RDD的checkpoint,对本书是个遗憾.所以此文的目的旨在查漏补缺,完善本书的内容. Spark的RDD执行完成之后会保存检查点,便于当整个作业运行失败重新运行时候,从检查点恢复之前已经运行成功的RDD结果,这样就会大大减少重新计算的成本,提高任务恢复效率和执行效率,节省Spark各个计算节点的资源.本文着重分析检查点的代码实现,更深入理解其原理.在<深入理解Spark:核心思想与源码分析>一书的第5章中

Spark和RDD模型研究

1背景介绍 现今分布式计算框架像MapReduce和Dryad都提供了高层次的原语,使用户不用操心任务分发和错误容忍,非常容易地编写出并行计算程序.然而这些框架都缺乏对分布式内存的抽象和支持,使其在某些应用场景下不够高效和强大.RDD(Resilient Distributed Datasets弹性分布式数据集)模型的产生动机主要来源于两种主流的应用场景: ?  迭代式算法:迭代式机器学习.图算法,包括PageRank.K-means聚类和逻辑回归(logistic regression) ? 

spark执行在yarn上executor内存不足异常ERROR YarnScheduler: Lost executor 542 on host-bigdata3: Container marked as failed: container_e40_1550646084627_1007653_01_000546 on host: host-bigdata3. Exit status: 143.

当spark跑在yarn上时 单个executor执行时,数据量过大时会导致executor的memory不足而使得rdd  最后lost,最终导致任务执行失败 其中会抛出如图异常信息 如图中异常所示 对应解决方法可以加上对应的参数调优(这个配置可以在总的处理数据量在几百TB或者1~3PB级别的数据处理时解决executor-memory不足问题) --num-executors=512 --executor-cores=8 --executor-memory=32g --driver-memo

Spark之RDD弹性特性

RDD作为弹性分布式数据集,它的弹性具体体现在以下七个方面. 1.自动进行内存和磁盘数据存储的切换 Spark会优先把数据放到内存中,如果内存实在放不下,会放到磁盘里面,不但能计算内存放下的数据,也能计算内存放不下的数据.如果实际数据大于内存,则要考虑数据放置策略和优化算法.当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保障其高效运行. 2.基于Lineage(血统)的高效容错机制 Lineage是基于Spark RDD的依赖关系来完成的(依赖分为窄依赖和宽依赖两

Spark之RDD的定义及五大特性

RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群所有节点并行计算,是一种基于工作集的应用抽象. RDD底层存储原理:其数据分布存储于多台机器上,事实上,每个RDD的数据都以Block的形式存储于多台机器上,每个Executor会启动一个BlockManagerSlave,并管理一部分Block:而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向Block

Spark学习之路 (三)Spark之RDD[转]

RDD的概述 什么是RDD? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模型的特点:自动容错.位置感知性调度和可伸缩性.RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度. RDD的属性 (1)一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处