Spark(十二) -- Spark On Yarn & Spark as a Service & Spark On Tachyon

Spark On Yarn:

从0.6.0版本其,就可以在在Yarn上运行Spark

通过Yarn进行统一的资源管理和调度

进而可以实现不止Spark,多种处理框架并存工作的场景

部署Spark On Yarn的方式其实和Standalone是差不多的,区别就是需要在spark-env.sh中添加一些yarn的环境配置,在提交作业的时候会根据这些配置加载yarn的信息,然后将作业提交到yarn上进行管理

首先请确保已经部署了Yarn,相关操作请参考:

hadoop2.2.0集群安装和配置

部署完成之后可以通过

yarn-master:8088

查看yarn的web管理界面

yarn-master为配置的yarn主机名或ip地址

Spark的一些配置如下:

修改spark-env.sh文件

必须添加的是

HADOOP_CONF_DIR 或者 YARN_CONF_DIR指向hadoop的conf配置文件目录

其余的和Spark Standalone部署是一样的,具体请参考:

Spark(一)– Standalone HA的部署

另外,可以通过

SPARK_YARN_USER_ENV

来配置要传给Spark进程的环境变量,如JAVA_HOME等

通过export SPARK_JAR=hdfs://some/path

来将jar文件放在全局可读的HDFS上,缓存在各个节点中,这样一来,运行应用时就无需每次都分发jar文件到各个节点上

两种方式作业提交方式:

1.yarn-cluster

在spark目录下执行:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples*.jar 10

来运行SparkPi这个example

2.yarn-client

和之前的方式一模一样,只是将yarn-cluster换成yarn-client,如下:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples*.jar 10

两种方式的区别:

client方式下,Spark的Driver会在客户端进程中,Application Master仅仅是向Yarn申请资源,同时会在客户端(终端)上打印出具体的执行log

cluster方式下,Driver会在Application Master进程中运行,受到Yarn的管理。客户端在应用初始化之后就可以脱离,这时候在客户端不能收到执行的log信息,但是可以通过Yarn的WebUI来查看作业的运行情况

Spark On Yarn作业的提交方式和Standalone相比仅仅是将–master这个参数由具体的spark主节点,换成了yarn-cluster/client

Spark as a Service:

将部署好的Spark集群作为一种服务通过REST接口向外提供

这就很像云计算模型

我们将Spark集群部署好,将适用于各种场景作业的jar包分配上去,而外面的人通过REST接口来调用我们提供的各种服务,这就是Spark as a Service

其中典型的实现是JobServer

JobServer其实就是一套软件,将其下载下来之后部署在Spark集群上

它会想外界提供REST接口,Spark上的各个资源都可以通过一个唯一的URL来访问

构架图如下:

特性

“Spark as a Service”: 简单的面向job和context管理的REST接口

通过长期运行的job context支持亚秒级低延时作业(job)

可以通过结束context来停止运行的作业(job)

分割jar上传步骤以提高job的启动

异步和同步的job API,其中同步API对低延时作业非常有效

支持Standalone Spark和Mesos

Job和jar信息通过一个可插拔的DAO接口来持久化

命名RDD以缓存,并可以通过该名称获取RDD。这样可以提高作业间RDD的共享和重用

部署JobServer需要sbt

JobServer下载地址

装好sbt之后,将JobServer解压,进入其根目录

敲sbt

进入sbt命令之后(第一次启动要下载很多jar包,可能会因为网络的问题卡很久。。)

执行

re-start --- -Xmx4g

此时会下载spark-core,jetty和liftweb等相关模块

完成之后可以通过访问http://localhost:8090 可以看到Web UI

相关的API如下:

curl --data-binary @job-server-tests/target/job-
server-tests-0.3.1.jar localhost:8090/jars/test
//运行指定的jar包

curl localhost:8090/jars/
//查看提交的jar

curl -d "input.string = hello job server" ‘localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample‘
//提交的appName为test,class为spark.jobserver.WordCountExample

curl localhost:8090/jobs/34ce0666-0148-46f7-8bcf-a7a19b5608b2
curl localhost:8090/jobs/34ce0666-0148-46f7-8bcf-a7a19b5608b2/config
//通过job-id查看结果和配置信息

