英文标题:Quick Start
英文原址:http://spark.apache.org/docs/latest/quick-start.html
Spark Version:1.3.0
1, 使用Spark-Shell进行交互式分析
1.1 基本使用
Spark-shell为学习API提供了简单的方式,它是一个非常强大的进行交互式数据分析的工具,在Scala或Python中都有提供。在Spark目录中运行下面的命令:
./bin/spark-shell |
Spark的主要抽象是一个分布式数据集合,叫做弹性分布式数据集(RDD),可以通过读取HDFS文件或转换其它RDD来创建RDD,下面示例表示的是通过读取一个HDFS文件来创建RDD:
val textFile = sc.textFile(“README.md”) |
RDD有transformation和action两种操作,前者返回一个指向新RDD的指针,后者返回值。下面是一些action操作:
textFile.count() textFile.first() |
下面是一些transformation操作,用filter过滤出文件中的Spark行得到一个新的RDD:
val linesWithSpark = textFile.filter(line => line.contains(“Spark”)) |
可以把transformation和action串起来:
textFile.filter(line => line.contains(“Spark”)).count()//计算了README.md文件中有多少行包含Spark这个单词 |
1.2 更多RDD操作
可以用transformation和action来做很多更复杂的计算,例如下面的示例是找出所有行中单行的最多单词数
textFile.map(line => line.split(“ “).size).reduce((a,b) => if(a>b) a else b) |
这行代码首先将一行映射成一个整数值(单词数)生成一个RDD,在这个RDD上调用reduce找出最大size最大的一个。Map和reduce的参数是Scala中的函数,可以使用其大量的语言特性或Scala/Java库,例如,可以使用其他语言声明的包使得上面的代码更易理解:
import java.lang.Math textFile.map(line => line.split(“ “).size).reduce((a,b) => Math.max(a,b)) |
一个通用的数据模型是MapReduce,Spark可以轻松地实现MapReduce:
val wordCounts = textFile.flatMap(line => line.split(“ “)).map(word => (word,1)).reduceByKey((a,b) => a+b) |
这里结合了flatMap,map,reduceByKey几个转换计算每一个单词的数以(String,Int)对作为一个RDD,在Shell中合并这些单词数,可以用collect这个action:
wordCounts.collect |
1.3 缓存(Caching)
Spark支持将数据集放到集群的内存中进行缓存,这对于要重复使用的数据非常管用,例如查询小的热点数据集或者运行类似于PageRank的迭代算法,下面的示例将linesWithSpark数据集进行缓存:
linesWithSpark.cache() linesWithSpark.count() linesWithSpark.count() |
看上去用Spark缓存一个100行的文件挺愚蠢的,但是有意思的是相同的函数也可以被用在很大的数据上,而且效果一样,即使它们跨上百个节点存储,你可以用bin/spark-shell交互式的连接到一个集群。
2, 独立的应用程序
使用Spark API写一个独立应用程序,下面从一个简单的程序开始(Scala&SBT):
/* SimpleApp.scala */ importorg.apache.spark.SparkContext importorg.apache.spark.SparkContext._ importorg.apache.spark.SparkConf objectSimpleApp{ def main(args:Array[String]){ val logFile="YOUR_SPARK_HOME/README.md"// val conf=newSparkConf().setAppName("Simple val sc=newSparkContext(conf) val logData= val numAs= val numBs= println("Lines with a: %s, Lines with b: %s".format(numAs, } } |
注意,应用程序必须定义一个main()函数而不是继承scala.App,scala.App的子类可能不会正确的运行(Why?)。
这个程序分别计算了README.md文件中包含a和b的行数,在程序中你要用自己的Spark_HOME替换掉YOUR_SPARK_HOME。独立程序不像Spark-Shell启动时会初始化自己的SparkContext,而是需要将初始化SparkContext作为程序的一部分。
给SparkContext构造器传递一个SparkConf对象,这个对象包含了应用程序的相关信息。应用程序依赖于Spark API,所以还要有一个SBT的配置文件,下面的simple.sbt解释了Spark依赖,此依赖还添加了Spark依赖的一些库。
name:="Simple Project" version:="1.0" scalaVersion:="2.10.4" libraryDependencies+="org.apache.spark"%%"spark-core"%"1.3.0" |
为了使SBT正常工作,得根据目录结构来布局SimpleApp.scala和simple.sbt,如果位置正确的话,就可以创建包含独立应用程序的Jar包了,然后使用spark-submit脚本运行我们的程序。
# Your directory layout should look like this $
# Package a jar containing your application $
[ # Use spark-submit to run your application $
|
转载请注明出处。