以一个简单的WordCount代码为例
sc.textFile("hdfs://...").flatMap(_.split(" ")).map(_,1).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("hdfs://....")
以上代码的作用是读取指定位置的文件,经过map和reduceByKey之后按照数量大小进行排序,最后保存在hdfs中
首先应用程序创建SparkContext的实例sc,利用sc读取文件生成初始的RDD,经过一连串的transformation操作,将原始的RDD转换成其他类型的RDD
当action作用于转换之后的RDD时,会触发sc的runJob方法,这是引起Spark内核一系列反应的起点
提交这个作业到Spark集群中,Spark是如何工作的?中间的处理过程RDD经过怎样的转换?都需要经过什么样的处理过程?
首先来看网上一张简单的Spark事件流示意图:
Spark的作业分为transformation(flatmap等)和action(reduceByKey等)两种,action获取结果之前的transformation操作都是不执行的,而是绘制成一张DAG有向无环图,真正执行作业的时候会根据这个DAG图来执行
runJob中会将例如WordCount等作业,生成的DAG图和argetRDD,partitions等信息提交给DAGScheduler
DAGScheduler会根据DAG图中RDD之间的依赖关系将整个DAG图分成若干个stage,并生成taskSet(由stage组成)然后将这些taskSet通过submitTasks提交到TaskSchaduler
TaskSchaduler会将作业提交到集群或者本地中环境进行计算,并对有错误的taskSet进行相应的容错处理,最后将运行结果返回
上面是对Spark作业调度的一次简单描述,这其中可能会有一些关于RDD依赖关系的疑问
RDD之间的依赖关系是什么:
依赖关系可以分两种,窄依赖和宽依赖
窄依赖:子RDD中的每个数据块只依赖于父RDD中对应的有限个固定的数据块,可以理解成父子RDD是一对一的关系,例如:map变换,前后的数据都是一行对一行的。一个子RDD可以根据其父RDD直接计算得出,因而子RDD出现计算错误的时候,只需要重新计算对应的父RDD即可
宽依赖:子RDD中的一个数据块可以依赖于父RDD中的所有数据块。即一对多的情况,例如:groupByKey变换,子RDD中的数据块会依赖于多个父RDD中的数据块,因为一个key可能存在于父RDD的任何一个数据块中 。宽依赖中的子RDD要等到所有的父RDD计算完成之后才能进行计算,当数据丢失时需要对所有祖先RDD进行重新计算
之前是在应用程序作业提交的角度看Spark的作业调度机制,下面换一个视角,即从Spark集群的角度看作业调度
1.在集群启动的时候,各个slave节点(也可以说是worker)会向集群的Master注册,告诉Master我随时可以干活了,随叫随到
2.Master会根据一种心跳机制来实时监察集群中各个worker的状态,是否能正常工作
3.Driver Application提交作业的时候也会先向Master注册信息
4.作业注册完毕之后,Master会向worker发射Executor命令
5.worker产生若干个Executor准备执行
6.各个worker中的Executor会向Driver Application注册Executor信息,以便Driver Application能够将作业分发到具体的Executor
7.Executor会定期向Driver Application报告当前的状态更新信息
8.Driver Application发射任务到Executor执行
每个worker中会包含若干个Executor,Executor是比worker更小的单位,作业的具体执行都是在Executor上完成
结合以上两张图和之前的描述可以得出一些结论:
DAGSchaduler的输入是DAG图,按照RDD依赖关系划分stage之后,输出的是TaskSet
TaskSchaduler的输入是TaskSet,输出结果是将Task加到队列中,指定由哪个worker来执行Task
在这些worker中启动Executor来执行tasks
下图是Spark集群中作业执行完毕之后的处理事件流示意图:
最后记录一下提交Spark作业的方法
在spark的bin目录下
执行spark-submit脚本
./spark-submit \
–class 入口函数所在的类名全称 \
–master spark master节点的地址(默认端口7077)\
–executor-memory 指定worker中Executor的内存 \
–total-executor-cores 100 \
jar文件所在的目录 \