Spark安装部署| 运行模式

Spark

一种基于内存的快速、通用、可扩展的大数据分析引擎;

内置模块:

Spark Core(封装了rdd、任务调度、内存管理、错误恢复、与存储系统交互); Spark SQL(处理结构化数据)、Spark Streaming(对实时数据进行流式计算) 、 Spark Mlib(机器学习程序库包括分类、回归、聚合、协同过滤等)、Spark GraghX(图计算);独立调度器、Yarn、Mesos

特点:

快( 基于内存、多线程模型(而mapReduce是基于多进程的)、可进行迭代计算(而hadoop需要多个mr串行) )

易用(支持java、scala、python等的API,支持超过80多种算法)、

通用(spark提供了统一解决方案,可用于批处理、交互式查询(spark sql)\ 实时流式处理(spark streaming)\机器学习和图计算,可在同一应用中无缝使用)

兼容性(与其他开源产品的融合,如hadoop的yarn、Mesos、HDFS、Hbase等)

http://spark.apache.org/   文档查看地址 https://spark.apache.org/docs/2.1.1/

集群角色

Master和Workers

1)Master

Spark特有资源调度系统的Leader。掌管着整个集群的资源信息,类似于Yarn框架中的ResourceManager,主要功能:

(1)监听Worker,看Worker是否正常工作;

(2)Master对Worker、Application等的管理(接收worker的注册并管理所有的worker,接收client提交的application,(FIFO)调度等待的application并向worker提交)。

2)Worker

Spark特有资源调度系统的Slave,有多个。每个Slave掌管着所在节点的资源信息,类似于Yarn框架中的NodeManager,主要功能:

(1)通过RegisterWorker注册到Master;

(2)定时发送心跳给Master;

(3)根据master发送的application配置进程环境,并启动StandaloneExecutorBackend(执行Task所需的临时进程)

Driver和Executor

1)Driver(驱动器)

Spark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行。如果你是用spark shell,那么当你启动Spark shell的时候,系统后台自启了一个Spark驱动器程序,就是在Spark shell中预加载的一个叫作 sc的SparkContext对象。如果驱动器程序终止,那么Spark应用也就结束了。主要负责:

(1)把用户程序转为任务

(2)跟踪Executor的运行状况

(3)为执行器节点调度任务

(4)UI展示应用运行状况

2)Executor(执行器)

Spark Executor是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。主要负责:

(1)负责运行组成 Spark 应用的任务,并将状态信息返回给驱动器进程;

(2)通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

总结:Master和Worker是Spark的守护进程,即Spark在特定模式下正常运行所必须的进程。Driver和Executor是临时进程,当有具体任务提交到Spark集群才会开启的进程。

1. Local模式-本地单机

Linux中查看有多少核数:
[[email protected] ~]$ cat /proc/cpuinfo
...
[[email protected] ~]$ cat /proc/cpuinfo | grep ‘processor‘ | wc -l
8

在一台计算机,可以设置Master; (提交任务时需要指定--master)Local模式又分为:
Local所有计算都运行在一个线程中(单节点单线程),没有任何并行计算;
Local[K] ,如local[4]即运行4个Worker线程(单机也可以并行有多个线程),可指定几个线程来运行计算,通常CPU有几个Core就执行几个线程,最大化利用cpu的计算能力;
Local[*], 直接帮你安装Cpu最多Cores来设置线程数;

bin/spark-submit \  //提供任务的命令
--class org.apache.spark.examples.SparkPi \  //指定运行jar的主类
--master //它有默认值是local[*] =>spark://host:port, mesos://host:port, yarn, or local.
--executor-memory 1G \ //指定每个executor可用内存
--total-executor-cores 2 \ 指定executor总核数
./examples/jars/spark-examples_2.11-2.1.1.jar \  \\jar包
100   //main方法中的args参数

./bin/spark-submit 回车可查看所有的参数
[[email protected] spark-local]$ bin/spark-shell
Spark context Web UI available at http://192.168.1.101:4040
Spark context available as ‘sc‘ (master = local[*], app id = local-1554255531204). ##spark core的入口sc
Spark session available as ‘spark‘.  ##它是spark sql程序的入口
再起一个spark-shell会报错:
    spark sql也有一个默认的元数据也是存在derby数据库里边
 Failed to start database ‘metastore_db‘ with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@63e5b8aa, see the next exception for details.
 Caused by: org.apache.derby.iapi.error.StandardException: Another instance of Derby may have already booted the datab

 查看页面:hadoop101:4040 

scala> sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((Hello,3), (smile,1), (java,2), (world,1), (kris,1))

