Spark的基本说明

1、关于Application

用户程序,一个Application由一个在Driver运行的功能代码和多个Executor上运行的代码组成(工作在不同的节点上)。

又分成多个Job,每个Job由多个RDD和一些Action操作组成、job本分多个task组,每个task组称为:stage。

每个task又被分到多个节点,由Executor执行:

在程序中RDD转化其实还未真正运行,真正运行的是操作的时候。

2、程序执行过程

1)构建Spark Application的运行环境,就是启动SparkContext,启动后,向资源管理器

(standalone--spark自己的Master管理资源、Mesos或Yarn)注册且申请运行Executor资源。

2)资源管理器分配Executor资源,并且在各个节点上启动StandaloneExecutorBackend(对Standalone来说),Executor将运行情况随着心跳发送到资源管理器上。

3)SparkContext根据用户程序,构建DAG图,将DAG分解成Stage,划分原则是宽依赖时候划分,把Stage(TaskSet)发送给TaskScheduler。Stage

根据RDD的Partition数量来决定Task的数量;Executor向SparkContext申请Task。Task Scheduler将Task发送给Executor运行,且同时把代码发送给Executor(好像是Master开启HTTP服务,Executor去取代码)。

4)Task在Executor【此程序专属】上运行,多线程运行,线程数看可以运行的核数。

5)Spark Context运行地点和Worker不要分隔太远,中间过程有数据交换。

 3、DAG Scheduler

1)根据RDD的依赖关系来划分Stage,简单来说,如果一个子RDD只依赖一个父RDD,则在一个Stage中,否则在多个Stage中,只依赖一个父RDD称为窄依赖,依赖多个父RDD为宽依赖,

发生宽依赖称为Shuffle。

2)当Shuffle数据处理失败的时候,它重新处理之前的数据。

3)它根据RDD构建DAG(有向无环图),然后再进一步找出开销最小的调度方法。将Stage发送给Task Scheduler。

4、Task Scheduler

1)保存维护所有的TaskSet。

2)当Executor向Driver发送心跳的时候,TaskScheduler会根据其资源使用情况分配相应Task,如果允许失败,重试失败的Task。

5、RDD的运行原理

1)根据Spark内部对象或者Hadoop等外部对象创建RDD。

2)构建DAG。

3)划分为Task,分别在多个节点上执行后汇总。

举例:第一个字母排序:

sc.textFile("hdfs://names")

.map(name => (name.charAt(0),name))

.groupByKey()

.mapValues(names =>names.toSet.size)

.collect()

假设文件内容为按行的姓名:

Ah                                    (A,Ah)                                (A,(Ah,Anlly)                             [ (A,2),

PPT        ---> map---->      (P,PPT) ----->groupByKey--->(P,(PPT))-------->mapValues--->   (P,1)]

Anlly                                 (A,Anlly)

1)创建RDD、最后的collect为动作不会创建RDD,其他的操作都会创建新的RDD。

2)创建DAG,groupBy()会进行依赖多条上一个RDD的数据,所以多划分为一个阶段。

如图:

3)执行任务,每个阶段必须等上一个阶段执行完成。每个Stage又分成不同的Task执行,每个Task都包含代码+数据。

假设例子中的names下面有四个文件块,那么HadoopRDD中的Partitions自动划分为四个分区对应这四块数据。

就会创建四个Task执行相关任务。

每个Task操作一块数据再执行,以上例子的简单模拟:

import org.apache.spark.{SparkConf, SparkContext}

object NameCountCh {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage:<File>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("NameCountCh")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))
      .map(name => (name.charAt(0), name))
      .groupByKey()
      .mapValues(names => names.toSet.size)
      .collect().foreach(println)

  }
}

 实际执行过程截图:

执行命令: ./spark-submit --master  spark://xxxx:7077 --class NameCountCh --executor-memory 512m --total-executor-cores 2  /data/spark/miaohq/scalaTestApp/scalatest4.jar  hdfs://spark29:9000/home/miaohq/testName.txt

1、启动一个HTTP端口:

