Spark开发指南

简介

总的来说,每一个Spark应用程序,都是由一个驱动程序组成,它运行用户的main函数,并且在一个集群上执行各种各样的并行操作。Spark提 供的主要的抽象(概念)是一个弹性分布式数据集,它是一个元素集合,划分到集群的不同节点上,可以被并行操作。RDDs的创建可以从Hadoop文件系统 (或者任何支持Hadoop的文件系统)上的一个文件开始,或者通过转换这个驱动程序中已存在的Scala集合而来。用户也可以使Spark持久化一个 RDD到内存中,使其能在并行操作中被有效的重用。最后,RDDs能自动从节点故障中恢复。

Spark中的第二个抽象(概念)是共享变量,他可以在并行操作中使用。默认情况下,Spark通过不同节点上的一系列任务来并行运行一个函数。他 将每一个函数中用的到变量的拷贝传递到每一个任务中。有时候,一个变量需要在不同的任务之间,或者任务和驱动程序之间共享。Spark支持两种类型的共享 变量:广播变量,可以再所有节点的内存中缓存一个值,累加器,一个只能做加法的变量,例如计数器和求和。

本指南通过每一种Spark支持的语言来展示Spark的每个特性。It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

接入Spark

Java

Spark1.0.2工作在Java6或者java6以后之上。如果你在使用Java8,Spark支持lamdba表达式来简化函数编写,否则,你可以使用org.apache.spark.api.java.function 包下的类。

用Java编写Spark应用,你需要添加Spark的依赖,Spark可以通过Maven Central使用:

groupId=org.apache.spark 
artifactId=spark-core_2.10 
version=1.0.2

另外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本添加一个hadoop-client依赖。一些常用的HDFS版本标签显示在页面。

groupId=org.apache.hadoop 
artifactId=hadoop-client 
version=

最后,你需要在你的程序中导入一些Spark类,通过添加如下几行:

import org.apache.spark.api.java.JavaSparkContext 
import org.apache.spark.api.java.JavaRDD 
import org.apache.spark.SparkConf

初始化Spark

Java

Spark程序需要做的第一件事就是创建一个JavaSparkContext对象 ,它将告诉Spark怎样访问一个集群。创建一个SparkContext,你首先必须创建SparkConf对象,它包含关于你的应用程序的信息。

SparkConf conf=new SparkConf().setAppName(appName).setMaster(master); 
JavaSparkContext sc=new JavaSparkContext(conf);

appName参数是你的应用程序的名字,将会在集群的UI上显示。master是Spark、Mesos、或者YARN 集群URL,或者一个专用的字符串”local“使其在本地模式下运行。在实践中,当运行在一个集群上,你将不会想要把master硬编码到程序中,而是 通过使用spark-submit运行程序并且接受master。但是,在本地测试或者单元测试中,你可以传递”local“在进程内运行Spark。

弹性分布式数据集

Spark反复围绕的一个概念是弹性分布式数据集。它是一个有容错机制的元素集合,并且可以被并行操作。有两种创建RDDs的方法。并行化你的驱动 程序中已存在的集合,或者引用一个外部存储系统的数据集,例如一个共享文件系统,HDFS、HBase、或者任何可以提供一个Hadoop InputFormat的数据源。

并行集合

并行集合通过调用JavaSparkContext的parallelize方法,在你的驱动程序中已存在的Collection上创建。集合的元素将会拷贝组成一个可以被并行操作的分布式数据集。例如,下面是如何创建一个包含数字1到5的并行集合:

List data=Arrays.asList(1,2,3,4,5); 
JavaRDD distData=sc.parallelize(data);

一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可以调用 distData.reduce((a,b)->a+b)来将列表中的元素相加。我们稍后将会在分布式数据集的操作中描述。

注意:在这个指南中,我们经常使用简洁的Java8 lamdba语法来定义java functions,但是在老的Java版本中,你可以实现org.apache.spark.api.java.function包中的接口。我们将会 在下面详细描述passing functions to Spark。

并行集合的另一个重要的参数是数据集被切分成切片(slices)的数量。Spark将会为集群中的每一个slice运行一个task。通常情况 下,你要为集群中的每个CPU 2-4个slice。通常,Spark会尝试根据你的集群自动设置slice的数量。然而,你可以手动的设置它,把它作为第二个参数传递给 parallelize(例如:sc.parallelize(data,10)).