curl -d "input.string = hello job server" ‘localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&sync=true‘
//sync=true会直接将执行接口返回,如果没有设置,那么将会分配一个jobId,等作业完成后可以通过jobId在查看信息

curl -d "" ‘localhost:8090/contexts/test-context?
num-cpu-cores=4&mem-per-node=512m‘
//启动一个context

curl localhost:8090/contexts
//查询所有的context

curl -d "input.string = a b c a b see" ‘localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true‘
//在某个指定的context上执行作业

配置文件:

vim spark-jobserver/config/local.conf.template
master = "local[4]"//将这里改为集群的地址

jobdao = spark.jobserver.io.JobFileDAO
    filedao {
      rootdir = /tmp/spark-job-server/filedao/data
    }
//数据对象的存储方法和存储路径

context-settings {
    num-cpu-cores = 2
    memory-per-node = 512m
    }
//context的默认设置,如果在REST接口中显示的指明了context的配置,那么这里将会被覆盖

POST /contexts/my-new-context?num-cpu-cores=10
//在REST中设置一个新的context,以参数的形式放在url中

JobServer部署:

复制con?g/local.sh.template到env.sh ,并且设置相关参数如:指定安装路径,Spark Home, Spark Conf等。

DEPLOY_HOSTS="spark1
              spark2
              spark3"
APP_USER=spark
APP_GROUP=spark
INSTALL_DIR=/home/spark/jobserver
LOG_DIR=/home/spark/jobserver/log
PIDFILE=spark-jobserver.pid
SPARK_HOME=/home/spark/spark
SPARK_CONF_HOME=/home/spark/spark/conf

修改project/Dependencies.scala。重新指定spark版本为当前的版本

lazy val sparkDeps = Seq(

“org.apache.spark” %% “spark-core” % “1.3.1” ……

运?行bin/server_deploy.sh env(或者直接将env.sh的绝对路径写进server_deploy.sh这样就不用再传参数了)

打好包后与相关配置?一起放到指定服务器的指定目录

启动:

需要把config下的local.conf复制到INSTALL_DIR下面,改名为local.conf,并修改其中的master以及两个路径。

jar-store-rootdir = /var/lib/spark/jars

rootdir = /var/lib/spark/filedao

进?入服务器指定指定目录,运?行server_start.sh

如果启动有问题可以试试把cfg.sh 拷贝到 spark-job-server目录下 改名为 settings.sh

创建JobServer工程:

在idea中新建SBT工程

在.sbt文件中添加以下内容

name := "job server demo"
version := "1.0"
scalaVersion := "2.10.4"
resolvers += "Ooyala Bintray" at "http://dl.bintray.com/ooyala/maven"
libraryDependencies += "ooyala.cnd" % "job-server" % "0.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.0.0"

继承SparkJob,重写validate与runJob

validate就是一个执行一系列验证的方法,执行的时候先看一下validate的验证对不对

runJob执行作业的逻辑

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValidation
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobInvalid

object WordCount extends SparkJob{
def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "WordCountExample")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)
    val rsList = dd.map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).collect
    rsList(0)._2
  }
}

生成jar包并提交

curl --data-binary @/root/install-pkg/job-server-demo_2.10-1.0.jar localhost:8090/jars/example

测试

curl -i -d "input.string=a a a b b c" ‘localhost:8090/jobs?appName=example&classPath=com.persia.spark.WordCount‘

HTTP/1.1 202 Accepted
Server: spray-can/1.2.0
Date: Sat, 12 Jul 2014 09:22:26 GMT
Content-Type: application/json; charset=UTF-8
Content-Length: 150

{
  "status": "STARTED",
  "result": {
    "jobId": "b5b2e80f-1992-471f-8a5d-44c08c3a9731",
    "context": "6bd9aa29-com.persia.spark.WordCount"
  }
}

使用命名RDD:

object MyNamedRDD extends SparkJob with NamedRDDSuport
//继承SparkJob并混入NamedRDDSuport特质之后写自己的NamedRDD

this.namedRDDs.update("myrdd",myrdd)
//以键值对的形式将自定义的命名RDD缓存起来