提交任务(或者开启spark-shell)的时候会有driver和executor进程,Local模式下它被封装到了SparkSubmit中

提交任务分析

driver和executor是干活的;

① Client提交任务--->②起一个Driver ---> ③注册应用程序,申请资源--资源管理者有 (Master(Standalone模式)ResourceManage(yarn模式))----->④拿到资源后去其他节点启动Executor----> ⑤Executor会反向注册给Driver汇报;

⑥(把提交的jar包做任务切分,把任务发给具体执行的节点Executor)--->Driver会进行初始化sc、任务划分、任务调度 <===>Executor具体执行任务(负责具体执行任务、textFile、flatMap、map...)

⑦ Driver把任务发到Executor不一定会执行,有可能资源cpu或内存不够了或者executor挂了,spark会有一个容错机制,某一个挂了可转移到其他的Executor;

最后任务跑完了,Driver会向资源管理者申请注销(Executor也会注销)

数据流程

textFile("input"):读取本地文件input文件夹数据;

flatMap(_.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;

map((_,1)):对每一个元素操作,将单词映射为元组;

reduceByKey(_+_):按照key将值进行聚合,相加;

collect:将数据收集到Driver端展示。

2. Standalone模式--完全分布式

概述

构建一个由Master+Slave构成的Spark集群,Spark运行在集群中;它的调度器是其实就是Master

提交任务时需要有一个客户端Client,Master和Worker是守护进程它们是资源管理系统,提交任务(运行spark-shell或者spark-submit)之前它们就已经启动了;

①提交--->起Driver就是初始化SparkContext,然后启动Executor时需要资源;②向Master申请资源(即注册),启动ExecutorBackend

启动Executor---->反向注册给Driver汇报信息;

③ Driver划分切分任务把Task发送给Executor,如果Executor会有一个容错机制,Executor运行时会给Driver发送报告Task运行状态直至结束;

④最后任务运行完之后driver向master申请注销,Executor也会注销掉; 

不一定非要在Client中起Driver(SparkContext),cluster模式,具体在哪个节点起sc由Master决定,随机的在worker节点上选择一个一个;

Driver在哪个节点起的原因:driver和executor之间是有通讯,每个 executor都要向driver汇报信息,互相通讯(消耗内存、资源+cpu数); 所有的executor节点都去跟driver做通讯,客户端的压力就会特别大;

Client是本地调试用,输入之后马上能看到输入的结果;

bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop101:7077 \
--deploy-mode cluster --executor-memory 1G --total-executor-cores 2 ./examples/jars/spark-examples_2.11-2.1.1.jar 100

  1)修改slave文件,添加work节点:

[[email protected] conf]$ vim slaves
hadoop101
hadoop102
hadoop103

  2)修改spark-env.sh文件,添加如下配置:

在高可用集群需把下面内容这给注释掉:

#SPARK_MASTER_HOST=hadoop101

#SPARK_MASTER_PORT=7077

[[email protected] conf]$ vim spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144  ##如果遇到JAVA_HOME not set异常时可配置
SPARK_MASTER_HOST=hadoop101
SPARK_MASTER_PORT=7077
#配置历史服务
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory"
#配置高可用
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop101,hadoop102,hadoop103
-Dspark.deploy.zookeeper.dir=/spark"

  3)修改spark-default.conf文件,开启Log:

[[email protected] conf]$ vi spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop101:9000/directory

  注意:HDFS上的目录需要提前存在。  hadoop fs -mkdir /directory

4) 分发spark包
[[email protected] module]$ xsync spark/spark-standalone

   5)启动

[[email protected] spark]$ sbin/start-all.sh
网页查看Master:hadoop101:8080 可看到Status:ALIVE;Memory in use 等信息

高可用集群的启动,要先启动zookeeper;
在hadoop102上(也可以是其他节点)单独启动master节点
[[email protected] spark]$ sbin/start-master.sh

启动历史服务之前要先启动 start-dfs.shsbin/start-history-server.sh   --->HistoryServer

查看历史服务hadoop101:18080

官方求PI案例  

[[email protected] spark]$ bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop101:7077 \
--executor-memory 1G --total-executor-cores 2 ./examples/jars/spark-examples_2.11-2.1.1.jar 100
启动spark shell
/opt/module/spark/bin/spark-shell --master spark://hadoop101:7077 \
--executor-memory 1g --total-executor-cores 2
只要提交了任务就可以看到driver和executor,driver被封装在了SparkSubmit里边;CoarseGrainedExecutorBackend就是启动的executor
提交任务提交给哪个executor都是有可能的
执行WordCount程序
scala>sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hadoop,6), (oozie,3), (spark,3), (hive,3), (atguigu,3), (hbase,6))
[[email protected] conf]$ jpsall
-------hadoop101-------
6037 Master
7029 Jps
6151 Worker
6747 CoarseGrainedExecutorBackend
6652 SparkSubmit
-------hadoop102-------
5616 Jps
3146 Worker
-------hadoop103-------
3104 Worker
5713 Jps
3369 CoarseGrainedExecutorBackend

