Spark1.0.x入门指南

Spark1.0.x入门指南

1 节点说明


IP


Role


192.168.1.111


ActiveNameNode


192.168.1.112


StandbyNameNode,Master,Worker


192.168.1.113


DataNode,Master,Worker


192.168.1.114


DataNode,Worker

HDFS集群和Spark集群之间节点共用。

2 安装HDFS

见HDFS2.X和Hive的安装部署文档:http://www.cnblogs.com/Scott007/p/3614960.html

3 Spark部署

Spark常用的安装部署模式有Spark On YarnStandalone,可以同时使用。

3.1 Spark on Yarn

这种模式,借助Yarn资源分配的功能,使用Spark客户端来向Yarn提交任务运行。只需将Spark的部署包放置到Yarn集群的某个节点上即可(或者是Yarn的客户端,能读取到Yarn集群的配置文件即可)。Spark本身的Worker节点、Master节点不需要启动。

但是,Spark的部署包须是基于对应的Yarn版本正确编译后的,否则会出现Spark和Yarn的兼容性问题。

on Yarn的两种运行方式,其运行结束后的日志不能在Yarn的Application管理界面看到,目前只能在客户端通过:

yarn logs -applicationId <applicationId>

命令查看每个Application的日志。

3.1.1 配置

部署这种模式,需要修改conf目录下的spark-env.sh文件。在其中新增如下配置选项:

export HADOOP_HOME= /home/hadoop/hadoop-2.0.0-cdh4.5.0

export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop

SPARK_EXECUTOR_INSTANCES=2

SPARK_EXECUTOR_CORES=1

SPARK_EXECUTOR_MEMORY=400M

SPARK_DRIVER_MEMORY=400M

SPARK_YARN_APP_NAME="Spark 1.0.0"

其中:

(1) HADOOP_HOME:当前节点中HDFS的部署路径,因为Spark需要和HDFS中的节点在一起;

(2) HADOOP_CONF_DIR:HDFS节点中的conf配置文件路径,正常情况下此目录为$HADOOP_HOME/etc/hadoop;

(3) SPARK_EXECUTOR_INSTANCES:在Yarn集群中启动的Worker的数目,默认为2个;

(4) SPARK_EXECUTOR_CORES:每个Worker所占用的CPU核的数目;

(5) SPARK_EXECUTOR_MEMORY:每个Worker所占用的内存大小;

(6) SPARK_DRIVER_MEMORY:Spark应用程序Application所占的内存大小,这里的Driver对应Yarn中的ApplicationMaster;

(7) SPARK_YARN_APP_NAME:Spark Application在Yarn中的名字;

配置完成后,将Spark部署文件放置到Yarn的节点中即可。这里,将spark-1.0.0整个目录放到Yarn集群的一个节点192.168.1.112的/home/hadoop(设为spark的安装路径的父目录)路径下。

3.1.2 测试

在Spark的部署路径的bin路径下,执行spark-submit脚本来运行spark-examples包中的例子。执行如下:

./bin/spark-submit --master yarn 
--class org.apache.spark.examples.JavaWordCount 
--executor-memory 400M 
--driver-memory 400M 
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xml

这个例子是计算WordCount的,例子被打包在/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar包中,对应的Class为org.apache.spark.examples.JavaWordCount,./hdfs-site.xml是HDFS中指定路径下的一个文件,WordCount就是针对它来做的。而--master yarn就是指定运行在Yarn集群中,以yarn模式运行。

Spark On Yarn有两种运行模式,一种是Yarn Cluster方式,一种是Yarn Client方式。

(1) Yarn Cluster: Spark Driver程序将作为一个ApplicationMaster在YARN集群中先启动,然后再由ApplicationMaster向RM申请资源启动 executor以运行Task。因为Driver程序在Yarn中运行,所以程序的运行结果不能在客户端显示,所以最好将结果保存在HDFS上,客户端 的终端显示的是作为Yarn的job的运行情况。

(2) Yarn Client: Spark Driver程序在客户端上运行,然后向Yarn申请运行exeutor以运行Task,本地程序负责最后的结果汇总等。客户端的Driver将应用提交 给Yarn后,Yarn会先后启动ApplicationMaster和executor,另外ApplicationMaster和executor都 是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver- memory,executor分配的内存是executor-memory。同时,因为Driver在客户端,所以程序的运行结果可以在客户端显 示,Driver以进程名为SparkSubmit的形式存在。

上面命令中的提交方式“yarn”就是默认按照“Yarn Client”方式运行。用户可自定义运行方式,通过“--master”指定程序以yarn、yarn-cluster或者yarn-client中的一种方式运行。

