小记--------spark-job触发流程源码分析

job是串行执行的, 执行完上一个才执行下一个

eg:Wordcount案例
val lines = sc.textFile("本地URL or HDFS URL")//详解见代码1
val words = lines.flatMap(line => line.split(" "))//也会返回一个MapPartitionsRDD
val pairs = words.map(word => (word , 1))//同样也是返回一个MapPartitionsRDD
val counts = pairs.reduceByKey(_+_)//详解见代码2
counts.foreach(count => printLn(count._1 + ":" + count._2))//见代码4

源码位置:

SparkContext类:spark-core_2.11-2.1.0-sources.jar > org.apache.spark.SparkContext.scala

代码1
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
/**
*首先,hadoopFile()方法的调用,会创建一个HadoopRDD,其中的元素,其实是(key,value) pair RDD . key 是hdfs或文本文件的每一行的offset, value 就是文本行
*然后对HadoopRDD调用map()方法,会剔除key,只保留value,然后会获得一个MapPartitionsRDD,MapPartitionsRDD内部的元素,其实就是一行一行的文本行
*/
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

//因为RDD.scala类中是没有ReduceByKey方法的,因此它会调用ReduceByKey方法时,会触发scala的隐式转换;此时就会在作用域内,寻找隐式转换,会在RDD中找到rddToPairRDDFunctions()隐式转换,然后再去PairRDDFunctions类里面调用ReduceByKey方法
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)//代码详见代码3
}

代码3
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

代码4
//通过foreach方法进行runjob的多次重载到本RunJob方法
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
  }
// 调用SparkContext,之前初始化时创建的DAGScheduler的Runjob方法
// 会把当前执行action操作的RDD传到DAGScheduler的runjob方法中
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

原文地址:https://www.cnblogs.com/yzqyxq/p/12232685.html

时间: 2024-09-30 19:09:13

小记--------spark-job触发流程源码分析的相关文章

Activity启动流程源码分析之Launcher启动(二)

1.前述 在前一篇文章中我们简要的介绍Activity的启动流程Activity启动流程源码分析之入门(一),当时只是简单的分析了一下流程,而且在上一篇博客中我们也说了Activity的两种启动方式,现在我们就来分析其中的第一种方式--Launcher启动,这种启动方式的特点是会创建一个新的进程来加载相应的Activity(基于Android5.1源码). 2.Activity启动流程时序图 好啦,接下来我们先看一下Launcher启动Activity的时序图: 好啦,接下来我们将上述时序图用代

A2dp初始化流程源码分析

蓝牙启动的时候,会涉及到各个profile 的启动.这篇文章分析一下,蓝牙中a2dp profile的初始化流程. 我们从AdapterState.java中对于USER_TURN_ON 消息的处理说起: switch(msg.what) { case USER_TURN_ON: notifyAdapterStateChange(BluetoothAdapter.STATE_TURNING_ON); mPendingCommandState.setTurningOn(true); transit

5.Spark Streaming流计算框架的运行流程源码分析2

1 spark streaming 程序代码实例 代码如下: [html] view plain copy object OnlineTheTop3ItemForEachCategory2DB { def main(args: Array[String]){ val conf = new SparkConf() //创建SparkConf对象 //设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setAppName("OnlineTheTop3ItemForEachCategor

SpringMVC(十七):Web.xml加载流程源码分析

之前章节讲解了web.xml如何使用编码的方式替换掉,但是一直没有写web.xml是如何被加载的相关细节,觉得十分有必要写一篇文章类梳理下. 待完成... 参考 <SpringMVC初始化流程> <Spring 4.x源码分析-BeanWrapper> <第三章 DispatcherServlet详解 ——跟开涛学SpringMVC> <SpringMvc之DispatcherServlet详解> <Spring MVC入口Servlet详解(Http

Android Touch事件派发流程源码分析

分native侧事件派发到java侧和Framework派发事件到UI,流程看源码即可,此处不赘叙, Native侧派发事件的干活类图如下: Framework侧派发事件的类图如下: 从Activity.dispatchTouchEvent开始,Action_Down事件派发的时序如下: 分析Android 5.0源码可知,ViewGroup的事件派发是一个后序遍历树的递归过程,在Action_Down事件的处理中做了两个事情: 1.递归查找touchTarget,并标记在ViewGroup的m

HBase的put流程源码分析

hbase是一个nosql型数据库,本文我们会分析一下客户的数据是通过什么样的路径写入到hbase的. HBase作为一种列族数据库,其将相关性较高的列聚合成一个列族单元,不同的列族单元物理上存储在不同的文件(HFile)内.一个表的数据会水平切割成不同的region分布在集群中不同的regionserver上.客户端访问集群时会首先得到该表的region在集群中的分布,之后的数据交换由客户端和regionserver间通过rpc通信实现,下面我们从hbase源码里探究客户端put数据的流程.本

es lucene搜索及聚合流程源码分析

本文以TermQuery,GlobalOrdinalsStringTermsAggregator为例,通过代码,分析es,lucene搜索及聚合流程.1:协调节点收到请求后,将search任务发到相关的各个shard. 相关代码: TransportSearchAction.executeSearch TransportSearchAction.searchAsyncAction.start AbstractSearchAsyncAction.executePhase(SearchQueryTh

a2dp播放流程源码分析

之前分析了a2dp profile 的初始化的流程,这篇文章分析一下,音频流在bluedroid中的处理流程. 上层的音频接口是调用a2dp hal 里面的接口来进行命令以及数据的发送的. 关于控制通道的初始化以及建立的过程,这里就不分析了,我们主要看数据的流向和处理.我们从控制通道的最后一个命令start 开始分析流程. 我们直接看a2dp hal 中out_write的实现: static ssize_t out_write(struct audio_stream_out *stream,

SpringBoot自动装配流程源码分析

SpringBoot 传统方式的SSM框架因为需要配置大量文件而被开发人员诟病重复性工作,所以SpringBoot的出现在减少开发人员做大量重复性配置的工作,使得开发人员能够快速的开始项目开发.更加专注于业务代码的编写.但SpringBoot跟SSM有什么框架不同呢?为什么SpringBoot可以自动装配呢?SpringBoot自动装配是如何实现的呢? SpringBoot入口 写过SpringBoot应用的开发者都知道,SpringBoot应用的启动类是被@SpringBootApplicat