去年学习Spark了一段时间,今年捡回来,发现好多东西都已经忘记了。现在讲官方网站上的东西转诉过来,回顾并记录下来。
概要
从架构角度来看,每一个Spark应用由driver程序组成,在集群中运行用户的main函数和执行大量的parallel操作。Spark的核心抽象概念就是弹性分布式数据集(RDD),这是一种跨越并行集群中节点操作元素的集合。RDD在Hadoop文件系统上建立的(或者其他hadoop支持的文件系统),或现有的Scala集合中的驱动程序,并可以transforming.用户还可以将一个RDD在内存中持久存在,循序它跨越并行操作被重复使用。最后,RDDs可以自动从节点故障中恢复。
Spark中第二个抽象的概念就是共享变量,Spark默认每个task节点中都有函数中每一个变量的拷贝。但是有时,tasks之间或者tasks和driver之间的一些变量需要被共享。Spark支持两种共享变量:
* broadcast变量,这个变量在所有节点的内存中缓存
* accumulators变量,这个变量是唯一可以增加的,比如counters和sums
Spark初始化
一个Spark程序第一件事就是JavaSparkContext对象,这个告诉Spark程序如何连接集群。而在创建一个SparkContext我们首先需要创建SparkCOnf对象,这个包括了你的应用的一些信息。
/*把spark看做一台超跑(速度非常快),SparkDriver用户提交应用程序,实际可看做火箭的驾驶员
* 那么sparkcontext就是火箭的引擎,火箭要起飞就必须要启动发动机
* 开车需要控制吧,比如方向盘,油门等等控制,需要通过SparkConf来进行参数配置
* 到目前为止,车子已经基本结束了。
*/
SparkConf conf = new SparkConf()
.setAppName("estimator test");
JavaSparkContext jsc = new JavaSparkContext(conf);
弹性分布式数据集(RDD)
Spark主要就是围绕RDD这个概念开发的,RDD具有高容错性,并行性等优势。有两种方式创建RDDs:
* parallelizing一个在driver程序中存在的collection
* 从外存中读取,比如共享文件系统,HDFS, HBase, 或者人也一种提供Hadoop输入格式的数据源。
RDD操作
RDDs支持两种操作:
* transformation,从一个已经存在的RDD转换到一个新的RDD。比如map就是一个transformation,将每一个数据集元素都运行一个function,返回一个表示结果的RDD。功能上来说就是java中的map函数,只不过这里结果还是一个RDD。
- actions,在完成数据集的计算过后返回一个值。比如上面的map生成了一个新的RDD,我要想知道结果怎么办。这个时候就可以使用reduce,这是一个action,使用某些函数聚合RDD中的所有元素,返回一个最终的结果给driver程序。(当然,这里并行的reduceByKey也可以返回一个并行的结果)。
注意这里的所有的transfomation都是具有惰性(lazy),意思是,你使用某个transfomation的时候,它并不执行,他们只是记住了在某个数据集中运用了这个transformations。这些transformation只有在一个action执行的时候才会执行,返回结果给driver程序。做一个比喻,我们都比较懒,上课老师布置了作业,我们只把这些作业记下来了,晚上并没有做。当过几天老师让我们交作业(需要一个结果),我们按照记下来的作业,先后完成。
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
默认来说,每一个transformed RDD只有在运行一个action的时候才会被重新计算。但是,你也可以使用persist(或者cache)方法将RDD持久化在内存中,在这个例子中,Spark将会让这些元素保存在集群中这样下次就可以更快的使用查询它了。当然也可以让RDDs保存在硬盘上,或者复制到多个节点上。
lineLengths.persist(StorageLevel.MEMORY_ONLY());
Spark的passing函数
在集群中的driver函数中,Spark’s API严重依赖passing函数。在java中,这个函数是通过实现org.apache.spark.api.java.function包中的接口完成的。下面有两种方式创建这样的函数:
* 在自己的类中实现函数接口,不管是一个匿名内部类或者命名的,在Spark中去实例化他。
* 在java8中,可以只用匿名函数直接定义一个接口。
如下
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<integer> lineLengths = lines.map(new Function<String, Integer>(){
public Integer call(string s){ return s.length();}
}
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer a, Integer b){ return a+b;}
})
下面的这种写法就不是很明智了:
class GetLength implements Function<String, Integer>{
public Integer call(String s){ return s.length();}
}
class Sum implements Functions<Integer, Integer, Integer>{
public Integer call(Integer a, Integer b){ return a+b;}
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
理解闭合
Spark中最难的东西之一就是理解变量或方法在集群中执行的作用范围和生命周期。修改一个RDD作用范围之外的变量是造成混淆的常见原因。下面的例子是使用foreach()实现代码递增计数器的,在其他类似的问题上也会出现类似的操作。
例子
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don‘t do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
这个是一个简单的RDD元素相加的例子,结果表现不同跟是否在相同的JVM中有关系。比如在local模式和集群中就显著结果不一样。
上面代码可能不会像想象的那样工作。spark会将RDD转换成一个个的tasks,每个task都在一个executor中运行。就像前面执行的一样,Spark的每个task闭合执行的。
The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()).这个是官方对closure的解释,怎么翻译都变扭,所以直接把原文贴上来。
上面的例子中,executor中的counter变量并不是driver节点中的counter。driver节点中仍然有一个counter变量,但是对executor并不再是可见的了。每一个executor只能看到并行化的counter,即每一个executor都有一份counter的拷贝。执行完上面的程序过后,每一个executor中的counter变了,但是driver中的counter变量仍然是0.
在本地模式中,上面的例子中excutor和driver真正是在同样一个JVM中执行的,这样excutor看到的counter就是driver的counter,所以可以完成更新。
如果更好的执行,可以使用一个accumulator(累加器).
一般来说,closures-想循环或本地定义的方法结构,不应该改动一些全局的状态。Spark不定义或者保证从闭合空间中引入外部变量会发生无法预料的事情。有些代码在本地可以允许,但是在犯不上情况下并不能起作用,不能依赖这种偶然性。所以最好使用一些全局累加器(aggregation)。
打印RDD上的元素
另一个常用的错误就是使用rdd.foreach(println)或rdd.map(println)方法打印RDD中的元素。在一台机器中这可是可以打印出来的。但是在集群模式中,输出stdout是输出到excutor的stdout中的,而不是driver。所以driver上面的stdout并不会显示任何数据。为了单一driver上面的所有数据,一个方法就是使用collect()方法首先将RDD全部收集到driver节点中:rdd.collect().foreach(println).这个可能会导致driver的内存溢出,因为collect让整个RDD都传到了一个机器上面。其实如果想打印出一些RDD的数据,一个更合适的方法就是使用take()方法:rdd.take(100).foreach(println);
使用key-value对
虽然大多数Spark上的操作能够工作在任意类型对象的RDDs上,还有几种特殊的操作是建立在key-value对上的。
在java中,key-value对是使用scala.Tuple2类表示的。可以见到定义new Tuple2(a, b)来创建一个tuple,可以使用tuple._1()和tuple._2()访问。
key-value类型的RDD是通过JavaPairRDD类表现出来的。可以使用特殊的map,比如mapToPair和flatMapToPair,建立JavaPairRDDs。
比如,下面的代码就是在key-value对上使用reduceByKey操作来计算文章中每一行出现的次数:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
我们也可以使用counts.sortByKey()按照关键字进行排序,绥中使用counts.collect()将结果以数组的形式返回到driver程序中。
Transformations
Transformation | Meaning |
---|---|
map(func) | 将原来RDD中的每个元素通过自定义函数func转换为一个包含新元素的RDD。 |
filter(func) | 对原有RDD中的元素进行过滤,每个元素输入到func函数中,如果func函数返回为true则保留,返回false则丢弃。 |
flatMap(func) | 功能与map相似,但是输出的是一个集合。 |
mapPartitions(func) | 功能与map相似,但是mapPatitions获取的是每个分区的迭代器。 |
mapPartitionsWithIndex(func) | 功能与mapPatitions相似,但是func函数要返回一个表示分区index的interger类型的值 |
sample(withReplacement, fraction, seed) | 对数据集中的数据进行采样,想成一个新的RDD |
union(otherDataset) | 将两个数据类型相同的RDD合并成一个RDD |
intersection(otherDataset) | 返回一个包含两个数据类型相同的RDD的交集的全新的RDD |
distinct([numTasks])) | 对RDD中的元素进行去重操作 |
groupByKey([numTasks]) | 返回一个(k, iterable)键值对.注意:如果你分组是为了执行一个聚合(比如求和或平均),使用reduceByKey或aggregateByKey将有更好的性能。注意:默认情况下,并行输出的分区数取决于父抽样的分区的数量。您可以通过一个可选的numTasks参数设置不同数量的任务。 |
reduceByKey(func, [numTasks]) | 对k相同的键值对中的值调用func函数,合并产生一个值 |
sortByKey([ascending], [numTasks]) | 返回一个按照k值进行排序的键值对RDD。 |
join(otherDataset, [numTasks]) | 对俩个需要连接的RDD进行cogroup函数操作,cogroup原理如上,cogroup操作后形成的新的RDD,对每个Key下的元素进行笛卡尔积操作,返回结果在展平。 |
cogroup(otherDataset, [numTasks]) | 对两个RDD进行协同划分,每个RDD中形同Key的元素分别聚合为一个集合,并且返回两个RDD中对应key中的元素集合的迭代器。 |
cartesian(otherDataset) | 对两个RDD内的所有元素进行笛卡尔积操作。 |
pipe(command, [envVars]) | 对RDD的每个分区通过脚本命令,RDD元素可以写入进程的stdin和行输出到标准输出作为字符串返回 |
coalesce(numPartitions) | 设置RDD数据的分区数,可以让数据集的操作更加高校。 |
repartition(numPartitions) | 修改RDD数据的分区数 |
repartitionAndSortWithinPartitions(partitioner) | 重新设置RDD分区,根据keys值排序,这个比repartition更加高效 |
action
本质上,Action通过SparkContext执行提交作业的runjob操作,触发了RDD DAG的执行。
Action | Meaning |
---|---|
reduce(func) | 将集合中的元素通过func函数合并起来,该函数应该是可交换,结合的,这样才能应用到并行计算中。 |
collect() | 将数据中的元素返回为一个数组,这个函数通常用在filter等其他操作后。 |
count() | 返回数据集中元素的个数 |
first() | 返回数据集中的第一个元素 |
take(n) | 返回数据集中前n个元素 |
takeSample(withReplacement, num, [seed]) | 按设定的采样个数进行采样 |
takeOrdered(n, [ordering]) | 返回前N个RDD中元素的自然顺序或自定义比较器 |
saveAsTextFile(path) | 将数据集中的元素存储起来,存在给定的目录中的本地文件系统,或任何其他的Hadoop HDFS 支持的文件系统。Spark可以调用toString将每个元素以每行的形式存在文本中 |
saveAsSequenceFile(path)(Java and Scala) | 将数据集中的RDD以Hadoop SequenceFile的形式存在给定路径本地的文件系统,HDFS或者其他任何Hadoop支持的文件系统中。这个对读取存在Hadoop中健值对十分有用。 |
saveAsObjectFile(path)(Java and Scala) | 使用Java序列化将数据集中的元素存储为一个简单的格式,我们可以通过SparkContext.objectFile()进行读取。 |
countByKey() | 返回具有不同健的键值对的个数 |
foreach(func) | 对RDD中的每个元素应用func函数,不返回RDD和Arry,而是返回Uint。 |
shuffle操作
Spark有一种特别重要的操作:shuffle。shuffle是Spark机制中重新分派数据以便于能够跨区分组。这通常涉及在executors和机器之间copy数据,这使得shuffle十分复杂以及代价高昂。
背景
为了理解在shuffle过程中发生了什么,我们就拿reduceByKey操作做例子。reduceByKey操作会产生一个新的RDD,这里同一个key的所有key-value都组合进一个数组-这个key和reduce函数执行需要找到与这个key相关联的所有key.问题在于不是一个key对应的所有value都是在一个分区中的,或者同一个机器,但是运算结果的时候必须放在同一个分区。
在Spark计算过程中,一个单一的task只会操作单一翻去——但是,使用reduceByKey时,会遍历所有数据,将所有数据转移到reduceByKey的task中执行。这就必须读取所有的分区找到所有keys对应的所有values了,然后将所有分区中的values聚合起来计算相应key的最终结果,这个就被称作shuffle。
尽管新的shuffled过后的每个分区数据集是确定的,分区本身的顺序也是确定的,但是这些数据元素的顺序是不确定的。如果需要在shuffle过后获得顺序的数据,下面可能会有用:
- mapPartitions排序使用.sorted
- 当同时分区的时候repartitionAndSortWithinPartitions能够更有效的对分区排序
- sortBy能够获得全局排序的RDD
下面操作包括shuffle过程
* 重新分区:repartition和coalesce
* 排序:groupByKey和reduceByKey
* 连接:cogroup和join
性能
Shuffle过程需要进行大量的磁盘I/O操作,进行数据序列化以及大量的网络I/O。shuffle过程可以通过设置大量的参数修改。具体参见spark配置指南
RDD持久化
Spark中最重要的能力之一就是持久化(缓存)一个数据集在内存中。当你持久化一个RDD的时候,每一个节点都存着这个RDD的分区,当在内存中计算的时候,在数据集中其他操作会重复利用这些分区。这个就会让我们将来的操作变得更快(经常快10x左右)。缓存是一个迭代算法关键的部分,这样就可以快速的使用。
你可以使用persist()或cache()方法标记一个RDD为持久化RDD。当它第一次在一个action中计算的时候,它将在节点的内存中持久存在。Spark的缓存是具有容错性的——如果RDD中的任意一个分区丢失了,它将使用之前的transformation重新创建它。
林外,每一个持久化RDD都可以使用不同的存储级别,比如,持久化在磁盘中,持久化在内存中但是作为序列化Java对象,复制到节点中或者将它存储在Tachyon内存文件系统中。在persist(),这些级别都可以通过StorageLevel来设置。cashe()方法是简单来说就是使用默认的存储级别——StorageLevel.MEMORY_ONLY(在内存中存储未序列化数据)。全部的存储级别如下:
Storage Level | Meaning |
---|---|
MEMORY_ONLY | 在JVM中,存储未序列化java对象。如果这个RDD不在内存中,一些分区也不会缓存,每次需要使用的时候都重新计算 |
MEMORY_AND_DISK | 在JVM中,存储未序列化的Java对象。如果RDD没有在内存中,则存储在分区的磁盘上,当需要的时候从磁盘中读取 |
MEMORY_ONLY_SER | 存储序列化的Java对象。一般来说这个比为序列化对象更加节省空间,特别是使用fast serializer,但是也需要更多的CPU去读取 |
MEMORY_AND_DISK_SER | 跟MEMORY_AND_DIST_SER类似,但是是将分区中不在内存的数据放到磁盘中而不是每次需要的时候重新计算 |
DISK_ONLY | 只将RDD分区存储在磁盘上 |
OFF_HEAP (experimental) | 在Tachyon的序列化格式存储RDD。相比MEMORY_ONLY_SER,OFF_HEAP开销减少了垃圾收集,并允许执行人要小的共享一个内存池,使得它在大型堆或多个并发应用程序的环境吸引力。此外,由于RDDS居住在的Tachyon,遗嘱执行人的崩溃不会导致丢失内存缓存。在这种模式下,在快子存储器是可废弃。因此,快子不会尝试重建它从存储器逐出块。如果您计划使用超光速粒子作为关堆店,Spark是兼容的Tachyon外的开箱。请参阅此页建议的版本配对。 |
选取哪一种存储模式呢?
Spark的存储模式以为这平衡内存使用和CPU效率。下面是一些很好的建议:
* 如果RDD使用的是默认存储模式(MEMORY_ONLY),那么改掉他吧。这是使用CPU最搞笑的方式,让操作RDDs尽量快。
* MEMORY_ONLY_SER并选择快速序列化库是最具空间效率的方式,速度仍然很快。
* 不要将数据存储在磁盘中,出发计算的成本很高或者需要赛选大量的数据,否则,重新计算一个分许比从磁盘中读取更快。
* 如果想要快速故障恢复,那么久使用复制存储级别(例如,例如使用Spark相应一个web应用的请求)。所有的存储级别重新计算修饰的数据提供了全面的容错能力,并且是同步让你继续运行RDD任务,而唔系等待重新计算丢失的分区。
* 如果有大量内存和应用的情况下,OFF_HEAP模式有下面一系列的有时:
* 允许多个executor共享Tachyon中的内存池
* 现在减少了存储collectiondaijia
* 如果单个executors崩溃了,缓存数据不会丢失
移除数据
Spark会自动缓存数据,按照最近最少使用原则移除一些数据。如果想人工删除内存中的RDD,可以使用RDD.unpersist()方法
共享变量
广播类型变量
广播变量允许程序员保持每台机器上一个只读变量缓存,而不是随着任务拷贝这个变量。例如,他们可以以有效的方式给每个节点的大型输入数据集的副本。星火也尝试使用高效广播算法以降低通信成本来分发广播变量。
Spark的action是通过一组策略执行,这个与分布式“shuffle”操作分开。Spark自动广播tasks需要的共同数据。广播数据是一序列化形式进行缓存和在每个task之前进行反序列化。这意味着显式地创建广播变量是唯一有用当在多个阶段的工作需要相同的数据或在反序列化形式缓存数据时是重要的。这意味着在多个阶段需要相同的数据显示地创建广播变量是唯一有用的方式或者当以非序列化的形式缓存数据是重要的。
广播变量v通过调用SparkContext.broadcast(v)创建。广播通过调用value()获得变量值。代码如下:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
累加器
累加器是唯一一种在并行情况下有小支持累加的操作。这可以用来实现计数器或求和。Spark支持数字类型的累加器,也可以层架新的类型。如果累加器被创建一个带名字,他们会在Spark’s UI中限制。
Accumulator<Integer> accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
class VectorAccumulatorParam implements AccumulatorParam<Vector> {
public Vector zero(Vector initialValue) {
return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) {
v1.addInPlace(v2); return v1;
}
}
// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());