需要重点说明的是最后文件的路径,是相当于HDFS中的/user/hadoop而言,hadoop是当前命令的用户。“./hdfs-site.xml”在HDFS中的全路径为“hdfs://namespace/user/hadoop/hdfs-site.xml”,其中hadoop是当前的用户,namespace是HDFS的命名空间;如果写成“/hdfs-site.xml”则在HDFS中指的是“hdfs://namespace/hdfs-site.xml”;当然也可以直接传入“hdfs://namespace/user/hadoop/hdfs-site.xml”用于指定在HDFS中的要进行WordCount计算的文件。

另外,Spark应用程序需要的CPU Core数目和内存,需要根据当前Yarn的NodeManager的硬件条件相应设置,不能超过NodeManager的硬件条件。

./bin/spark-submit --master yarn 
--class org.apache.spark.examples.JavaWordCount 
--executor-memory 400M 
--driver-memory 400M 
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar hdfs://namespace/user/hadoop/hdfs-site.xml

在Yarn的ResourceManager对应的Web界面中查看启动的Application。

Running:

Success:

同时可以在启动脚本的客户端看到WordCount的运行结果:

3.2 Spark Standalone

这种模式,就是把Spark单独作为一个集群来进行部署。集群中有两种节点,一种是Master,另一种是Worker节点。Master负责分配任务给Worker节点来执行,并负责最后的结果合并,Worker节点负责具体的任务执行。

3.2.1 配置

所需修改的配置文件除了spark-env.sh文件以外,还有slave文件,都位于conf目录中。

slave文件中保存的是worker节点host或者IP,此处的配置为:

192.168.1.112

192.168.1.113

192.168.1.114

至于spark-env.sh文件,可以配置如下属性:

(1) SPARK_MASTER_PORT:Master服务端口,默认为7077;

(2) SPARK_WORKER_CORES:每个Worker进程所需要的CPU核的数目;

(3) SPARK_WORKER_MEMORY:每个Worker进程所需要的内存大小;

(4) SPARK_WORKER_INSTANCES:每个Worker节点上运行Worker进程的数目;

(5) SPARK_MASTER_WEBUI_PORT:Master节点对应Web服务的端口;

(6)export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark":用于指定Master的HA,依赖于zookeeper集群;

(7) export SPARK_JAVA_OPTS="-Dspark.cores.max=4":用于限定每个提交的Spark Application的使用的CPU核的数目,因为缺省情况下提交的Application会使用所有集群中剩余的CPU Core

注意在Worker进程的CPU个数和内存大小的时候,要结合机器的实际硬件条件,如果一个Worker节点上的所有Worker进程需要的CPU总数目或者内存大小超过当前Worker节点的硬件条件,则Worker进程会启动失败

将配置好的Spark文件拷贝至每个Spark集群的节点上的相同路径中。为方便使用spark-shell,可以在环境变量中配置上SPARK_HOME

3.2.2 启动

配置结束后,就该启动集群了。这里使用Master的HA方式,选取192.168.1.112、192.168.1.113节点作为Master,192.168.1.112、192.168.1.113、192.168.1.114节点上运行两个Worker进程。

首先在192.168.1.113节点上做此操作:

启动之后,可以查看当前节点的进程:

另外,为了保证Master的HA,在192.168.1.112节点上只启动Master

192.168.1.112节点的进程为:

启动过后,通过Web页面查看集群的情况,这里访问的是:

http://192.168.1.113:8090/

再看standby节点192.168.1.112的web界面http://192.168.1.112:8090/

3.2.3 测试

Spark的bin子目录中的spark-submit脚本是用于提交程序到集群中运行的工具,我们使用此工具做一个关于pi的计算。命令如下:

./bin/spark-submit --master spark://spark113:7077 \--class org.apache.spark.examples.SparkPi \--name Spark-Pi --executor-memory 400M \--driver-memory 512M \/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar

其中--master参数用于指定Master节点的URI,但是这里填的是Host,不是IP

任务启动之后,在Spark的Master的Web界面可以看到运行中的Application。

任务运行结束之后,在Web界面中Completed Applications表格中会看到对应的结果。

同时,命令行中会打印出来运行的结果,如下所示:

4 spark-submit工具

上面测试程序的提交都是使用的spark-submit脚本,其位于$SPARK_HOME/bin目录中,执行时需要传入的参数说明如下:

Usage: spark-submit [options] <app jar | python file> [app options]


参数名称


含义


--master MASTER_URL


可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local


--deploy-mode DEPLOY_MODE


Driver程序运行的地方,client或者cluster


--class CLASS_NAME


主类名称,含包名


--name NAME


Application名称


--jars JARS


Driver依赖的第三方jar包


--py-files PY_FILES


用逗号隔开的放置在Python应用程序PYTHONPATH上的.zip, .egg, .py文件列表


--files FILES


用逗号隔开的要放置在每个executor工作目录的文件列表


