在这次对Spark粗略的讲解过程中,我们还没有讲如何在单独的应用程序中使用Spark。撇开交互式运行来说,我们能在Java,Scala或这Python程序中连接Spark。与在shell中连接Spark相比,唯一的区别是,在程序中,你需要自己初始化SparkContext 。
连接Spark的过程因语言而异。在Java和Scala中,你在你的应用程序的Maven依赖中添加对spark-core 的依赖就可以了。到写这本书的时候,Spark的最新版是1.2.0,它对应的Maven坐标是:
groupId=org.apache.spark
artifactId=spark-core_2.10
version=1.2.0
Maven是流行的针对基于Java(原文:Java-based)语言的包管理工具,它让你能使用公共仓库中的包。你可以通过Maven构建你的项目,也可以通过其它能与Maven仓库对话的工作构建项目,包括Scala的sbt工具和Gradle。流行的集成开发环境(比如eclipse)都允许你直接往项目中添加Maven依赖。
在Python中,你只需要把应用程序写成Python脚本,但是你必须用Spark自带的bin/spark-submit脚本运行它们。bin/spark-submit脚本包含了Python中需要的Spark依赖。脚本为Spark的Python API设置环境变量到函数中。运行你的脚本,像例2-6一样。
例2-6. 运行Python脚本
bin/spark-submit my_script.py
(注意,在Windows中,你必须用反斜线代替正斜线)
初始化SparkContext
一旦你的应用程序连接到Spark,你需要在你的应用程序中导入Spark包并创建一个SparkContext对象。你需要先创建一个SparkConf对象去配置你的应用程序,然后用这个SparkConf对象创建SparkContext对象。例2-7到例2-9展示了各个Spark支持的语言创建SparkContext对象的过程。
例2-7. 在Python中初始化Spark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster(“local”).setAppName(“My App”)
sc = SparkContext(conf = conf)
例2-8. 在Scala中初始化Spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster(“local”).setAppName(“My App”)
val sc = new SparkContext(conf)
例2-9. 在Java中初始化Spark
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
SparkConf conf = new SparkConf().setMaster(“local”).
这些例子展示了初始化SparkContext对象的简单方法,我们只设置了两个参数:
1.集群URL(在这些例子中是“local”),它告诉Spark如何连接到集群,local是一个特殊的值,它使Spark运行在本地机器的一个线程上,而不会连接到集群。
2.应用程序名字(在这些例子中是“My App”),当你连接到集群上时,这个名字将在集群管理器的UI上标识你的应用程序。
除了这两个参数之外,还存在其它一些参数,可以用它们来配置你的应用程序的执行方式,或者添加传递到集群中去执行的代码,但是,这些在这本书之后的章节中才会讲到。
在你初始化你的SparkContext对象之后,你可以使用我们之前展示的所有方法去创建RDD(例如,从一个文件)或操作这些RDD。
最后,为了关闭Spark,你可以调用你的SparkContext对象上的stop()方法,或者简单地推出程序(用System.exit(0)或者sys.exit())。
这次对Spark简单的讲解应该足够让你在你的电脑上运行一个单独的Spark应用程序了。对于更多高级配置,第七章将会讲怎样连接你的程序到集群上,包括打包你的程序让它的代码被自动传送到工作节点上。目前,请参考官方Spark文档Quick Start Guide。
构建单独的应用程序
如果我们没有一个word count例子,这个大数据书籍的介绍章节将是不完整的。在单台机器上,实现统计单词是简单的,但是在分布式框架中,它是一个通常的例子,因为它涉及到从大量工作节点上读取数据和合并数据。我们将看看如果通过sbt和Maven构建和打包一个word count程序。我们的所有示例都可以一起构建,但是为了说明一个拥有最少量以来的简单构建,在learning-spark-examples/mini-complete-example目录下,我们有一个更小的项目,正如你在例2-10(Java)和2-11(Scala)中看到的。
例2-10. Word count Java程序 — 现在不要担心细节
// 创建一个SparkConf
SparkConf conf = new SparkConf().setAppName(“wordCount”);
JavaSparkContext sc = new JavaSparkContext(conf);
// 加载我们的数据
JavaRDD<String> input = sc.textFile(inputFile);
// 分割行为单词集
JavaRDD<String> words = input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(” “));
}});
// 转化为单词:次数对,统计单词
JavaPairRDD<String, Integer> counts = words.mapToPair(
new PairFunction<String, String, Integer>(){
public Tuple2<String, Integer> call(String x){
return new Tuple2(x, 1);
}}).reduceByKey(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer x, Integer y){ return x + y;}});
// 把单词计数保存会文本
counts.saveAsTextFile(outputFile);
例2-11. Word count Scala程序 —
现在不要担心细节
// 创建一个SparkConf
val conf = new SparkConf().setAppName(“wordCount”)
val sc = new SparkContext(conf)
// 加载我们的数据.
val input = sc.textFile(inputFile)
// 分割行为单词集
val words = input.flatMap(line => line.split(” “))
// 转化为单词:次数对,统计单词
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
// 把单词计数保存会文本
counts.saveAsTextFile(outputFile)
我们能用sbt(例2-12)或者Maven(例2-13)用非常简单的构建文件构建这些应用程序。我们已经标记Spark Core依赖已经是被提供了的,所以,之后,当我们使用一个assembly JAR的时候,我们不会包含spark-core jar,因为spark-core jar已经在工作节点的类路径下了。
例2-12. sbt构建文件
name := “learning-spark-mini-example”
version := “0.0.1”
scalaVersion := “2.10.4”
// additional libraries
libraryDependencies ++= Seq(
“org.apache.spark” %% “spark-core” % “1.2.0” % “provided”
)
例2-13. Maven构建文件
小提示
spark-core包被标记为provided,以防它跟着我们的应用程序被打包进assembly JAR中。第七章会讲更多相关细节。
一旦我们的构建文件定义好了,我们能很容易地打包我们的应用程序,然后用bin/spark-submit脚本运行它们。spark-submit需要设置一些Spark需要用到的环境变量。在mini-complete-example目录下,我们能构建Java(例2-14)和Scala(例2-15)程序。
例2-14. Scala 构建和运行
sbt clean package
$SPARK_HOME/bin/spark-submit \
—class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/…(as above) \
./README.md ./wordcounts
例2-15. Maven 构建和运行
mvn clean && mvn compile && mvn package
$SPARK_HOME/bin/spark-submit \
—class com.oreilly.learningsparkexamples.mini.java.WordCount \
./target/learning-spark-mini-example-0.0.1.jar \
./README.md ./wordcounts
关于连接应用程序到集群的更详细的示例,请参考Spark官方文档Quick Start
Guide。第七章会将打包应用程序的更多细节。