概述
Spark 应用由driver program 组成,driver program运行用户的主函数,在集群内并行执行各种操作
主要抽象RDD:
spark提供RDD,是贯穿整个集群中所有节点的分区元素的集合,能够被并行操作。
RDDS来源:
1.Hadoop文件系统或支持Hadoop的文件系统中操作一个文件
2.driver program中已存在的scala集合
3.从另一个RDD转换得到
主要抽象shared variables共享变量:
共享变量也可以被并行操作
默认的,当spark将一个运行的函数作为在不同的节点上的一系列的tasks集合时,共享变量传输函数用到的所有变量。
变量分享的范围:tasks内/tasks之间和整个driver program
spark支持两种类型的共享变量:
broadcast变量:被用来在所有节点的内存缓存值
accumulators(蓄能器):which are variables that are only “added” to, such as
counters and sums.
初始化spark
spark程序做的第一件事情就是:创建一个SparkContext对象(告诉spark怎么连接到集群)
为了创建SparkContext,需要先build一个SparkConf对象(包含应用相关的信息)
注意:一个JVM中只能有一个SparkContext处于aitve,想创建新的必须先停止旧的
val
conf
=
new
SparkConf
().
setAppName
(
appName
).
setMaster
(
master
)
new
SparkContext
(
conf
)
appName用来指定你的应用的名字,展示在集群UI中
master是Spark,Mesos,YARN cluster URL 或 一个指定的”local”字符串以本地模式运行
实际在集群中运行时,不需要在程序中指定这些,而是用spark-submit。
当然,对于本地测试和单元测试,能在spark运行中通过程序设置”local”
并行化集合
在driver program中,存在sc后,可利用SparkContext的paralleliz方法创建Parallelized集合,集合内的元素被复制去形成一个分布式的数据集(能被并行操作的)。如下创建并行化的集合(包括1-5)
val
data
=
Array
(
1
,
2
,
3
,
4
,
5
)
val
distData
=
sc
.
parallelize
(
data
)
并行化集合的一个重要的参数是:将数据集切分成分区的个数。spark将为集群中每一个分区运行一个task。通常,集群中每一个CPU会有2-4个分区。正常情况下,分区数是由spark根据集群情况自动设置的。当然也可以重载parallelize()手动设置。如:sc.parallelize(data, 10)
外部数据集
text files, SequenceFiles, and any other Hadoop InputFormat.