--properties-file FILE


设置应用程序属性的文件路径,默认是conf/spark-defaults.conf


--driver-memory MEM


Driver程序使用内存大小


--driver-java-options


--driver-library-path


Driver程序的库路径


--driver-class-path


Driver程序的类路径


--executor-memory MEM


executor内存大小,默认1G


--driver-cores NUM


Driver程序的使用CPU个数,仅限于Spark Alone模式


--supervise


失败后是否重启Driver,仅限于Spark Alone模式


--total-executor-cores NUM


executor使用的总核数,仅限于Spark Alone、Spark on Mesos模式


--executor-cores NUM


每个executor使用的内核数,默认为1,仅限于Spark on Yarn模式


--queue QUEUE_NAME


提交应用程序给哪个YARN的队列,默认是default队列,仅限于Spark on Yarn模式


--num-executors NUM


启动的executor数量,默认是2个,仅限于Spark on Yarn模式


--archives ARCHIVES


仅限于Spark on Yarn模式

另外,在执行spark-submit.sh工具进行提交应用之前,可以使用如下方式提前定义好当前Spark Application所使用的CPU Core数目和内存大小:

SPARK_JAVA_OPTS="-Dspark.cores.max=2 -Dspark.executor.memory=600m" 
./bin/spark-submit --master spark://update113:7077 \--class org.apache.spark.examples.SparkPi 
…

…

5 Spark HistoryServer

类似于Mapreduce的JobHistoryServer,Spark也有一个服务可以保存历史Application的运行记录。

修改$SPARK_HOME/conf下的spark-defaults.conf文件(注意,修改后的配置文件在每个节点都要有),其中可修改的配置属性为:


属性名称


默认值


含义


spark.history.updateInterval


10


以秒为单位,更新日志相关信息的时间间隔


spark.history.retainedApplications


250


保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除


spark.history.ui.port


18080


HistoryServer的web端口


 spark.history.kerberos.enabled


False


是否使用kerberos方式登录访问HistoryServer,对于持久层位于安全集群的HDFS上是有用的,如果设置为true,就要配置下面的两个属性


 spark.history.kerberos.principal


用于HistoryServer的kerberos主体名称


spark.history.kerberos.keytab


用于HistoryServer的kerberos keytab文件位置


spark.history.ui.acls.enable


False


授权用户查看应用程序信息的时候是否检查acl。如果启用,只有应用程序所有者和spark.ui.view.acls指定的用户可以查看应用程序信息;否则,不做任何检查


spark.eventLog.enabled


False


是否记录Spark事件


spark.eventLog.dir


保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建


spark.yarn.historyServer.address


Server端的URL:Ip:port 或者host:port

此处的设置如下:

spark.eventLog.enabled  truespark.eventLog.dir      hdfs://yh/user/hadoop/sparklogsspark.yarn.historyServer.address    update113:18080

设置完文件之后,进入sbin目录启动服务:

运行完成的Application历史记录可以通过访问上面指定的HistoryServer地址查看,这里是http://192.168.1.113:18080/

无论运行时是本地模式,还是yarn-client、yarn-cluster,运行记录均可在此页面查看。

并且程序运行时的环境变量、系统参数、各个阶段的耗时均可在此查看,很强大

6 Spark可配置参数

Spark参数的配置可通过三种方式:SparkConf方式 > 命令行参数方式 >文件配置方式

6.1 应用属性


属性名


默认值


含义


 spark.app.name


应用程序名称


spark.master


要连接的Spark集群Master的URL


spark.executor.memory


512 m


每个executor使用的内存大小


spark.serializer


org.apache.spark

.serializer.JavaSerializer


序列化方式,官方建议使用org.apache.spark.serializer.KryoSerializer,当然也可以任意是定义为org.apache.spark.Serializer子类的序化器


spark.kryo.registrator


如果要使用 Kryo序化器,需要创建一个继承KryoRegistrator的类并设置系统属性spark.kryo.registrator指向该类


 spark.local.dir


/tmp


用于保存map输出文件或者转储RDD。可以多个目录,之间以逗号分隔。在Spark 1.0 及更高版本此属性会被环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替


spark.logConf


False


SparkContext 启动时是否记录有效 SparkConf信息

6.2 运行环境变量


属性名


默认值


含义


spark.executor.extraJavaOptions


传递给executor的额外JVM 选项,但是不能使用它来设置Spark属性或堆空间大小


spark.executor.extraClassPath


追加到executor类路径中的附加类路径


spark.executor.extraLibraryPath


启动executor JVM 时要用到的特殊库路径


spark.files.userClassPathFirst


False


executor在加载类的时候是否优先使用用户自定义的JAR包,而不是Spark带有的JAR包,目前,该属性只是一项试验功能

6.3 Shuffle操作相关属性


属性名


