既然你已经在shell里运行了你的第一个Spark代码片段,是时候来学习在shell里面编程的更多细节了。
从上层看,每一个Spark程序都是由一个驱动程序组成,这个驱动程序在集群上发布各种各样的平行操作。驱动程序包含你的应用程序的main函数,定义在集群上的分布式数据集,并且将一些操作作用在这些数据集上。在之前的例子中,驱动程序是Spark shell本身,你只需要在里面输入你想要运行的操作就行了。
驱动程序通过一个SparkContext 对象访问Spark,一个SparkContext 对象代表了一个到计算集群的连接。在shell里,有一个自动为你创建的SparkContext 对象,就是sc变量。试着打印出sc变量来看看它的类型,就像例2-3 那样。
例2-3. 检查sc变量
>>> sc
<pyspark.context.SparkContext object at 0x1025b8f90>
一旦你有了一个SparkContext 对象,你就可以用它来创建RDD。在例2-1和2-2 中,我们调用sc.textFile()来创建了一个RDD, 这个RDD代表了一个文件中的文本行的集合。我们可以在这些行上执行不同的操作,例如count()。
(补充示例:
例2-1. Python line count
>>> lines = sc.textFile(“README.md”) # 创建一个叫lines的RDD
>>> lines.count() # 统计这个RDD中元素个数
127
>>> lines.first() # 获得这个RDD中的第一个元素
u’# Apache Spark’
例2-2. Scala line count
scala> val lines = sc.textFile(“README.md”) // 创建一个叫lines的RDD
lines: spark.RDD[String] = MappedRDD[…]
scala> lines.count() // 统计这个RDD中元素个数
res0: Long = 127
scala> lines.first() // 获得这个RDD中的第一个元素
res1: String = # Apache Spark
)
为了运行这些操作,驱动程序管理着一定数量的节点,这些节点被叫做executor 。例如,如果我们在一个集群上运行count() 操作,不同机器可能统计文本不同部分的文本行数。因为我们只是在本地运行Spark shell,所以它在单台机器上执行所有任务,但是你可以连接shell到一个集群来并行地分析数据。Figure 2-3显示了Spark如何在一个集群上运行。
Figure 2-3
最后,当传递函数给操作,并在集群上运行它们的时候,有大量Spark’的API可以考虑。例如,我们扩展README 示例,过滤掉那些不包含某个单词的行,比如单词Python ,具体代码显示在例2-4(Python)和例2-5(for Scala)。
例2-4. Python执行过滤的例子
>>> lines = sc.textFile(“README.md”)
>>> pythonLines = lines.filter(lambda line: “Python” in line)
>>> pythonLines.first()
u’## Interactive Python Shell’
例2-5. Scala执行过滤的例子
scala> val lines = sc.textFile(“README.md”) // 创建一个叫lines的RDD
lines: spark.RDD[String] = MappedRDD[…]
scala> val pythonLines = lines.filter(line => line.contains(“Python”))
pythonLines: spark.RDD[String] = FilteredRDD[…]
scala> pythonLines.first()
res0: String = ## Interactive Python Shell
传递函数给Spark
如果你对例lambda和=>这样的语法不熟悉,你也可以先定义函数,然后再把函数名传递给Spark。例如,在Python中:
def hasPython(line):
return “Python” in line
pythonLines = lines.filter(hasPython)
在Java中传递函数给Spark也是可能的,但是在这种情况下,它们被定义为类,这些类实现Function接口。例如:
JavaRDD<String> pythonLines = lines.filter(
new Function<String, Boolean>() {
Boolean call(String line) { return line.contains(“Python”); }
}
);
Java8中引入了看起来和Python和Scala中类似的函数简单写法,被称为lambda表达式。用这种语法实现上面的功能的代码如下:
JavaRDD<String> pythonLines = lines.filter(line -> line.contains(“Python”));
我们将在“Passing Functions to Spark”一节更加深入的讨论这个话题。
后面我们会讲述Spark API更多的细节,Spark这样神奇,只因为像filter这样的函数操作也是在整个集群上并行的执行的。也就是说,Spark自动获得你的函数(比如,line.contains(“Python”)),把它们传送到执行节点上。因此,你可以在单个驱动程序中写代码,然后让代码运行在多个节点上。第三章将更细致的讲解RDD API 。