Spark问题笔记3

1、RDD的缓存策略是什么?

缓存策略对应类StorageLevel,包括多种存储级别:

objectStorageLevel{

val NONE=newStorageLevel(false,false,false,false)

val DISK_ONLY=newStorageLevel(true,false,false,false)

val DISK_ONLY_2=newStorageLevel(true,false,false,false,2)

val MEMORY_ONLY=newStorageLevel(false,true,false,true)

val MEMORY_ONLY_2=newStorageLevel(false,true,false,true,2)

val MEMORY_ONLY_SER=newStorageLevel(false,true,false,false)

val MEMORY_ONLY_SER_2=newStorageLevel(false,true,false,false,2)

val MEMORY_AND_DISK=newStorageLevel(true,true,false,true)

val MEMORY_AND_DISK_2=newStorageLevel(true,true,false,true,2)

val MEMORY_AND_DISK_SER=newStorageLevel(true,true,false,false)

val MEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,false,2)

val OFF_HEAP=newStorageLevel(false,false,true,false)//
Tachyon

}

//
其中,StorageLevel 类的构造器参数如下:

classStorageLevelprivate(

privatevar useDisk_:Boolean,

privatevar useMemory_:Boolean,

privatevar useOffHeap_:Boolean,

privatevar deserialized_:Boolean,

privatevar replication_:Int=1

)

存储级别 描述
MEMORY_ONLY
将RDD 作为反序列化的的对象存储JVM 中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。

这是是默认的级别

MEMORY_AND_DISK 将RDD作为反序列化的的对象存储在JVM 中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取
MEMORY_ONLY_SER
将RDD 作为序列化的的对象进行存储(每一分区占用一个字节数组)。

通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU

MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER 相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
DISK_ONLY 只将RDD 分区存储在硬盘上
DISK_ONLY_2 (含2的) 与上述的存储级别一样,但是将每一个分区都复制到两个集群结点上

注意:

1)spark默认存储策略为MEMORY_ONLY:只缓存到内存并且以原生方式存(反序列化)一个副本;

2)MEMORY_AND_DISK存储级别在内存够用时直接保存到内存中,只有当内存不足时,才会存储到磁盘中。

cache()方法本质是调用了persist()方法;默认情况下的存储级别为MEMORY_ONLY

但是通过persist(StorageLevel)方法设定StorageLevel来满足工程的存储需求。

2、创建SparkContext?

客户Spark程序(Driver Program)来操作Spark集群是通过SparkContext对象来进行,SparkContext作为一个操作和调度的总入口,在初始化过程中集群管理器会创建DAGScheduler作业调度和TaskScheduler任务调度(For Spark Standalone,而在Spark On Yarn中,TaskScheduler会被YARN代替)。

创建SparkContext由下面的代码组成:

import org.apache.spark.{SparkContext,SparkConf}

import org.apache.spark.SparkContext._

val conf
= newSparkConf().setAppName(appName).setMaster(master_url)

val sc = new SparkContext(conf)

/*创建RDD,执行相应的操作*/

sc.close(); // 关闭SparkContext

在完成应用程序的编写后,生成jar包,使用spark-submit进行提交:

./bin/spark-submit \
  --class <main-class>
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

参考链接:http://spark.apache.org/docs/latest/submitting-applications.html

Spark有多种运行方式,其取决于SparkContext中的Master环境变量的值;

如下是官方提供的例子:

# 在本地8核机器上运行(本地模式)(8个CPU)
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# 在Standalone集群部署下运行
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \  # can also be `yarn-client` for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

3、创建RDD?

RDD数据来源:

(1)并行集合:接收一个已经存在的Scala集合,然后进行各种计算;

     val data = Array(1, 2, 3, 4, 5)
     val distData = sc.parallelize(data)

(2)外部文件:在一个文件的每条记录上运行函数;文件系统是hdfs或者Hadoop支持的任意存储系统;使用textFile()将本地文件或者Hdfs文件转换为RDD

val rdd1= sc.textFile("file:///root/access_log/access_log*.filter");

val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)rdd2.count()

  • SparkContext.wholeTextFiles lets you read a directory containing multiple small text
    files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record
    per line in each file.
  • For SequenceFiles,
    use SparkContext’s sequenceFile[K, V] method where K and V are
    the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface,
    like IntWritable and Text.
    In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically
    read IntWritables and Texts.
  • For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which
    takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job
    with your input source. You can also useSparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).
  • RDD.saveAsObjectFile and SparkContext.objectFile support
    saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.

4、RDD操作?

val lines = sc.textFile("data.txt")             // a base rdd from an external file.
val lineLengths = lines.map(s => s.length)        // lineLengths为map操作的结果,是lazy的,返回一个新的RDD
val totalLength = lineLengths.reduce((a, b) => a + b)         // action 
lineLengths.persist()                                          // memory_only

val num=sc.parallelize(1
to 100)val num2
= num.map(_*2)

val num3
= num2.filter(_%3==0)

num3.collect

//res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198)

num3.toDebugString

//res5: String =

//FilteredRDD[20] at filter at <console>:16 (48 partitions)

//MappedRDD[19] at map at <console>:14 (48 partitions)

//ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))

kv.flatMap(x=>x.map(_+1)).collect
// flatMap()对每一个输入元素被映射为0或者多个输出元素

sc.textFile(‘hdfs://hdp01:9000/home/debugo/*.csv‘).flatMap(_.split(‘,‘)).map((_,1)).reduceByKey(_+_)//
word count操作

1)源码中的map算子相当于初始化一个RDD,新RDD叫作MapPartitionsRDD(this,sc.clean(f))。(用户自定义函数f)

只有等到Action算子触发后,这个f函数才会和其他函数在一个Stage中对数据进行运算