2、按照提交的文件将文件放到这个Web服务器上

3、创建程序生成两个Executor

4、DAG调度

完成第一stage:

调度第二stage:

完成第二个stage输出结果:

疑惑:

1、小文件看不出来文件分区的过程,另外设置了几个执行核,就会有几个Executor,如果超过总数可能要多线程了??

2、为什么一个stage是两个task,按照原理应该是文件分为几个partition就几个task,目前测试文件很小,只能分1个partition,也不是和Executor相关的,

设置了3个执行核心仍然只是两个task?

3、为什么从mapValues划分第二个stage不应该是 groupByKey()???

6、Standalone架构下Spark的执行

1、standalone是Spark实现的资源调度框架,有:Client节点、Master节点、Worker节点。

2、Driver即可运行在Master节点,也可以运行在本地的Client端。

用spark-shell交互工具提交Spark的job的时候,运行在Master节点;

用spark-submit 提交或者用sparkConf.setManager("Spark://master:7077")是运行在Client端。

3、运行在Client端的执行过程如下:

说明:

1)sparkContext连接到Master,注册并申请资源(cpu 和内存)

2)Master根据申请信息和Worker心跳报告决定在哪个主机上分配资源,然后获取资源,启动StandaloneExecutorBackend。

3)StandaloneExecutorBackend向sparkContext注册。

4)sparkContext发送代码给StandaloneExecutorBackend且根据代码,构建DAG。

遇到Action动作会生成一个Job,然后根据Job内部根据RDD依赖关系生成多个Stage,Stage提交给TaskScheduler,

5)StandaloneExecutorBackend在汇报状态时候获取Task信息调用Executor多线程执行task,且向sparkContext汇报,

直到任务完成。

6)所有Task完成后,SparkContext向Master注销,释放资源。

说明:

文章中图片和内容来自:http://www.cnblogs.com/shishanyuan

时间: 2024-10-12 15:22:23

Spark的基本说明的相关文章

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

spark性能调优之资源调优

转https://tech.meituan.com/spark-tuning-basic.html spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand

Spark 整合hive 实现数据的读取输出

实验环境: linux centOS 6.7 vmware虚拟机 spark-1.5.1-bin-hadoop-2.1.0 apache-hive-1.2.1 eclipse 或IntelJIDea 本次使用eclipse. 代码: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import o

spark 教程三 spark Map filter flatMap union distinct intersection操作

RDD的创建 spark 所有的操作都围绕着弹性分布式数据集(RDD)进行,这是一个有容错机制的并可以被并行操作的元素集合,具有只读.分区.容错.高效.无需物化.可以缓存.RDD依赖等特征 RDD的创建基础RDD 1.并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行运算 var sc=new SparkContext(conf) var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)); rd

Spark运行命令示例

local单机模式:结果xshell可见:./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[1] ./lib/spark-examples-1.3.1-hadoop2.4.0.jar 100 standalone集群模式:需要的配置项1, slaves文件2, spark-env.shexport JAVA_HOME=/usr/soft/jdk1.7.0_71export SPARK_MASTE

Spark Job具体的物理执行

即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录 2.f(records),f一次性作用于集合的全部数据: Spark采用的是第一种方式,因为: 1.无需等待,可以最大化的使用集群的计算资源 2.减少OOM的产生 3.最大化的有利于并发 4.可以精准的控制每一个Partition本身(Dependency)及其内部的计算(compute) 5.基于lineage的算子流动式函数式计算,可

Dataflow编程模型和spark streaming结合

Dataflow编程模型和spark streaming结合 主要介绍一下Dataflow编程模型的基本思想,后面再简单比较一下Spark  streaming的编程模型 == 是什么 == 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的dataflow model 流式计算框架处理框架很多,也有大量的模型/框架号称能较好的处理流式和批量计算场景,比如Lambda模型,比如Spark等等,那么dataflow模型有什么特别的呢? 这就要要从

Spark性能优化指南——高级篇

Spark性能优化指南--高级篇 [TOC] 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数据倾斜发生时的现象 绝大多数tas

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.