spark-shell的 spark HA集群访问

/opt/module/spark/bin/spark-shell --master spark://hadoop101:7077,hadoop102:7077 \
--executor-memory 1g --total-executor-cores 2

 把其中ACTIVE状态节点的kill掉,另外一个Master的状态将从standby模式--->active状态;

可验证下:

scala>sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hadoop,6), (oozie,3), (spark,3), (hive,3), (atguigu,3), (hbase,6))
提交任务时:
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
client和cluster的区别:
    SparkContext的位置不同(也就是运行Driver的位置不一样),由Master决定,随机的在其他节点初始化一个sc
Driver和Executor之间会有通信,通信需要消耗资源内存cpu等,所有的executor去和客户端(如果是client模式,Driver是启在Client上的)去通信,
客户端的压力会非常大,如果有大量的executor再加上提交多个任务就启动多个Driver,那么Client单点就挂掉被拖垮;
cluster模式,每次提交任务时的sc的位置分散在不同节点上,分担了压力,

Client本地调试时候用,可以看到输出的结果,如可看到打印的π
 --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
 --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop101:7077 \
--deploy-mode cluster --executor-memory 1G --total-executor-cores 2 \   ##总的是2,默认1个cores/executor--->推导出有2/1个executor;可控制executor的数量;
./examples/jars/spark-examples_2.11-2.1.1.jar 100

cluster模式下,driver叫DriverWrapper

/opt/module/spark/bin/spark-shell --master spark://hadoop101:7077,hadoop102:7077 \
--executor-memory 1g --total-executor-cores 2

3. Yarn模式

概述

Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出

yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AM(APPMaster)适用于生产环境。

安装使用

1)修改hadoop配置文件yarn-site.xml,添加如下内容:

[[email protected] hadoop]$ vim yarn-site.xml

        <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
        <property>
                <name>yarn.nodemanager.pmem-check-enabled</name>
                <value>false</value>
        </property>
        <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
        <property>
                <name>yarn.nodemanager.vmem-check-enabled</name>
                <value>false</value>
        </property>

2)配置历史服务JobHistoryServer| 配置日志查看功能

修改spark-env.sh,添加如下配置:

[[email protected] conf]$ vim spark-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop

# 配置JobHistoryServer  注意:HDFS上的目录需要提前存在。
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory"

从这里看到历史日志:http://hadoop102:8088/cluster点击直接跳转到spark中  http://hadoop101:18080/history/application_1554294467331_0001/jobs/

[[email protected] conf]$ vim spark-defaults.conf

#修改spark-default.conf文件,开启Log:
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop101:9000/directory
# 日志查看
spark.yarn.historyServer.address=hadoop101:18080
spark.history.ui.port=18080
[[email protected] spark-yarn]$ bin/spark-shell --master yarn
Spark context Web UI available at http://192.168.1.101:4040
Spark context available as ‘sc‘ (master = yarn, app id = application_1554290192113_0004).
Spark session available as ‘spark‘.

-------hadoop101-------
25920 NodeManager
25751 DataNode
28075 SparkSubmit
28252 Jps
25469 QuorumPeerMain
25630 NameNode
-------hadoop102-------
14995 CoarseGrainedExecutorBackend
15076 Jps
13447 DataNode
13672 NodeManager
13369 QuorumPeerMain
13549 ResourceManager
14942 ExecutorLauncher
-------hadoop103-------
13536 DataNode
14610 CoarseGrainedExecutorBackend
14691 Jps
13638 NodeManager
13464 QuorumPeerMain
13710 SecondaryNameNode

[[email protected] spark-yarn]$ sbin/start-history-server.sh   ##开启历史服务

提交任务到Yarn执行
[[email protected] spark-yarn]$ bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client ./examples/jars/spark-examples_2.11-2.1.1.jar 100

package com.atguigu.spark
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf = new SparkConf().setAppName("WordCount")
    //2.创建SparkContext,该对象是提交Spark App的入口
    val context = new SparkContext(conf)
    //3.使用sc创建RDD并执行相应的transformation和action
    context.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1))
    //4.关闭连接
    context.stop()
  }
}
[[email protected] spark-yarn]$ hadoop fs -put wc.txt /
[[email protected] spark-yarn]$ bin/spark-submit --class com.atguigu.spark.WordCount --master yarn --deploy-mode client /opt/module/spark/spark-yarn/WordCount.jar /wc.txt /out