2)将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合。
内部创建FlatMappedRDD(this,sc.clean(f))

3)mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。
内部实现是生成MapPartitionsRDD。

4)glom函数将每个分区形成一个数组,内部实现是返回的RDD[Array[T]]。

5)使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重,可以使用distinct()。

6)对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。

7)groupBy将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。

val
cleanF = sc.clean(f)
中sc.clean函数将用户函数预处理;

this.map(t => (cleanF(t), t)).groupByKey(p)对数据map进行函数操作,再对groupByKey进行分组操作。其中,p中确定了分区个数和分区函数,也就决定了并行化的程度。

8)subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素

9)sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。

10)takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组。

11)mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。

12)combineByKey是对单个Rdd的聚合。相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。

13)reduceByKey是更简单的一种情况,只是两个值合并成一个值,所以createCombiner很简单,就是直接返回v,而mergeValue和mergeCombiners的逻辑相同,没有区别。

14)partitionBy函数对RDD进行分区操作。如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。

15)cogroup函数将两个RDD进行协同划分。对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器(K,
(Iterable[V], Iterable[w]))
。其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。

16)join对两个需要连接的RDD进行cogroup函数操作。cogroup操作之后形成的新RDD,对每个key下的元素进行笛卡尔积操作,返回的结果再展平,对应Key下的所有元组形成一个集合,最后返回RDD[(K,(V,W))]。join的本质是通过cogroup算子先进行协同划分,再通过flatMapValues将合并的数据打散。

17)LeftOuterJoin(左外连接)和RightOuterJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。
如果不为空,则将数据进行连接运算,并返回结果。

18)foreach()对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。

19)saveAsTextFile()函数将数据输出,存储到HDFS的指定目录。将RDD中的每个元素映射转变为(Null,x.toString),然后再将其写入HDFS。

20)saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。

21)collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala
Array数组。

22)collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。

23)reduceByKeyLocally实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap

推荐阅读:http://jasonding1354.github.io/2015/07/12/Spark/【Spark】RDD操作详解4——Action算子/

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-15 22:11:57

Spark问题笔记3的相关文章

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o

spark学习笔记总结-spark入门资料精化

Spark学习笔记 Spark简介 spark 可以很容易和yarn结合,直接调用HDFS.Hbase上面的数据,和hadoop结合.配置很容易. spark发展迅猛,框架比hadoop更加灵活实用.减少了延时处理,提高性能效率实用灵活性.也可以与hadoop切实相互结合. spark核心部分分为RDD.Spark SQL.Spark Streaming.MLlib.GraphX.Spark R等核心组件解决了很多的大数据问题,其完美的框架日受欢迎.其相应的生态环境包括zepplin等可视化方面

Spark调研笔记第7篇 - 应用实战: 如何利用Spark集群计算物品相似度

本文是Spark调研笔记的最后一篇,以代码实例说明如何借助Spark平台高效地实现推荐系统CF算法中的物品相似度计算. 在推荐系统中,最经典的推荐算法无疑是协同过滤(Collaborative Filtering, CF),而item-cf又是CF算法中一个实现简单且效果不错的算法. 在item-cf算法中,最关键的步骤是计算物品之间的相似度.本文以代码实例来说明如何利用Spark平台快速计算物品间的余弦相似度. Cosine Similarity是相似度的一种常用度量,根据<推荐系统实践>一

Spark调研笔记第1篇 - Spark简介

在公司线上项目中引入Spark已经将近1年时间了,从效果来看,Spark确实是能提高生产力的优秀分布式计算平台. 从本篇笔记开始,会把之前调研Spark时的调研报告分享出来(限于篇幅,会分成几篇文章),以便帮助刚接触Spark的朋友们尽快入门. 下面开始正文. 1. 项目背景 Spark项目于2009年诞生于UC Berkeley AMP Lab并于2010年正式提交Apache Software Foundation成为开源项目.目前已经成为Apache下的明星项目,其代码提交活跃度在整个社区

Spark学习笔记——手写数字识别

import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.regression.RandomForestRegressor import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, NaiveBayes, SVMWithSGD} import org.apache.spark.ml

Spark学习笔记——Spark Streaming

许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用.训练机器学习模型的应用, 还有自动检测异常的应用.Spark Streaming 是 Spark 为这些应用而设计的模型.它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码. Spark Streaming 使用离散化流( discretized stream)作为抽象表示, 叫作 DStream. DStream 是随时间推移而收到的数据的序列.在内部,每个时间区间收到

Spark学习笔记总结-入门资料精化

Spark简介 spark 可以很容易和yarn结合,直接调用HDFS.Hbase上面的数据,和hadoop结合.配置很容易. spark发展迅猛,框架比hadoop更加灵活实用.减少了延时处理,提高性能效率实用灵活性.也可以与hadoop切实相互结合. spark核心部分分为RDD.Spark SQL.Spark Streaming.MLlib.GraphX.Spark R等核心组件解决了很多的大数据问题,其完美的框架日受欢迎.其相应的生态环境包括zepplin等可视化方面,正日益壮大.大型公

Spark学习笔记-Spark Streaming

http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html 在SparkStreaming中如何对数据进行分片 Level of Parallelism in Data Processing Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not

Spark 学习笔记之 Spark history Server 搭建

在hdfs上建立文件夹/directory hadoop fs -mkdir /directory 进入conf目录  spark-env.sh 增加以下配置 export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=7777 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://bjsxt/directory" spark-defaults

Spark学习笔记——文本处理技术

1.建立TF-IDF模型 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.linalg.{SparseVector => SV} import org.apache.spark.mllib.feature.HashingTF import org.apache.spark.mllib.feature.IDF /** * Created by common on 17-5-6. */ o