默认值


含义


spark.shuffle.consolidateFiles


False


如果为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来说,合并文件可 以提高文件系统性能,如果使用的是ext4 或 xfs 文件系统,建议设置为true;对于ext3,由于文件系统的限制,设置为true反而会使内核>8的机器降低性能


 spark.shuffle.spill


True


如果为true,在shuffle期间通过溢出数据到磁盘来降低了内存使用总量,溢出阈值是由spark.shuffle.memoryFraction指定的


spark.shuffle.spill.compress


True


是否压缩在shuffle期间溢出的数据,如果压缩将使用spark.io.compression.codec。


 spark.shuffle.compress


True


是否压缩map输出文件,压缩将使用spark.io.compression.codec。


spark.shuffle.file.buffer.kb


100


每个shuffle的文件输出流内存缓冲区的大小,以KB为单位。这些缓冲区可以减少磁盘寻道的次数,也减少创建shuffle中间文件时的系统调用


spark.reducer.maxMbInFlight


48


每个reduce任务同时获取map输出的最大大小 (以兆字节为单位)。由于每个map输出都需要一个缓冲区来接收它,这代表着每个 reduce 任务有固定的内存开销,所以要设置小点,除非有很大内存

6.4 SparkUI相关属性


属性名


默认值


含义


spark.ui.port


4040


应用程序webUI的端口


spark.ui.retainedStages


1000


在GC之前保留的stage数量


 spark.ui.killEnabled


True


允许在webUI将stage和相应的job杀死


 spark.eventLog.enabled


False


是否记录Spark事件,用于应用程序在完成后重构webUI


spark.eventLog.compress


False


是否压缩记录Spark事件,前提spark.eventLog.enabled为true


spark.eventLog.dir


file:///tmp/spark-events


如果spark.eventLog.enabled为 true,该属性为记录spark事件的根目录。在此根目录中,Spark为每个应用程序创建分目录,并将应用程序的事件记录到在此目录中。可以将此属性 设置为HDFS目录,以便history server读取历史记录文件

6.5 压缩和序列化相关属性


属性名


默认值


含义


spark.broadcast.compress


True


是否在发送之前压缩广播变量


spark.rdd.compress


False


是否压缩RDD分区


spark.io.compression.codec


org.apache.spark.io.

LZFCompressionCodec


用于压缩内部数据如 RDD分区和shuffle输出的编码解码器, org.apache.spark.io.LZFCompressionCodec和 org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的压缩和解压缩,而LZF提供了 更好的压缩比


 spark.io.compression.snappy

.block.size


32768


使用Snappy编码解码器时,编码解码器使用的块大小 (以字节为单位)


 spark.closure.serializer


org.apache.spark.serializer.

JavaSerializer


用于闭包的序化器,目前只有支持Java序化器


spark.serializer.
 objectStreamReset


10000


org.apache.spark.serializer.JavaSerializer序列化时,会缓存对象以防 止写入冗余数据,此时会停止这些对象的垃圾收集。通过调用重置序化器,刷新该信息就可以收集旧对象。若要关闭这重定期重置功能将其设置为< = 0 。默认情况下每10000个对象将重置序化器


spark.kryo.referenceTracking


True


当使用Kryo序化数据时,是否跟踪对同一对象的引用。如果你的对象图有回路或者同一对象有多个副本,有必要设置为true;其他情况下可以禁用以提高性能


 spark.kryoserializer.buffer.mb


2


在Kryo 里允许的最大对象大小(Kryo会创建一个缓冲区,至少和序化的最大单个对象一样大)。每个worker的每个core只有一个缓冲区

6.6 执行时相关属性


属性名


默认值


含义


spark.default.parallelism


本地模式:机器核数

Mesos:8

其他:max(executor的core,2)


如果用户不设置,系统使用集群中运行shuffle操作的默认任务数(groupByKey、 reduceByKey等)


spark.broadcast.factory


org.apache.spark.broadcast.

HttpBroadcastFactory


广播的实现类


spark.broadcast.blockSize


4096


TorrentBroadcastFactory块大小(以kb为单位)。过大会降低广播速度;过小会使印象BlockManager性能


spark.files.overwrite


Fale


通过 SparkContext.addFile() 添加的文件在目标中已经存在并且内容不匹配时,是否覆盖目标文件


spark.files.fetchTimeout


False


在获取由driver通过SparkContext.addFile() 添加的文件时,是否使用通信时间超时


spark.storage.memoryFraction


0.6


Java堆用于cache的比例


 spark.tachyonStore.baseDir


System.getProperty("java.io.tmpdir")


用于存储RDD的techyon目录,tachyon文件系统的URL由spark.tachyonStore.url设置,也可以是逗号分隔的多个techyon目录


spark.storage.

memoryMapThreshold