结果:
(Hello,3)
(smile,2)
(kris,2)
(alex,1)
(hi,1)

原文地址:https://www.cnblogs.com/shengyang17/p/10648160.html

时间: 2024-11-01 16:45:58

Spark安装部署| 运行模式的相关文章

Spark安装部署(local和standalone模式)

Spark运行的4中模式: Local Standalone Yarn Mesos 一.安装spark前期准备 1.安装java $ sudo tar -zxvf jdk-7u67-linux-x64.tar.gz -C /opt/service/ export JAVA_HOME=/opt/service/jdk1.7.0_67 export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH export CLASSPATH=.:$JAVA_HOME/l

spark 安装部署

一.安装spark依赖的内容 1.JDK spark是由Scala语言编写的,但是运行的环境是jvm,所以需要安装JDK 编译过程:Python.java.Scala编写的代码 -> scala编译器编译解释,生成class文件 -> 由jvm负责执行class文件(与java代码执行一致) 2.scala 由于 spark是由Scala语言编写的,所以依赖Scala环境,且由Scala编写的执行代码也需要环境进行编译 3.配置SSH免密码登录 集群节点无密码访问,与安装Hadoop时一致 4

spark学习(基础篇)--(第三节)Spark几种运行模式

h2 { color: #fff; background-color: #7CCD7C; padding: 3px; margin: 10px 0px } h3 { color: #fff; background-color: #008eb7; padding: 3px; margin: 10px 0px } spark应用执行机制分析 前段时间一直在编写指标代码,一直采用的是--deploy-mode client方式开发测试,因此执行没遇到什么问题,但是放到生产上采用--master yar

spark安装部署

spark是由Scala语言编写的,但是运行的环境是jvm,所以需要安装JDK 编译过程:Python.java.Scala编写的代码 -> scala编译器编译解释,生成class文件 -> 由jvm负责执行class文件(与java代码执行一致) 由于 spark是由Scala语言编写的,所以依赖Scala环境,且由Scala编写的执行代码也需要环境进行编译. hdfs是作为spark的持久层,所以需要安装Hadoop,同时如果需要配置spark on yarn,则Hadoop需要安装ya

spark安装部署手册

官网下载spark安装包 上传到服务器使用命令tar -zxvf 解压缩 解压后的spark通过start-master.sh启动,此时有可能启动报错,找不到java_home,这是需要修改spark-config.sh,需要在里面加上export JAVA_HOME=jdk路径 主节点启动后,一般是占用8080端口,如果不是依次类推,8081.... ,从管理控制台获取URL地址. 启动从节点,使用 start-slave.sh <master-spark-URL>(该URL)是从spark

Spark internal - 多样化的运行模式 (下)

Spark的各种运行模式虽然启动方式,运行位置,调度手段有所不同,但它们所要完成的任务基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要管理和运行Task,这里粗略的列举一下在运行调度过程中各种需要考虑的问题 环境变量的传递 Jar包和各种依赖文件的分发 Task的管理和序列化等 用户参数配置 用户及权限控制 环境变量的传递 Spark的运行参数有很大一部分是通过环境变量来设置的,例如Executor的内存设置,Library路径等等.Local模式当然不存在环境变量的传递问

3.spark运行模式

spark支持的运行模式:本地模式.本地集群模式.standalone模式.yarn模式及mesos模式. 本地模式 local.local[N]或local[N,maxRetries].主要用于代码调试和跟踪.不具备容错能力,不适用于生产环境. 本地模式只有Driver,没有Master和Worker.执行任务的Executor与Driver在同一个JVM进程中. 本地集群模式 local-cluster[N,cores,memory].也主要用于代码调试和测试,是源码学习常用的模式.不具备容

【转载】Spark系列之运行原理和架构

参考 http://www.cnblogs.com/shishanyuan/p/4721326.html 1. Spark运行架构 1.1 术语定义 lApplication:Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码: lDriver:Spark中的Driver即运行上述Application的main()函数并且创建SparkCon

spark2.10安装部署(集成hadoop2.7+)

这里默认你的hadoop是已经安装好的,master是node1,slaver是node2-3,hdfs启动在node1,yarn启动在node2,如果没安装好hadoop可以看我前面的文章 因为这里是spark和hadoop集成,我已经预先启动好了hdfs 和 yarn:MapReduce History Server也是需要启动的,详情往下看 Spark安装包:概述 类别 与Hadoop打包在一起的安装包 ? 比如:spark-2.1.0-bin-hadoop2.7.tgz,spark版本为