16.RDD实战

第16课:RDD实战

由于RDD的不可修改的特性,导致RDD的操作与正常面向对象的操作不同,RDD的操作基本分为3大类:transformation,action,contoller

1.   Transformation

Transformation是通过转化针对已有的RDD创建出新的RDD

map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter(func): 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

flatMap(func):和map差不多,但是flatMap生成的是多个结果

mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition

mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

sample(withReplacement,faction,seed):抽样

union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element

groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型

join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数

cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数

Transformation特性:

lazy优化:由于Tranformation的lazy特性,也就是创建不马上运行,对于框架来说,我有足够的时间查看到尽可能多的步骤,看到的步骤越多,优化的空间就越大。最简单的优化方式就是步骤合并,例如本来的做法是a=b*3;b=c*3;c=d*3;d=3,步骤合并后就是a=3*3*3*3。

2.   Action

Action操作的目的是得到一个值,或者一个结果

reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

count():返回的是dataset中的element的个数

first():返回的是dataset中的第一个元素

take(n):返回前n个elements,这个士driverprogram返回的

takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFile(path):把dataset写到一个textfile中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

countByKey():返回的是key对应的个数的一个map,作用于一个RDD

foreach(func):对dataset中的每个元素都使用func

3.   Contoller

Contoller动作主要为持久化RDD,例如cache(),persist(),checkpoint();

具体内容在后续刊物中会讲解。

4.   Spark WordCount动手实践

本小节通过IDEA具体逐步调试一个WordCount案例,让学员知道各步骤中RDD的具体类型,并为下一节逐步解析做铺垫

(1)     使用的wordCount代码如下:

  1. object WordCount {
  2. def main (args: Array[String]) {
  3. val conf = new SparkConf()//create SparkConf
  4. conf.setAppName("Wow,My First Spark App")//set app name
  5. conf.setMaster("local")//run local
  6. val sc =new SparkContext(conf)
  7. val lines =sc.textFile("C://Users//feng//IdeaProjects//WordCount//src//SparkText.txt")
  8. val words = lines.flatMap{ lines => lines.split(" ") }
  9. val pairs =words.map ( word => (word,1) )
  10. val reduce = pairs.reduceByKey(_+_)
  11. val sort_1 = reduce.map(pair=>(pair._2,pair._1))
  12. val sort_2 = sort_1.sortByKey(false)
  13. val sort_3=sort_2.map(pair=>(pair._2,pair._1))
  14. val filter=sort_3.filter(pair=>pair._2>2)
  15. filter.collect.foreach(wordNumberPair => println(wordNumberPair._1+" : "+wordNumberPair._2))
  16. sc.stop()
  17. }
  18. }

(1)       程序使用的SparkText.txt文件内容如下

hadoop hadoop hadoop

spark Flink spark

scala scala object

object spark scala

spark spark

Hadoop hadoop

(2)       程序WordCount调试结果:

通过IDEA的逐步调试,会在调试窗口显示每一行代码具体操作什么类型的RDD,此RDD通过什么依赖关系依赖于父RDD等重要信息(如图2-14所示),程序运行结果如图2-15所示。

图2-14调试过程图

图2-15wordCount结果

2.8.2 解析RDD生成的内部机制

本小节基于上小节程序的调试结果,逐条查看调试信息内容,并基于信息内容进行讲解,并在讲解中回顾并复习本章所有内容。

(1)       line = sc.textFile()

本语句的作用在于从外部数据中读取数据,并生成MapPartitionsRDD。此处需要注意:

如图2-16所示,可以看出次MapPartitionsRDD的deps(dependency,依赖)为HadoopRDD,从这里可以发现其实textFile()过程包含两个步骤,第一步骤将文件内容转化为HadoopRDD(key-value形式,key为行号),第二步骤将HadoopRDD转化为MapPartitionsRDD(value形式,将key-value类型的key删去)

图2-16通过HadoopRDD获取数据

(2)       words=line.flatMap()

此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录进行以空格为标记的切分,并把每一个RDD的切分的结果放在一个MapPartitionRDD中

(3)       pairs=words.map(word=>(word,1))

此命令对于RDD采取transformation(转换)操作,作用在于将MapPartitionsRDD中的每一个记录(例:spark(value类型))转换为key-value类型(例: (spark,1)),便于下一步reduceByKey操作

(4)       reduce = pairs.reduceByKey(_+_)

此命令对于RDD采取action(动作)操作,作用在于通过shuffle将pairs中所有的记录按照key相同value相加的规则进行处理,并把结果放到一个shuffleRDD中。例((spark,1),(spark,1))变成((spark,2))。

同时需要注意一下两点:首先本步骤实质上分为两个步骤,第一步骤为local级别的reduce,对当前计算机所拥有的数据先进行reduce操作,生成MapPartitionsRDD;第二步骤为shuffle级别的reduce,基于第一步骤的结果,对结果进行shuffle-reduce操作,生成最终的shuffleRDD。其次 Action操作进行时,对此操作之前的所有转换操作进行执行,所以调试过程中会出现此前的除textFile操作的执行时间均非常短,说明RDD转换操作不直接进行运算。

(5)       sort_1 = reduce.map(pair=>(pair._2,pair._1))