8192


以字节为单位的块大小,用于磁盘读取一个块大小时进行内存映射。这可以防止Spark在内存映射时使用很小块,一般情况下,对块进行内存映射的开销接近或低于操作系统的页大小


spark.tachyonStore.url


tachyon://localhost:19998


基于techyon文件的URL


spark.cleaner.ttl


spark记录任何元数据(stages生成、task生成等)的持续时间。定期清理可以确保将超期的元数据丢弃,这在运行长时间任务是很有用的,如运行7*24的sparkstreaming任务。RDD持久化在内存中的超期数据也会被清理

6.7 网络相关属性


属性名


默认值


含义


spark.driver.host


运行driver的主机名或 IP 地址


spark.driver.port


随机


driver侦听的端口


spark.akka.frameSize


10


以MB为单位的driver和executor之间通信信息的大小,设置值越大,driver可以接受更大的计算结果


spark.akka.threads


4


用于通信的actor线程数,在大型集群中拥有更多CPU内核的driver可以增加actor线程数


spark.akka.timeout


100


以秒为单位的Spark节点之间超时时间


spark.akka.heartbeat.pauses


600


下面3个参数是用于设置Akka自带的故障探测器。启用的话,以秒为单位设置如下这三个参数,有助于对恶意的executor的定位,而对于由于GC暂停或网络滞后引起的情况下,不需要开启故障探测器;另外故障探测器的开启会导致由于心跳信息的频繁交换而引起的网络泛滥。

本参数是设置可接受的心跳停顿时间


spark.akka.failure-detector.threshold


300.0


对应Akka的akka.remote.transport-failure-detector.threshold


spark.akka.heartbeat.interval 


1000


心跳间隔时间

6.8 调度相关属性


属性名


默认值


含义


spark.task.cpus


1


为每个任务分配的内核数


spark.task.maxFailures


4


Task的最大重试次数


spark.scheduler.mode


FIFO


Spark的任务调度模式,还有一种Fair模式


spark.cores.max


当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内 核总数(不是指每台机器,而是整个集群)。如果不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值, 而Mesos将使用集群中可用的内核


spark.mesos.coarse


False


如果设置为true,在Mesos集群中运行时使用粗粒度共享模式


 spark.speculation


False


以下几个参数是关于Spark推测执行机制的相关参数。此参数设定是否使用推测执行机制,如果设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其他节点中重新启动,并将最先完成的Task的计算结果最为最终结果


spark.speculation.interval 


100


Spark多长时间进行检查task运行状态用以推测,以毫秒为单位


 spark.speculation.quantile


0.75


推测启动前,Stage必须要完成总Task的百分比


spark.speculation.multiplier


1.5


比已完成Task的运行速度中位数慢多少倍才启用推测


 spark.locality.wait


3000


以下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,如果超出就启动 下一本地优先级别的task。该设置同样可以应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),当然,也可以通过spark.locality.wait.node等参数设置不同优先级别的本地性


 spark.locality.wait.process


spark.locality.wait


本地进程级别的本地等待时间


spark.locality.wait.node


spark.locality.wait


本地节点级别的本地等待时间


spark.locality.wait.rack


spark.locality.wait


本地机架级别的本地等待时间


spark.scheduler.revive.interval


1000


复活重新获取资源的Task的最长时间间隔(毫秒),发生在Task因为本地资源不足而将资源分配给其他Task运行后进入等待时间,如果这个等待时间内重新获取足够的资源就继续计算

6.9 安全相关属性


属性名


默认值


含义


spark.authenticate


False


是否启用内部身份验证


spark.authenticate.secret


设置组件之间进行身份验证的密钥。如果不是YARN上运行并且spark.authenticate为true时,需要设置密钥


spark.core.connection. auth.wait.timeout


30


进行身份认证的超时时间


spark.ui.filters


Spark web UI 要使用的以逗号分隔的筛选器名称列表。筛选器要符合javax  servlet Filter标准,每个筛选器的参数可以通过设置java系统属性来指定:

spark.<class name of  filter>.params=‘param1=value1,param2=value2‘

例如:

-Dspark.ui.filters=com.test.filter1

-Dspark.com.test.filter1.params=‘param1=foo,param2=testing‘


 spark.ui.acls.enable


False


Spark webUI存取权限是否启用。如果启用,在用户浏览web界面的时候会检查用户是否有访问权限


spark.ui.view.acls


以逗号分隔Spark webUI访问用户的列表。默认情况下只有启动Spark job的用户才有访问权限

6.10 SparkStreaming相关属性


属性名


默认值


含义


spark.streaming.blockInterval


200


Spark Streaming接收器将接收数据合并成数据块并存储在Spark里的时间间隔,毫秒


spark.streaming.unpersist


True