val myrdd = this.namedRDDs.get[(String,String)]("myrdd").get
//将缓存的RDD拿出来

//命名RDD可以用于有点类似于Session的作用

Spark On Tachyon:

什么是Tachyon?

来看看传统的Spark不同job之间,不同的框架是如何共享数据的

通过不断的读取HDFS来实现数据的共享,HDFS是什么?是一种分布式的文件系统啊,说到底就是硬盘。那么问题就很明显了,频繁的磁盘IO操作,还有cache丢失,内存使用等问题

解决方案是什么?

就是Tachyon,一种分布式的内存?文件系统,注意内存两个字,不同任务(框架)享受可靠快速的数据共享

于是BDAS变成了下面这种构架:

之前的问题解决方案:

Tachyon部署:

在命令行中下载Tachyon

wget https://github.com/amplab/tachyon/releases/download/v0.6.4/tachyon-0.6.4-bin.tar.gz

tar xvfz tachyon-0.6.4-bin.tar.gz

cd tachyon-0.6.4

国内的网络访问不了的时候可以在这里下载:

Tachyon下载地址

1.Local模式:

cp conf/tachyon-env.sh.template conf/tachyon-env.sh

./bin/tachyon format

./bin/tachyon-start.sh local

可以通过 http://localhost:19999 WebUI查看

测试

./bin/tachyon runTest Basic CACHE_THROUGH

./bin/tachyon runTests

./bin/tachyon-stop.sh

2.Cluster模式:

在配置文件目录下修改slaves

加入各个节点的主机名

tachyon-env.sh修改如下配置:

export TACHYON_MASTER_ADDRESS=spark1

export TACHYON_WORKER_MEMORY_SIZE=4GB

export TACHYON_UNDERFS_HDFS_IMPL=org.apache.hadoop.hdfs.DistributedFileSystem

export TACHYON_UNDERFS_ADDRESS=hdfs://spark1:9000

./bin/tachyon format

./bin/tachyon-start.sh

可以通过 http://tachyon.master.spark1:19999 WebUI查看

测试

./bin/tachyon runTests

3.基于zookeeper的Master HA

确保在tachyon-env.sh中设置过

export TACHYON_UNDERFS_ADDRESS=hdfs://hostname:port

在TACHYON_JAVA_OPTS中加?

-Dtachyon.master.journal.folder=hdfs://hostname:port/

tachyon/journal/

-Dtachyon.usezookeeper=true

-Dtachyon.zookeeper.address=zkserver1:2181,zkserver2:2181,zkserver3:2181

4.Spark On Tachyon:

在Spark的conf目录下新建core-site.xml,并加入以下内容(zk模式下,如果不是zk模式,将name替换为fs.tachyon.impl即可)

<configuration>
<property>
<name>fs.tachyon-ft.impl</name>
<value>tachyon.hadoop.TFS</value>
</property>
</configuration>

如果运?行的是低于Spark1.0.版本,在spark.env.sh中加?入:

export SPARK_CLASSPATH=/pathToTachyon/client/target/tachyon-client-0.5.0-jar-with-dependencies.jar:$SPARK_CLASSPATH

在zk模式下,还需要在spark-env.sh中加入

以下内容:

export SPARK_JAVA_OPTS="
-Dtachyon.usezookeeper=true
-Dtachyon.zookeeper.address=zkserver1:2181,zkserver2:2181,zkserver3:2181
$SPARK_JAVA_OPTS
"

要在spark程序中使用 Tachyon需指定:

1、spark.tachyonStore.url

2、spark.tachyonStore.baseDir

如果不想每次手动输入以上配置时,可以在spark的conf目录下编辑spark-defaults.conf文件

将上面两个配置加入去即可

spark.master                     spark://spark1:7077
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://ns1/spark_event_log
spark.tachyonStore.url           tachyon://spark1:19998
spark.tachyonStore.baseDir       /data/tachyon_tmp

spark程序存储数据要指定OffHeap方式,说明数据不让spark自己管理了,而是让Tachyon接手

时间: 2024-10-16 08:58:58