此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的key和value互换,生成一个新的MapPartitionsRDD。例: (spark,2)变为(2,spark)

(6)       sort_2 = sort_1.sortByKey(false)

此命令对于RDD采取action(动作)操作,作用在于将MapPartitionsRDD根据key进行排序,并生成shuffleRDD

(7)       sort_3=sort_2.map(pair=>(pair._2,pair._1))

此命令对于RDD采取transformation(转换)操作,作用在于将shuffleRDD中的每一个记录的key和value互换,生成一个新的MapPartitionsRDD。例: (2,spark)变为(spark,2)

(8)       filter=sort_3.filter(pair=>pair._2>2)

此命令对于RDD采取transformation(转换)操作,作用在于根据value值筛选MapPartitionsRDD中的数据,输出value大于2的记录

(9)       最后通过collect()方法将结果收集后,使用foreach()方法遍历数据并通过println()方法打印出所有数据。

注:本内容原型来自 IMP 课程笔记

如果技术上有什么疑问,欢迎加我QQ交流: 1106373297

时间: 2025-01-13 19:11:51

16.RDD实战的相关文章

Spark IMF传奇行动第16课RDD实战总结

今晚听了王家林老师的Spark IMF传奇行动第16课RDD实战,课堂笔记如下: RDD操作类型:Transformation.Action.Contoller reduce要符合交换律和结合律 val textLines = lineCount.reduceByKey(_+_,1) textLines.collect.foreach(pair=> println(pair._1 + "="+pair._2)) def collect(): Array[T] = withScop

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

《Spring 3.x 企业应用开发实战》目录

图书信息:陈雄华 林开雄 编著 ISBN 978-7-121-15213-9 概述: 第1章:对Spring框架进行宏观性的概述,力图使读者建立起对Spring整体性的认识. 第2章:通过一个简单的例子展现开发Spring Web应用的整体过程,通过这个实例,读者可以快速跨入Spring Web应用的世界. 第3章:讲解Spring IoC容器的知识,通过具体的实例详细地讲解IoC概念.同时,对Spring框架的三个最重要的框架级接口进行了剖析,并对Bean的生命周期进行讲解. 第4章:讲解如何

第15课:RDD创建内幕彻底解密

本节课主要内容: 1.RDD创建的几种方式 2.RDD创建实战 3.RDD内幕 RDD创建有很多种方式,以下几种创建RDD的方式:      1.使用程序中的集合创建RDD,实际意义用于测试用:      2.使用本地文件系统创建RDD,测试大量数据的文件:      3.使用HDFS创建RDD,最常用的方式:      4.基于DB创建RDD;      5.基于NoSQL创建RDD,例如HBase;      6.基于S3创建RDD;      7.基于数据源创建RDD; RDD实战: //

009.实战案例::产品设计实例精解

1.实战案例1 2.实战案例2 3.实战案例3 4.实战案例4 5.实战案例5 6.实战案例6 7.实战案例7 8.实战案例8 9.实战案例9 10.实战案例10 11.实战案例11 12.实战案例12 13.实战案例13 14.实战案例14 15.实战案例15 16.实战案例16 17.实战案例17 18.实战案例18 19.实战案例19:工兵铲 20.实战案例20 21.实战案例21 22.实战案例22 23.实战案例23 24.实战案例24 25.实战案例25 25.实战案例:25V型带轮

基于PHP实战ThinkSNS V3二次开发

基于PHP实战ThinkSNS V3二次开发(系统架构.网站定制化开发.问答系统开发) 课程分类:PHP 适合人群:中级 课时数量:25课时 服务类型:C类(普通服务类课程) 用到技术:PHP.ThinkSNS V3二次开发 涉及项目:ThinkSNS V3二次开发.问答系统 咨询QQ:1840215592 课程内容简介: 由于ThinkSNS产品提供了完整的微博及系统功能,并提供了非常友好的二次开发规范.越多越多的企业和开发人员选择使用ThinkSNS构建成熟的SNS企业网站和商业项目.本课程

《构建跨平台APP:PhoneGap移动应用实战》内容简介、目录

当当网链接 http://product.dangdang.com/23567381.html 内容简介 PhoneGap是一款优秀的移动跨平台开发框架,开发者通过它能够快速地将Web应用打包成在各个平台上运行的本地APP. 李柯泉.欧阳薇编著的<构建跨平台APP PhoneGap移动应用实战>分4篇共19章,第一篇是入门篇,包括了PhoneGap的小伙伴们.在安卓开发环境下的配置.对HTML 5前景的简单介绍.第二篇是基础知识篇,包含了设备信息.通讯录.加速度传感器.设备传感器.音频.文件.

零基础php开发工程师视频教程全套,基础+进阶+项目实战(80G)

天数 模块 阶段1 php基础 阶段2 mysql 阶段3 html+css+js 阶段4 php高级 阶段5 xml编程 阶段6 smarty 阶段7 ThinkPHP框架 阶段8 js高级 阶段9 ajax 阶段10 jquery 阶段11 linux 阶段12 SVN 阶段13 redis 阶段14 项目实战 阶段15 项目实战 阶段16 项目实战 阶段17 sphinx,mongodb 阶段18 lnmp 阶段19 dedecms 阶段20 微信接口开发 阶段21 discuz    下

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视