如果设置为true,强迫将SparkStreaming持久化的RDD数据从Spark内存中清理,同样 的,SparkStreaming接收的原始输入数据也会自动被清理;如果设置为false,则允许原始输入数据和持久化的RDD数据可被外部的 Streaming应用程序访问,因为这些数据不会自动清理

6.11 Standalone模式特有属性

可以在文件conf/spark-env.sh中来设置此模式的特有相关属性:

(1)SPARK_MASTER_OPTS:配置master使用的属性

(2)SPARK_WORKER_OPTS:配置worker使用的属性

(3)SPARK_DAEMON_JAVA_OPTS:配置master和work都使用的属性

配置的时候,使用类似的语句:

export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"

其中x代表属性,y代表属性值。

SPARK_MASTER_OPTS所支持的属性有:


属性名


默认值


含义


spark.deploy.spreadOut


True


Standalone集群管理器是否自由选择节点还是固定到尽可能少的节点,前者会有更好的数据本地性,后者对于计算密集型工作负载更有效


spark.worker.timeout


60


master因为没有收到心跳信息而认为worker丢失的时间(秒)


spark.deploy.defaultCores


如果没有设置spark.cores.max,该参数设置Standalone集群分配给应用程序的最大内核数,如果不设置,应用程序获取所有的有效内核。注意在一个共享的集群中,设置一个低值防止攫取了所有的内核,影响他人的使用

SPARK_WORKER_OPTS所支持的属性有


属性名


默认值


含义


spark.worker.cleanup.enabled


False


是否定期清理worker的应用程序工作目录,只适用于Standalone模式,清理的时候将无视应用程序是否在运行


 spark.worker.cleanup.interval


1800


清理worker本地过期的应用程序工作目录的时间间隔(秒)


spark.worker.cleanup.appDataTtl 


7*24*3600


worker保留应用程序工作目录的有效时间。该时间由磁盘空间、应用程序日志、应用程序的jar包以及应用程序的提交频率来设定

SPARK_DAEMON_JAVA_OPTS所支持的属性有:


属性名


含义


spark.deploy.recoveryMode


下面3个参数是用于配置zookeeper模式的master HA。设置为ZOOKEEPER表示启用master备用恢复模式,默认为NONE


spark.deploy.zookeeper.url


zookeeper集群URL


 spark.deploy.zookeeper.dir


zooKeeper保存恢复状态的目录,缺省为/spark


spark.deploy.recoveryMode


设成FILESYSTEM启用master单节点恢复模式,缺省值为NONE


spark.deploy.recoveryDirectory


Spark保存恢复状态的目录

6.12 Spark on Yarn特有属性


属性名


默认值


含义


spark.yarn.applicationMaster.waitTries


10


RM等待Spark AppMaster启动重试次数,也就是SparkContext初始化次数。超过这个数值,启动失败


spark.yarn.submit.file.replication


3


应用程序上传到HDFS的文件的副本数


spark.yarn.preserve.staging.files


False


若为true,在job结束后,将stage相关的文件保留而不是删除


spark.yarn.scheduler.heartbeat.interval-ms


5000


Spark AppMaster发送心跳信息给YARN RM的时间间隔


spark.yarn.max.executor.failures


2倍于executor数


导致应用程序宣告失败的最大executor失败次数


spark.yarn.historyServer.address