Spark(十二) -- Spark On Yarn & Spark as a Service & Spark On Tachyon的相关文章

Spark学习之路 (十二)SparkCore的调优之资源调优

摘抄自:https://tech.meituan.com/spark-tuning-basic.html 一.概述 在开发完Spark作业之后,就该为作业配置合适的资源了.Spark的资源参数,基本都可以在spark-submit命令中作为参数设置.很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置.资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢:或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常

Spark机器学习实战 (十二) - 推荐系统实战

0 相关源码 将结合前述知识进行综合实战,以达到所学即所用.在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统. 1 推荐系统简介 1.1 什么是推荐系统 1.2 推荐系统的作用 1.2.1 帮助顾客快速定位需求,节省时间 1.2.2 大幅度提高销售量 1.3 推荐系统的技术思想 1.3.1 推荐系统是一种机器学习的工程应用 1.3.2 推荐系统基于知识发现原理 1.4 推荐系统的工业化实现 Apache Spa

Spark(五十二):Spark Scheduler模块之DAGScheduler流程

导入 从一个Job运行过程中来看DAGScheduler是运行在Driver端的,其工作流程如下图: 图中涉及到的词汇概念: 1. RDD——Resillient Distributed Dataset 弹性分布式数据集. 2. Operation——作用于RDD的各种操作分为transformation和action. 3. Job——作业,一个JOB包含多个RDD及作用于相应RDD上的各种operation. 4. Stage——一个作业分为多个阶段. 5. Partition——数据分区,

spark streaming (二)

一.基础核心概念 1.StreamingContext详解 (一) 有两种创建StreamingContext的方式:             val conf = new SparkConf().setAppName(appName).setMaster(master);             val ssc = new StreamingContext(conf, Seconds(1)); StreamingContext, 还可以使用已有的SparkContext来创建         

Spark任务调度executors分配问题 in yarn

红色留着继续思考.  问题背景:  CCSWYB ,在云平台上模拟shell流程,在各个节点上分配fvcom计算任务. Spark程序流程: 从HDFS中读取tasklist.txt(每一行对应一个任务) 经过一些操作过后生成一个JavaPairRDD ,记作data,对data执行foreach操作,函数内执行shell脚本启动任务.可以正常执行任务.  集群: 四个i5机器,hadoop2.3.0 + spark 1.0.0 + jdk1.7.0_60 问题: 任务数目从2 - 20 左右,

spark(二)优化思路

优化思路 内存优化 内存优化大概分为三个方向 1.所有对象的总内存(包括数据和java对象) 2.访问这些对象的开销 3.垃圾回收的开销 其中Java的原生对象往往都能被很快的访问,但是会多占据2-5倍或更多的内存,有下面4点原因 ·每个单独的java对象都有一个对象头(16字节),其中包括指向对象的指针(栈->堆),如果该对象只有几个属性,那么对象头可能比实际数据占用的空间都大(严重浪费资源) ·java每个string都包含了40字节的额外开销(因为底层其实是存储在数组,需要记录数组的指针,

Spark(二)算子详解

目录 Spark(二)算子讲解 一.wordcountcount 二.编程模型 三.RDD数据集和算子的使用 Spark(二)算子讲解 @ 一.wordcountcount 基于上次的wordcount,我们来写一个wordcountcount,来对wc程序进行第二次计数,我们来分析一下性能. package com.littlepage.wc import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkCon

理解Spark SQL(二)—— SQLContext和HiveContext

使用Spark SQL,除了使用之前介绍的方法,实际上还可以使用SQLContext或者HiveContext通过编程的方式实现.前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器来运行HiveQL不支持的语法,如:select 1.实际上HiveContext是SQLContext的子类,因此在HiveContext运行过程中除了override的函数和变量,可以使用和SQLC

spark学习二

SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,用于连接Spark集群.创建RDD.累加器和广播变量,是Spark程序的根本.编写不同类型的Spark程序,使用的SparkContext是不同的Scala 使用SparkContextJava   使用JavaSparkContext 开发一个简单的Spark程序:第一步:创建SparkConf对象,设置Spark应用的配置信息第二步:创建Spark Context对象第三步:针对输入源创建一个初始的RDD(