Spark 2.x (Part 60): How does Structured Streaming look up the Kafka DataSourceProvider?

This post traces through the Spark Structured Streaming (Spark 2.4) source code to see how a DataSourceProvider is looked up. First, the code that reads from a Kafka streaming source:

        SparkSession sparkSession = SparkSession.builder().getOrCreate();
        Dataset<Row> sourceDataset = sparkSession.readStream().format("kafka").option("xxx", "xxx").load();
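
For comparison, here is a minimal sketch of what a real Kafka read usually looks like in Scala (the broker list and topic name are placeholders; the option keys are the ones documented for the built-in Kafka source):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("kafka-read-sketch").getOrCreate()

// format("kafka") is the alias that the lookup traced below resolves to KafkaSourceProvider.
val kafkaDf: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker list
  .option("subscribe", "my-topic")                     // placeholder topic
  .option("startingOffsets", "latest")
  .load()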

sparkSession.readStream() returns a DataStreamReader.

DataStreamReader (https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala) is the class whose load() method is called in the code above.

The "kafka" string passed to format() is stored in the DataStreamReader's source field:

@InterfaceStability.Evolving
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  /**
   * Specifies the input data source format.
   *
   * @since 2.0.0
   */
  def format(source: String): DataStreamReader = {
    this.source = source
    this
  }
  // ...
}

The source argument of DataStreamReader#format(source: String) is usually one of the built-in aliases such as csv / text / json / jdbc / kafka / console / socket, or a fully qualified provider class name.
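
Two of those built-in aliases in action, for readers who want to try the lookup without a Kafka cluster (a sketch; the socket source needs a listening TCP server such as nc -lk 9999, while the rate source generates rows by itself):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("alias-demo").getOrCreate()

// "socket" resolves to TextSocketSourceProvider via its shortName.
val socketDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// "rate" resolves to RateStreamProvider via its shortName.
val rateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()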

DataStreamReader#load() method:

  /**
   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
   * (e.g. external key-value stores).
   *
   * @since 2.0.0
   */
  def load(): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
    // We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
    // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
    // writer or whether the query is continuous.
    val v1DataSource = DataSource(
      sparkSession,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap)
    val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }
    ds match {
      case s: MicroBatchReadSupport =>
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = s, conf = sparkSession.sessionState.conf)
        val options = sessionOptions ++ extraOptions
        val dataSourceOptions = new DataSourceOptions(options.asJava)
        var tempReader: MicroBatchReader = null
        val schema = try {
          tempReader = s.createMicroBatchReader(
            Optional.ofNullable(userSpecifiedSchema.orNull),
            Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
            dataSourceOptions)
          tempReader.readSchema()
        } finally {
          // Stop tempReader to avoid side-effect thing
          if (tempReader != null) {
            tempReader.stop()
            tempReader = null
          }
        }
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, options,
            schema.toAttributes, v1Relation)(sparkSession))
      case s: ContinuousReadSupport =>
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = s, conf = sparkSession.sessionState.conf)
        val options = sessionOptions ++ extraOptions
        val dataSourceOptions = new DataSourceOptions(options.asJava)
        val tempReader = s.createContinuousReader(
          Optional.ofNullable(userSpecifiedSchema.orNull),
          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
          dataSourceOptions)
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, options,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
      case _ =>
        // Code path for data source v1.
        Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
    }
  }

val ds = DataSource.lookupDataSource(source, ...).newInstance() is where the source variable is used. To figure out what ds actually is (a Dataset or something else), we need to look at the implementation of DataSource.lookupDataSource(source, ...).

Analyzing DataSource.lookupDataSource(source, sparkSession.sqlContext.conf)

The DataSource source file: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

The lookupDataSource method is defined on the DataSource companion object:

object DataSource extends Logging {

  // ...

  /**
   * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
   */
  private val spark2RemovedClasses = Set(
    "org.apache.spark.sql.DataFrame",
    "org.apache.spark.sql.sources.HadoopFsRelationProvider",
    "org.apache.spark.Logging")

  /** Given a provider name, look up the data source class definition. */
  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
        // the provider format did not match any given registered aliases
        case Nil =>
          try {
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
              case Success(dataSource) =>
                // Found the data source using fully qualified path
                dataSource
              case Failure(error) =>
                if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                  throw new AnalysisException(
                    "Hive built-in ORC data source must be used with Hive support enabled. " +
                    "Please use the native ORC data source by setting ‘spark.sql.orc.impl‘ to " +
                    "‘native‘")
                } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
                  provider1 == "com.databricks.spark.avro" ||
                  provider1 == "org.apache.spark.sql.avro") {
                  throw new AnalysisException(
                    s"Failed to find data source: $provider1. Avro is built-in but external data " +
                    "source module since Spark 2.4. Please deploy the application as per " +
                    "the deployment section of \"Apache Avro Data Source Guide\".")
                } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
                  throw new AnalysisException(
                    s"Failed to find data source: $provider1. Please deploy the application as " +
                    "per the deployment section of " +
                    "\"Structured Streaming + Kafka Integration Guide\".")
                } else {
                  throw new ClassNotFoundException(
                    s"Failed to find data source: $provider1. Please find packages at " +
                      "http://spark.apache.org/third-party-projects.html",
                    error)
                }
            }
          } catch {
            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
              // NoClassDefFoundError's class name uses "/" rather than "." for packages
              val className = e.getMessage.replaceAll("/", ".")
              if (spark2RemovedClasses.contains(className)) {
                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
                  "Please check if your library is compatible with Spark 2.0", e)
              } else {
                throw e
              }
          }
        case head :: Nil =>
          // there is exactly one registered alias
          head.getClass
        case sources =>
          // There are multiple registered aliases for the input. If there is single datasource
          // that has "org.apache.spark" package in the prefix, we use it considering it is an
          // internal datasource within Spark.
          val sourceNames = sources.map(_.getClass.getName)
          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
          if (internalSources.size == 1) {
            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
            internalSources.head.getClass
          } else {
            throw new AnalysisException(s"Multiple sources found for $provider1 " +
              s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
          }
      }
    } catch {
      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
        // NoClassDefFoundError's class name uses "/" rather than "." for packages
        val className = e.getCause.getMessage.replaceAll("/", ".")
        if (spark2RemovedClasses.contains(className)) {
          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
            "Please remove the incompatible library from classpath or upgrade it. " +
            s"Error: ${e.getMessage}", e)
        } else {
          throw e
        }
    }
  }
  // ...
}

The lookup logic, step by step:

1) First look up the provider name in the backwardCompatibilityMap predefined on the DataSource object;

2) if it is not found there, keep the name as given;

3) load all DataSourceRegister implementations on the classpath through a ServiceLoader;

4) filter that set down to the implementations whose shortName matches the provider (see the standalone sketch below);

5) return the resulting provider class.
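
Steps 3) and 4) are plain java.util.ServiceLoader mechanics and can be reproduced outside Spark's internals. A minimal standalone sketch (it reproduces only the alias-matching part of lookupDataSource, not the DefaultSource fallback; Thread.currentThread().getContextClassLoader stands in for Utils.getContextOrSparkClassLoader, which is private[spark]):

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Load every DataSourceRegister implementation registered on the classpath.
val loader = Thread.currentThread().getContextClassLoader
val registered = ServiceLoader.load(classOf[DataSourceRegister], loader).asScala.toList

// Print each registered alias and its implementing class.
registered.foreach(r => println(s"${r.shortName()} -> ${r.getClass.getName}"))

// The filtering step: with spark-sql-kafka-0-10 on the classpath, "kafka"
// matches exactly one implementation, KafkaSourceProvider.
val matches = registered.filter(_.shortName().equalsIgnoreCase("kafka"))
matches.foreach(m => println(m.getClass.getName))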

The backwardCompatibilityMap is also defined on the DataSource companion object; it is essentially a predefined mapping from legacy provider names to the current implementations.

object DataSource extends Logging {

  /** A map to maintain backward compatibility in case we move data sources around. */
  private val backwardCompatibilityMap: Map[String, String] = {
    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
    val json = classOf[JsonFileFormat].getCanonicalName
    val parquet = classOf[ParquetFileFormat].getCanonicalName
    val csv = classOf[CSVFileFormat].getCanonicalName
    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
    val nativeOrc = classOf[OrcFileFormat].getCanonicalName
    val socket = classOf[TextSocketSourceProvider].getCanonicalName
    val rate = classOf[RateStreamProvider].getCanonicalName

    Map(
      "org.apache.spark.sql.jdbc" -> jdbc,
      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
      "org.apache.spark.sql.json" -> json,
      "org.apache.spark.sql.json.DefaultSource" -> json,
      "org.apache.spark.sql.execution.datasources.json" -> json,
      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
      "org.apache.spark.sql.parquet" -> parquet,
      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
      "org.apache.spark.sql.hive.orc" -> orc,
      "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
      "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
      "org.apache.spark.ml.source.libsvm" -> libsvm,
      "com.databricks.spark.csv" -> csv,
      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
      "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
    )
  }
  // ...
}
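
The practical effect of this map is that pre-2.0 fully qualified names keep resolving to the current implementations. A small sketch, assuming an active SparkSession and a placeholder input path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("backcompat-demo").getOrCreate()

// Both calls end up at JsonFileFormat: the first through the ServiceLoader alias "json",
// the second through the backwardCompatibilityMap entry for the old class name.
val viaAlias   = spark.read.format("json").load("/tmp/people.json")
val viaOldName = spark.read.format("org.apache.spark.sql.json").load("/tmp/people.json")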

Which class has shortName "kafka" and implements the DataSourceRegister interface?

The class that satisfies both conditions is KafkaSourceProvider (https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala):

/**
 * The provider class for all Kafka readers and writers. It is designed such that it throws
 * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
 * missing options even before the query is started.
 */
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    with StreamSourceProvider
    with StreamSinkProvider
    with RelationProvider
    with CreatableRelationProvider
    with TableProvider
    with Logging {
  import KafkaSourceProvider._

  override def shortName(): String = "kafka"
  // ...
}
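
For the ServiceLoader in lookupDataSource to see this class at all, the spark-sql-kafka-0-10 jar registers it under the standard java.util.ServiceLoader convention. A sketch of the registration file's layout:

# File shipped inside the spark-sql-kafka-0-10 jar:
#   META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.sql.kafka010.KafkaSourceProvider

This is also why the kafka branch of lookupDataSource tells you to deploy the application as per the "Structured Streaming + Kafka Integration Guide" when resolution fails: without the connector jar there is no registration file and no class to load.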

The DataSourceRegister trait definition:

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

/**
 * Data sources should implement this trait so that they can register an alias to their data source.
 * This allows users to give the data source alias as the format type over the fully qualified
 * class name.
 *
 * A new instance of this class will be instantiated each time a DDL call is made.
 *
 * @since 1.5.0
 */
@InterfaceStability.Stable
trait DataSourceRegister {

  /**
   * The string that represents the format that this data source provider uses. This is
   * overridden by children to provide a nice alias for the data source. For example:
   *
   * {{{
   *   override def shortName(): String = "parquet"
   * }}}
   *
   * @since 1.5.0
   */
  def shortName(): String
}
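
To illustrate what this trait buys a data source author, here is a hypothetical minimal streaming provider (the package, class name, and the alias "echo" are invented for this sketch, and the real work of createSource is left out). Registering the class in its jar's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file is what makes format("echo") resolvable:

package com.example.sources  // hypothetical package, for illustration only

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class EchoSourceProvider extends DataSourceRegister with StreamSourceProvider {

  // The alias that lookupDataSource matches (case-insensitively) against format("echo").
  override def shortName(): String = "echo"

  private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (providerName, schema.getOrElse(defaultSchema))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    // A real provider would return a Source implementation here; omitted in this sketch.
    throw new UnsupportedOperationException("sketch only")
}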

Which classes implement DataSourceRegister?

Classes implementing DataSourceRegister include:

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala

https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala

https://github.com/apache/spark/blob/branch-2.4/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala

Original post: https://www.cnblogs.com/yy3b2007com/p/11421238.html