Spark history server的地址(不要加http://)。这个地址会在Spark应用程序完成后提交给YARN RM,然后RM将信息从RM UI写到history server UI上。

7 示例配置

主要的配置文件均位于$SPARK_HOME/conf中,包括slave、spark-env.sh、spark-defaults.conf文件等。

7.1 slave文件

192.168.1.112192.168.1.113192.168.1.114

7.2 spark-env.sh文件

export JAVA_HOME="/export/servers/jdk1.6.0_25" #yarn

export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

SPARK_EXECUTOR_INSTANCES=2SPARK_EXECUTOR_CORES=1SPARK_EXECUTOR_MEMORY=400M

SPARK_DRIVER_MEMORY=400M

SPARK_YARN_APP_NAME="Spark 1.0.0"#alone

SPARK_MASTER_WEBUI_PORT=8090SPARK_WORKER_MEMORY=400M

SPARK_WORKER_CORES=1SPARK_WORKER_INSTANCES=2#Master HA

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark"

7.3 spark-defaults.conf文件

#history server

spark.eventLog.enabled  truespark.eventLog.dir      hdfs://namespace/user/hadoop/sparklogsspark.yarn.historyServer.address    spark113:18080#shuffle

spark.shuffle.consolidateFiles true#task

spark.task.cpus 1spark.task.maxFailures 3#scheduler type

spark.scheduler.mode FAIR

#security

park.authenticate truespark.authenticate.secret hadoop

spark.core.connection.auth.wait.timeout 1500spark.ui.acls.enable truespark.ui.view.acls root,hadoop

#each executor used max memory

spark.executor.memory 400m

#spark on yarn

spark.yarn.applicationMaster.waitTries 5spark.yarn.submit.file.replication 3spark.yarn.preserve.staging.files falsespark.yarn.scheduler.heartbeat.interval-ms 5000#park standalone and on mesos

spark.cores.max 4

8 Spark SQL

Spark支持Scala、Python等语言写的脚本直接在Spark环境执行,更重要的是支持对Hive语句进行包装后在Spark上运行。这就是Spark SQL。

8.1 相关配置

配置的步骤比较简单,把Hive的配置文件hive-site.xml直接放置到$SPARK_HOME的conf路径下即可。如果是想在Spark集群本地执行SQL的话,每个对应的节点都要做同样的配置。

8.2 运行SQL

启动bin目录下的spark-shell脚本,依次执行如下语句:

val sc: SparkContext

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

hql("LOAD DATA LOCAL INPATH ‘/examples /data.txt‘ INTO TABLE src")

hql("FROM src SELECT key, value").collect().foreach(println)

上面的命令,分别是声明SparkContext对象,利用hql方法执行Hive的SQL语句,在执行SQL语句的过程中,可以通过Hive的Cli客户端进行查看相应操作的结果。

8.3 on yarn模式

由于spark-shell脚本是在本地执行的,如果想放到Yarn上去执行的话,可以使用上面第4节中的spark-submit工具,这时候需要对需要输入的sql语句进行包装,将包装类打包成jar文件,再提交。

包装类的代码如下:

 1 package spark; 2  3 import java.util.List; 4  5 import org.apache.spark.SparkConf; 6 import org.apache.spark.api.java.JavaSparkContext; 7 import org.apache.spark.sql.api.java.Row; 8 import org.apache.spark.sql.hive.api.java.JavaHiveContext; 9 10 /**11  * Description:12  * Author: [email protected]  * Date: 2014/7/1514  */15 public class SparkSQL {16 17     public static void main(String[] args) {18         if(args.length != 2){19             System.out.println("usage: <applicationName> <sql statments>");20             System.exit(1);21         }22 23         String applicationName = args[0];24         String sql = args[1];25 26         SparkConf conf = new SparkConf().setAppName(applicationName);27         JavaSparkContext sc = new JavaSparkContext(conf);28         JavaHiveContext hiveContext = new JavaHiveContext(sc);29         List<Row> results = hiveContext.hql(sql).collect();30 31         System.out.println("Sql is:" + sql + ", has been executed over.");32         System.out.println("The result size is " + results.size() + ", they are:");33         for(int i=0; i<results.size(); i++){34             System.out.println(results.get(i).toString());35         }36 37         System.out.println("Execute over ...");38         sc.stop();39         System.out.println("Stop over ...");40     }41 42 }

将其打包成jar文件spark-0.0.1-SNAPSHOT.jar,再使用spark-submit工具进行任务的提交,命令如下:

./spark-submit \--class spark.SparkSQL \--master yarn-cluster \--num-executors 3 \--driver-memory 400m --executor-memory 400m --executor-cores 1 \--jars /home/hadoop/spark-1.0.0/examples/libs/spark-core_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/examples/libs/spark-hive_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-core-3.2.2.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-rdbms-3.2.1.jar,/home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar--files /home/hadoop/spark-1.0.0/conf/hive-site.xml \/home/hadoop/spark-1.0.0/examples/libs/spark-0.0.1-SNAPSHOT.jar "hiveTest" "CREATE TABLE IF NOT EXISTS test4 (key INT, value STRING)"

其中,--master参数指定的是yarn-cluster模式,当然也可以使用yarn-client模式,至于区别,已经在上文说了;--class指定的是我们包装类的主类,见上文源码;--jars是依赖的四个jar包;--files是指定的hive-site.xml配置文件,提交到Yarn中的Application在执行的时候,需要把此配置文件分发到每个Executor上;最后的两个参数,一个是Application的名称,一个是运行的SQL语句。

运行结束后,可以到Spark HistoryServer中查看运行结果。

Spark1.0.x入门指南

时间: 2024-10-31 10:06:09

Spark1.0.x入门指南的相关文章

Chapter 0.SymmetricDS快速入门指南( Quick Start Guide)

本文档是SymmetricDS3.6.14文档的第一章节Quick Start Guide文档的翻译,的目的是帮助读者快速搭建一个SymmetricDS集群并普及一些基本概念术语. 本文档描述了如何在两个SymmetricDS节点之间同步两个相同schema的数据库.下面的例子构建了一个分销业务模型,有一个中央数据库(我们叫它root或者corp节点)和多个零售商店的数据库(我们叫它client或者store节点).对于本教程,我们将只有一个store(商店)节点,如下图.如果你愿意,可以再教程

Spark1.0.0部署指南

1 节点说明   IP Role 192.168.1.111 ActiveNameNode 192.168.1.112 StandbyNameNode,Master,Worker 192.168.1.113 DataNode,Master,Worker 192.168.1.114 DataNode,Worker HDFS集群和Spark集群之间节点共用. 2 安装HDFS 见HDFS2.X和Hive的安装部署文档:http://www.cnblogs.com/Scott007/p/3614960

Asp.Net MVC4.0 官方教程 入门指南之五--控制器访问模型数据

Asp.Net MVC4.0 官方教程 入门指南之五--控制器访问模型数据 在这一节中,你将新创建一个新的 MoviesController类,并编写代码,实现获取影片数据和使用视图模板在浏览器中展现影片数据的功能.在进行下步之前,点击“生成应用程序“对应用程序进行编译.右键单击Controllers文件夹,新建一个名为“MoviesController ”的控制器.在创建窗口各选项如下图所示 点击添加,将创建以下文件和文件夹: 项目的 Controllers 文件夹下新增MoviesContr

Asp.Net MVC4.0 官方教程 入门指南之四--添加一个模型

Asp.Net MVC4.0 官方教程 入门指南之四--添加一个模型 在这一节中,你将添加用于管理数据库中电影的类.这些类是ASP.NET MVC应用程序的模型部分. 你将使用.NET Framework框架下的实体框架(Entity Framework)数据访问技术,与模型类协同工作.实体框架(常简称为EF)支持一种称之为编码先行(Code First)的开发模式.编码先行使你通过编写简单的类(简称为POCO类,全称为"plain-old CLR objects."),来创建模型对象

Asp.Net MVC4.0 官方教程 入门指南之三--添加一个视图

Asp.Net MVC4.0 官方教程 入门指南之三--添加一个视图 在本节中,您需要修改HelloWorldController类,从而使用视图模板文件,干净优雅的封装生成返回到客户端浏览器HTML的过程. 您将创建一个视图模板文件,其中使用了ASP.NET MVC 3所引入的Razor视图引擎.Razor视图模板文件使用.cshtml文件扩展名,并提供了一个优雅的方式来使用C#语言创建所要输出的HTML.用Razor编写一个视图模板文件时,将所需的字符和键盘敲击数量降到了最低,并实现了快速,

Asp.Net MVC4.0 官方教程 入门指南之二--添加一个控制器

Asp.Net MVC4.0 官方教程 入门指南之二--添加一个控制器 MVC概念 MVC的含义是 “模型-视图-控制器”.MVC是一个架构良好并且易于测试和易于维护的开发模式.基于MVC模式的应用程序包含: · Models: 表示该应用程序的数据并使用验证逻辑来强制实施业务规则的数据类. · Views: 应用程序动态生成 HTML所使用的模板文件. · Controllers: 处理浏览器的请求,取得数据模型,然后指定要响应浏览器请求的视图模板. 本系列教程,我们将覆盖所有这些概念,并告诉

Quartz.NET简介及入门指南

Quartz.NET简介 Quartz.NET是一个功能完备的开源调度系统,从最小的应用到大规模的企业系统皆可适用. Quartz.NET是一个纯净的用C#语言编写的.NET类库,是对非常流行的JAVA开源调度框架 Quartz 的移植. 入门指南 本入门指南包括以下内容: 下载 Quartz.NET 安装 Quartz.NET 根据你的特定项目配置 Quartz 启动一个样例程序 下载和安装 你可以下载 zip 文件或使用 Nuget 程序包.Nuget 程序包只包含 Quartz.NET 运

Java程序员的Golang入门指南(上)

Java程序员的Golang入门指南 1.序言 Golang作为一门出身名门望族的编程语言新星,像豆瓣的Redis平台Codis.类Evernote的云笔记leanote等. 1.1 为什么要学习 如果有人说X语言比Y语言好,两方的支持者经常会激烈地争吵.如果你是某种语言老手,你就是那门语言的"传道者",下意识地会保护它.无论承认与否,你都已被困在一个隧道里,你看到的完全是局限的.<肖申克的救赎>对此有很好的注脚: [Red] These walls are funny.

Spark1.0.0伪分布安装指南

?一.下载须知 软件准备: spark-1.0.0-bin-hadoop1.tgz   下载地址:spark1.0.0 scala-2.10.4.tgz    下载下载:Scala 2.10.4 hadoop-1.2.1-bin.tar.gz   下载地址:hadoop-1.2.1-bin.tar.gz jdk-7u60-linux-i586.tar.gz  下载地址:去官网下载就行,这个1.7.x都行 二.安装步骤 hadoop-1.2.1安装步骤,请看: http://my.oschina.