外部数据集

Spark可以通过任何Hadoop支持的存储源创建分布式数据集。包括你的本地文件系 统,HDFS,Cassandra,HBase,Amazon S3等等。Spark支持text files(文本文件),SequenceFiles(序列化文件),和任何其他的Hadoop InputFormat(输入格式)。

Text file 可以通过使用SparkContext的textFile方式创建。这个方法接受一个文件的URI(或者机器上的一个本地路径,或者hdfs://,s3n:// 等URI)并且把这个文件读成一个行的集合。下面是一个调用的例子:

JavaRDD distFile=sc.textFile(“data.txt”);

一旦创建,distFile可以被进行数据集操作。例如:我们可以通过使用map和reduce将所有数据行的长度相加.例如:distFile.map(s->s.length()).reduce((a,b)->(a+b)).

Spark读文件时的一些注意事项:

  • 如果使用本地文件系统上的路径,
  • Spark的所有基于文件的输入方法,包括textFile,支持运行目录,压缩文件盒通配符。例如,你可以使用textFile(“/my/directory/“),textFile(“/my/directory/.txt”),和textFile(“/my/directory/.gz”)
  • textFile方法也可以接受一个可选的第二参数来控制这个文件的slice数目。默认情况下,Spark为每一个文件创建一个 slice(HDFS中block默认为64MB)。但是你可以通过传递一个较大的值来指定一个跟高的slice值。注意你的slice数不能小于 block数。

除了文本文件,Spark的Java API 也支持集中其他数据格式。

  • JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
  • 对于序列化文件(SequenceFiles),使用SparkContext的sequenceFile[K,V],K和V是文件中key和value的类型。他们必须是Hadoop的Writeable接口的子类,像IntWriteable和Text。
  • 对于其他的Hadoop输入格式,你可以使用JavaSparkContext.hadoopRDD方法。它可以接受任意(类型)的 JobConf和输入格式类,key类和value类。按照像Hadoop Job一样,来设置输入源就可以了。你也可以为InputFormats使用JavaSparkContext.newHadoopRDD,基 于”new“MapReduce API(org.apache.hadoop.mapreduce).
  • JavaRDD.saveAsObjectFile 和JavaContext.objectFile支持以一种由Java对象序列化组成的简单的格式保存RDD。虽然这不是有效地专门的格式向Avro,但是它提供了一个简单的方式存储RDD。

RDD操作

RDDs支持两种类型的操作:转换(transformations),它从一个现有的数据集创建一个新的数据集。动作(actions),它在数 据集上运行计算后,返回一个值给驱动程序。例如:map就是一个转换,它将数据集的每一个元素传递给一个函数,并且返回一个新的RDD表示结果。另一方 面,reduce是一个动作,他通过一些行数将一些RDD的所有元素聚合起来,并把最终的结果返回给驱动程序(不过还有一个并行的 reduceByKey,它返回一个分布式数据集)。

Spark中的所有转换都是惰性的,也就是说,他们不会立即计算出结果。相反,他们只是记住应用到这些基础数据集(例如file)上的转换。只有当 发生一个需要返回一个结果给驱动程序的动作时,这些转换才真正执行。这样的设计使得Spark运行更加高效——例如,我们可以实现,通过map创建一个数 据集,并在reduce中使用,最终只返回reduce的结果给驱动程序,而不是整个大的新数据集。

默认情况下,每一个转换过的RDD都会在你在它上面运行一个action时重新计算。然而,你也可以使用persist方法(或者cache)持久 化一个RDD到内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你访问这个RDD时,它将能够更快速访问,。在磁盘上持久化数据集,或 者在集群间复制数据集也是支持的。

基本操作

为了说明RDD基础,考虑下面的简单的程序:

JavaRDD lines=sc.textFile(“data.txtt”); 
JavaRDD lineLengths=lines.map(s->s.length()); 
int totalLength=lineLengths.reduce((a,b)->a+b);

第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未在上面执行动作。lines仅仅是这个文件的一个指针。第二行定义 了lineLengths作为map转换的结果。此外,lineLengths因为惰性没有立即计算。最后,我们运行reduce,他是一个 action。这时候,Spark将这个计算拆分成不同的task,并使其运行在独立的机器上,并且每台机器运行它自己的map部分和本地的 reducation,仅仅返回他的结果给驱动程序。

如果我们想在以后重复使用lineLengths,我们可以添加:

lineLengths.persist();

在reduce之前,这将导致lineLengths在第一次被计算之后被保存在内存中。

传递Functions到Spark

Spark的API,在很大程度上依赖于传递函数使其驱动程序在集群上运行。在Java中,函数有实现了org.apache.spark.api.java.function包中接口的类表示。有两种创建这样的函数的方式:

  • 在你自己的类中实现Function接口,可以是匿名内部类,后者命名类,并且你要传递他的一个实例到Spark
  • 在Java8中,使用lamdba表达式来简洁的定义一种实现

为了简洁起见,本指南中的大多数使用lamdba语法,它易于使用,所有的APIs in long-form,例如,我们可以编写上面的代码如下:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(newFunction<String, Integer>() {
  public Integer call(String s) {returns.length(); }
});
inttotalLength = lineLengths.reduce(newFunction2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) {return a + b; }
});

