Source code analysis: passing a Spark job's dependency files via spark.files

Version: Spark 2.3

Relevant source: org.apache.spark.SparkContext

When creating a Spark job we often need to ship some dependency files along with it; normally this is done with --files /path/to/file on the spark-submit command line.

In our product, however, Spark jobs are submitted through Livy, which is essentially a wrapper around spark-submit, so specifying dependency files still comes down to Spark itself in the end. Since we cannot pass --files on the command line, how do we specify the files programmatically, and how do the tasks running on each node get hold of them?

From the spark-submit argument-handling source we can see that --files is in fact carried by the property "spark.files", so the same value can be set on a SparkConf in code.

For example:

SparkConf conf = new SparkConf();
conf.set("spark.files", "/path/to/file");
// If the file lives on HDFS, use conf.set("spark.files", "hdfs:/path/to/file").
// Note that only the hdfs scheme needs to be added; no ip/port is required.
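
For a fuller picture, here is a minimal, self-contained Scala sketch of the same idea. The file names, paths and the local master are all hypothetical, and reading the file back on the executors relies on SparkFiles.get, as described in the addFile documentation further below:

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object SparkFilesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark.files example")
      .setMaster("local[2]")  // hypothetical; drop this when submitting to a real cluster
      // Comma-separated list; for HDFS only the scheme is needed, e.g. "hdfs:/path/to/data.txt"
      .set("spark.files", "/path/to/data.txt,/path/to/other.conf")
    val sc = new SparkContext(conf)

    // On each executor the files are placed in the working directory;
    // SparkFiles.get resolves the local download location by file name.
    val firstLines = sc.parallelize(1 to 2).map { _ =>
      val localPath = SparkFiles.get("data.txt")
      scala.io.Source.fromFile(localPath).getLines().take(1).mkString
    }.collect()

    firstLines.foreach(println)
    sc.stop()
  }
}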

The Spark documentation describes the parameter as follows:

spark.files  Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed.

The code that reads the user-specified files lives in SparkContext.scala, shown below (jars specified with --jars are handled the same way):

def jars: Seq[String] = _jars
def files: Seq[String] = _files
...

_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten

...

// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}

if (files != null) {
  files.foreach(addFile)
}
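
As a quick aside, the parsing above simply splits the property value on commas and drops empty entries; a tiny sketch of that behaviour applied to a hypothetical value:

val raw: Option[String] = Some("/path/a.txt,/path/b.txt,")
val files: Seq[String] = raw.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
// files now contains Seq("/path/a.txt", "/path/b.txt"); the trailing empty entry is dropped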

The implementation of addFile is as follows:

/**
* Add a file to be downloaded with this Spark job on every node.
*
* If a file is added during execution, it will not be available until the next TaskSet starts.
*
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
* @param recursive if true, a directory can be given in `path`. Currently directories are
* only supported for Hadoop-supported filesystems.
*    Notes (the author's summary of the doc above):
*    1. The file is downloaded to every node.
*    2. A file added while the job is running only becomes available when the next TaskSet starts.
*    3. The path may be a local file, a file on HDFS (or another Hadoop-supported filesystem),
*       or an HTTP, HTTPS or FTP URI; inside Spark jobs the file is accessed with
*       SparkFiles.get(fileName).
*    4. To add files recursively, a directory may be given, but this only works for
*       Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
  val uri = new Path(path).toUri
  val schemeCorrectedPath = uri.getScheme match {
    // If the path carries no scheme (i.e. getScheme is null), it is treated as a local file.
    // On the command line, --files /home/kong/log4j.properties is equivalent to
    // --files local:/home/kong/log4j.properties
    case null | "local" => new File(path).getCanonicalFile.toURI.toString
    case _ => path
  }

  val hadoopPath = new Path(schemeCorrectedPath)
  val scheme = new URI(schemeCorrectedPath).getScheme
  if (!Array("http", "https", "ftp").contains(scheme)) {
    val fs = hadoopPath.getFileSystem(hadoopConfiguration)
    val isDir = fs.getFileStatus(hadoopPath).isDirectory
    if (!isLocal && scheme == "file" && isDir) {
      throw new SparkException(s"addFile does not support local directories when not running " +
        "local mode.")
    }
    if (!recursive && isDir) {
      throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
        "turned on.")
    }
  } else {
    // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
    Utils.validateURL(uri)
  }

  val key = if (!isLocal && scheme == "file") {
    env.rpcEnv.fileServer.addFile(new File(uri.getPath))
  } else {
    schemeCorrectedPath
  }
  val timestamp = System.currentTimeMillis
  if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
    logInfo(s"Added file $path at $key with timestamp $timestamp")
    // Fetch the file locally so that closures which are run on the driver can still use the
    // SparkFiles API to access files.
    Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
      env.securityManager, hadoopConfiguration, timestamp, useCache = false)
    postEnvironmentUpdate()
  }
}
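
The same mechanism can also be triggered explicitly at runtime instead of through spark.files. A hedged sketch (all paths are hypothetical) that adds a single file plus an HDFS directory and then resolves the local copy inside a task:

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("addFile sketch").setMaster("local[2]"))

// A single file: a local path, an HDFS path, or an HTTP/HTTPS/FTP URI.
sc.addFile("/path/to/lookup.txt")

// A directory: only Hadoop-supported filesystems, and recursive must be true,
// otherwise the SparkException shown above is thrown.
sc.addFile("hdfs:/path/to/config-dir", recursive = true)

// Files added while the job is running only become visible to the next TaskSet.
sc.parallelize(Seq(1)).foreach { _ =>
  // Printed to the executor's stdout, not the driver's.
  println(SparkFiles.get("lookup.txt"))
}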

Both addJar and addFile call postEnvironmentUpdate at the end, and postEnvironmentUpdate is also invoked at the very end of SparkContext initialization. The code is as follows:

  /** Post the environment update event once the task scheduler is ready */
  private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
      val addedFilePaths = addedFiles.keys.toSeq
      // SparkEnv.environmentDetails assembles the JVM information, Spark properties,
      // system properties, classpath, etc. into the environment details.
      val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
        addedFilePaths)
      // Build a SparkListenerEnvironmentUpdate event and post it to the event bus.
      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
      listenerBus.post(environmentUpdate)
    }
  }

The environmentDetails method:

  /**
   * Return a map representation of jvm information, Spark properties, system properties, and
   * class paths. Map keys define the category, and map values represent the corresponding
   * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
   */
  private[spark]
  def environmentDetails(
      conf: SparkConf,
      schedulingMode: String,
      addedJars: Seq[String],
      addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

    import Properties._
    val jvmInformation = Seq(
      ("Java Version", s"$javaVersion ($javaVendor)"),
      ("Java Home", javaHome),
      ("Scala Version", versionString)
    ).sorted

    // Spark properties
    // This includes the scheduling mode whether or not it is configured (used by SparkUI)
    val schedulerMode =
      if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
      } else {
        Seq.empty[(String, String)]
      }
    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

    // System properties that are not java classpaths
    val systemProperties = Utils.getSystemProperties.toSeq
    val otherProperties = systemProperties.filter { case (k, _) =>
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted

    // Class paths including all added jars and files
    val classPathEntries = javaClassPath
      .split(File.pathSeparator)
      .filterNot(_.isEmpty)
      .map((_, "System Classpath"))
    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
    val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

    Map[String, Seq[(String, String)]](
      "JVM Information" -> jvmInformation,
      "Spark Properties" -> sparkProperties,
      "System Properties" -> otherProperties,
      "Classpath Entries" -> classPaths)
  }
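
Because the user-added files and jars end up under "Classpath Entries" tagged "Added By User", the SparkListenerEnvironmentUpdate event can be observed with an ordinary SparkListener. A minimal sketch (the listener class itself is hypothetical):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Logs the entries that environmentDetails marked as "Added By User".
class AddedFilesListener extends SparkListener {
  override def onEnvironmentUpdate(update: SparkListenerEnvironmentUpdate): Unit = {
    val added = update.environmentDetails
      .getOrElse("Classpath Entries", Seq.empty)
      .collect { case (path, "Added By User") => path }
    added.foreach(p => println(s"User-added dependency: $p"))
  }
}

// Register it with sc.addSparkListener(new AddedFilesListener)
// or via the spark.extraListeners configuration property.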

Original article: https://www.cnblogs.com/dtmobile-ksw/p/11556901.html