或者,如果编写内联函数显得很笨拙:

class GetLengthimplementsFunction<String, Integer> {
  public Integer call(String s) {returns.length(); }
}
class SumimplementsFunction2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) {return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(newGetLength());
int totalLength = lineLengths.reduce(newSum());

Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages

Wroking with Key-Value Pairs使用键/值对工作

虽然大多数Spark操作工作在包含各种类型的对象的RDDs之上,一些特殊的操作仅仅能够使用包含key-value对的RDDs。最常见的操作之一是分布式”shuffle“(移动?)操作,例如通过key分组或者聚合元素。

在Java中,key-value对使用scala标准包下的scala Tuple2(元祖,数组)类表示。你可以简单的调用new Tuple2(a,b)去创建一个tuple,并且通过tuple.1()和tuple.2()访问它的字段。

key-value对的RDDs通过JavaPairRDD表示。你可以通过JavaRDDs构建JavaPairRDDs,使用指定的map操作 版本,像mapToPair和flatMapToPair。JavaPair将不仅拥有标准RDD函数,并且有特殊的key-value函数。

例如,下面的代码在key-value对上使用reduceByKey操作来计算在一个文件中每行文本出现的次数和。

JavaRDD lines=sc.textFile(“data.txt”); 
JavaPairRDD pairs=lines.mapToPair(s->new Tuple2(s,1)) 
JavaPairRDD counts=pairs.reduceByKey((a,b)->a+b);

我们也可以使用counts.sortByKey(),例如,按照字母顺序排序这个键值对。并且最后调用counts.collect()作为一个对象数组返回给驱动程序。

注意:当使用自定义的对象作为key-value对操作的key时,你必须确保自定义equals()方法伴随着一个匹配的hashCode()方法。有关详情,参考 Object.hashCode() 文档大纲中列出的规定。

RDD持久化

Spark最重要的一个功能是在不同的操作间,持久化(或者缓存)一个数据集到内存中。当你持久化一个RDD时,每一个节点都把它计算的分片结果保 存在内存中,并且在对此数据集(或者衍生出的数据集)进行其他动作时重用。这将使后续的动作变得更快(通过快109倍以上)。缓存是(Spark)迭代算 法和快速交互使用的关键工具。

你可以使用persist()和cache()方法来标记一个将要持久化的RDD。第一次他被一个动作进行计算,他将会保留在这个节点的内存中。Spark的缓存有容错性-如果RDD的任何一个分区丢失了,他会通过使用最初创建的它转换操作,自动重新计算。

此外,每一个持久化RDD可以使用不同的存储级别存储。允许你,例如,持久化数据集到磁盘,持久化数据集到内存作为序列化的Java对象(节省空 间),跨节点复制,或者 store it off-heap in Tachyon。这些级别通过传递一个StorageLevel对象(Scala,Java,Python)到persist()来设置。cache() 方法是使用默认存储级别的快捷方法,即StorageLevel.MEMORY_ONLY(存储反序列化对象到内存),完整的存储级别设置为:

Spark也会在shuffle操作(例如,reduceByKey)中自动的持久化一些中间数据。甚至当用户未调用persist方法。这样做是 为了阻止在进行shuffle操作时由于一个节点故障而重新计算整个输入。我们依然推荐用户在作为结果的RDD上调用persist如果想打算重用它。

时间: 2024-10-12 05:10:42

Spark开发指南的相关文章

【spark系列3】spark开发简单指南

分布式数据集创建之textFile 文本文件的RDDs能够通过SparkContext的textFile方法创建,该方法接受文件的URI地址(或者机器上的文件本地路径,或者一个hdfs://, sdn://,kfs://,其他URI).这里是一个调用样例:scala> val distFile = sc.textFile("data.txt")distFile: spark.RDD[String] = [email protected] 分布式数据集操作之转换和动作 分布式数据集

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

spark编程指南

去年学习Spark了一段时间,今年捡回来,发现好多东西都已经忘记了.现在讲官方网站上的东西转诉过来,回顾并记录下来. 概要 从架构角度来看,每一个Spark应用由driver程序组成,在集群中运行用户的main函数和执行大量的parallel操作.Spark的核心抽象概念就是弹性分布式数据集(RDD),这是一种跨越并行集群中节点操作元素的集合.RDD在Hadoop文件系统上建立的(或者其他hadoop支持的文件系统),或现有的Scala集合中的驱动程序,并可以transforming.用户还可以

【资源共享】Rockchip I2C 开发指南 V1.0

2C设备的设备应用非常广泛,常见的包含重力传感器,触摸屏驱动芯片,音频解码等 这个文档是RK3399的I2C开发文档:<Rockchip I2C 开发指南 V1.0> 内容预览: 下载地址:http://developer.t-firefly.com/thread-12495-1-1.html

七日Python之路--第十二天(Django Web 开发指南)

<Django Web 开发指南>.貌似使用Django1.0版本,基本内容差不多,细读无妨.地址:http://www.jb51.net/books/76079.html (一)第一部分 入门 (1)内置数字工厂函数 int(12.34)会创建一个新的值为12的整数对象,而float(12)则会返回12.0. (2)其他序列操作符 连接(+),复制(*),以及检查是否是成员(in, not in) '**'.join('**')   或  '***%s***%d' % (str, int)

开发指南专题八:JEECG微云快速开发平台数据字典

   开发指南专题八:JEECG微云快速开发平台数据字典的使用 1.标签中使用数据字典 数据字典为系统中可能用到的字典类型数据提供了使用的便利性和可维护性.以下拉框标签<t:dictSelect>为例进行讲解 1.1. 标签参数 属性名 类型 描述 是否必须 默认值 typeGroupCode string 字典分组编码 是 null field string 对应表单 是 null id string 唯一标识 否 null title string 显示文本 否 null defaul

C#在Linux上的开发指南(续)

续之前的一篇开发指南http://www.cnblogs.com/RainbowInTheSky/p/5496777.html 部分人在部署的时候经常出现dll兼容问题(其实可以看小蝶惊鸿的文章,蝶神早已踩过了坑http://www.cnblogs.com/xiaodiejinghong/tag/mono/) 站点部署后建议使用webbench进行压力测试 1.Microsoft.Web.Infrastructure.dll不用上传,mono已经实现(MS的dll有api依赖的问题),Mono的

《NodeJS开发指南》学习笔记

欢迎大家指导与讨论 : ) 注:此笔记是基于<NodeJS开发指南>,并不是原著. 第一章——NodeJS简介 NodeJS是一个可以让Javascript运行在服务器端的平台,它为实时Web应用(Real-time Web)开发而生.拥有实时响应,超大规模数据要求下架构的可扩展性.它采用了单线程.异步式I/O.事件驱动式的程序设计模型.统意义上,Javscript是由ECMA.DOM.BOM组成. NodeJS采用的是单线程模型,对于所有的I/O都采用异步式的请求方式,避免了频繁的上下文切换

Angularjs中文版本开发指南发布

Angularjs中文版本开发指南发布 2014-02-16 15:49 by 破狼, 29069 阅读, 9 评论, 收藏,  编辑 从本人开始在写关于Angularjs的文章开始,也算是见证了Angularjs在国内慢慢的火起来,如今的Angularjs正式如日中天.想知道为什么Angularjs会这么火,请移步angularjs移除不必要的$watch. 也是一次偶然的机会,在Angular.js中文社区群里相遇一群Angular的爱好者,在一次巧妙的交谈,大家对于Angular官方的Gu