关于hive on spark会话的共享状态

spark sql中有一个类:

org.apache.spark.sql.internal.SharedState

它是用来做:

1、元数据地址管理(warehousePath)
2、查询结果缓存管理(cacheManager)
3、程序中的执行状态和metrics的监控(statusStore)
4、默认元数据库的目录管理(externalCatalog)
5、全局视图管理(主要是防止元数据库中存在重复)(globalTempViewManager)

1:首先介绍元数据地址管理(warehousePath)

这块儿主要是获取spark sql元数据库的路径地址,那么一般情况,我们都是默认把hive默认作为spark sql的元数据库,因为

它首先去加载hive的配置文件"hive-site.xml" , 然后根据hive-site.xml中获取的信息来获取到hive元数据库的路径:

hive.metastore.warehouse.dir

那么有时候,我们不使用hive作为spark sql的元数据库,那么这个时候我们加载的hive元数据路径应该是null

val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")

如果hiveWarehouseDir是null,那么就去加载spark sql的自带的元数据管理地址(spark.sql.warehouse.dir),然后把这个地址的值赋予给hive.metastore.warehouse.dir

因此大概流程就是获取hiveWarehouseDir:

具体代码:

val warehousePath: String = {
    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
    if (configFile != null) {
      logInfo(s"loading hive config file: $configFile")
      sparkContext.hadoopConfiguration.addResource(configFile)
    }

    // hive.metastore.warehouse.dir only stay in hadoopConf
    sparkContext.conf.remove("hive.metastore.warehouse.dir")
    // Set the Hive metastore warehouse path to the one we use
    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
      // we will respect the value of hive.metastore.warehouse.dir.
      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
        s"hive.metastore.warehouse.dir (‘$hiveWarehouseDir‘).")
      hiveWarehouseDir
    } else {
      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
      // the value of spark.sql.warehouse.dir.
      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
      logInfo(s"Setting hive.metastore.warehouse.dir (‘$hiveWarehouseDir‘) to the value of " +
        s"${WAREHOUSE_PATH.key} (‘$sparkWarehouseDir‘).")
      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
      sparkWarehouseDir
    }
  }
  logInfo(s"Warehouse path is ‘$warehousePath‘.")

warehousePath

2:CacheManager

将查询结果缓存起来 ; 这样的好处就是,如果后面还需要本次查询出来的内容,就不需要在查询一遍数据源了(这块儿有时间单独写篇文章记录)

具体代码:

  /**
   * Class for caching query results reused in future executions.
   */
  val cacheManager: CacheManager = new CacheManager

cacheManager

3:statusStore

代码:

  /**
   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
   */
  val statusStore: SQLAppStatusStore = {
    val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
    sparkContext.listenerBus.addToStatusQueue(listener)
    val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
    sparkContext.ui.foreach(new SQLTab(statusStore, _))
    statusStore
  }

statusStore

这段代码其实说白了就是将sql的状态和一些metrics指标写入到监听器中。

那么问题来了,监听器一定是实时的去监听的(读取的),然后spark sql还要不断的往监听器中写入,那么按照传统的list,map这种结构,在读取数据的时候还要在修改结构,会出现错误的;

因此spark sql采用了写时复制容器:

private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

将信息不断的写入同时,还不影响读取;

4、externalCatalog

获取spark 会话的内部目录(就是hiveWarehouseDir),如果不存在的话,就按照hiveWarehouseDir创建一个 , 当然,spark会通过回调函数的方式去监控当前目录中的事件:

externalCatalog.addListener(new ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = {
        sparkContext.listenerBus.post(event)
      }
    })

此处代码:

/**
   * A catalog that interacts with external systems.
   */
  lazy val externalCatalog: ExternalCatalog = {
    val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
      SharedState.externalCatalogClassName(sparkContext.conf),
      sparkContext.conf,
      sparkContext.hadoopConfiguration)

    val defaultDbDefinition = CatalogDatabase(
      SessionCatalog.DEFAULT_DATABASE,
      "default database",
      CatalogUtils.stringToURI(warehousePath),
      Map())
    // Create default database if it doesn‘t exist
    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
      // There may be another Spark application creating default database at the same time, here we
      // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
      externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
    }

    // Make sure we propagate external catalog events to the spark listener bus
    externalCatalog.addListener(new ExternalCatalogEventListener {
      override def onEvent(event: ExternalCatalogEvent): Unit = {
        sparkContext.listenerBus.post(event)
      }
    })

    externalCatalog
  }

externalCatalog

5、

此处就是防止spark执行过程中的临时数据库出现在externalCatalog中,因为如果spark的GLOBAL_TEMP_DATABASE出现在externalCatalog中的话。那么随着程序的执行,下一个线程想要获取元数据库地址的时候,就没法在里面创建hiveWarehouseDir。因此,如果在externalCatalog中存在GLOBAL_TEMP_DATABASE,那么就抛异常

  /**
   * A manager for global temporary views.
   */
  lazy val globalTempViewManager: GlobalTempViewManager = {
    // System preserved database should not exists in metastore. However it‘s hard to guarantee it
    // for every session, because case-sensitivity differs. Here we always lowercase it to make our
    // life easier.
    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
    if (externalCatalog.databaseExists(globalTempDB)) {
      throw new SparkException(
        s"$globalTempDB is a system preserved database, please rename your existing database " +
          "to resolve the name conflict, or set a different value for " +
          s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
    }
    new GlobalTempViewManager(globalTempDB)
  }

globalTempViewManager

原文地址:https://www.cnblogs.com/niutao/p/10915322.html

时间: 2024-08-29 06:53:55

关于hive on spark会话的共享状态的相关文章

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常

nodejs学习笔记_nodejs和PHP在基础架构上的差别--共享状态的并发

绝大多数对于Node.js的讨论都把关注点放在了处理高并发能力上,做开发的时候一定要明确node内部做出的权衡,以及node应用性能好的原因. node 为javascript引入了一个复杂的概念,:共享状态的并发. node採用一个长期执行的进程 而php在apache中会产生多个进程 例如以下图所看到的: 代码验证: PHP: <?php $i = 0; $i++; echo $i nodejs: var http = require('http'); var i=0; http.creat

Hive on Spark

Hive On Spark 一.概述 Hive 是一种数据仓库,即是一种sql翻译器,hive可以将sql翻译成mapreduce程序在hadoop中去执行,默认支持原生的Mapreduce引擎.从hive1.1版本以后开始支持Spark.可以将sql翻译成RDD在spark里面执行.Hive支持的spark是那种spark-without-hive,即没有编译支持hive包的spark. 二.安装版本及软件 需要安装:scala-2.12.jdk1.8.hive-2.1.1.spark-1.6

hive on Spark部署

一.环境 1.zk集群 10.10.103.144:2181,10.10.103.246:2181,10.10.103.62:2181 2.metastore数据库 10.10.103.246:3306 二.安装 1.安装配置数据库 yum -y install mysql55-server mysql55 GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' IDENTIFIED BY 'hive'; GRANT ALL PRIVI

无法更新运行时文件夹共享状态:在客户机操作系统内装载共享文件夹文件系统时出错--解决办法

1.问题描述: 在物理主机和虚拟机CentOS6.4共享文件的时候出现:无法更新运行时文件夹共享状态:在客户机操作系统内装载共享文件夹文件系统时出错 其他症状: vmware-hgfsclient  能够看到共享的文件夹名字 mount.vmhgfs  .host:/ /mnt  报错: Error: cannot mount filesystem: No such device 2.解决思路 /etc/vmware-tools/services.sh restart 如果出现FAILD yum

nodejs学习笔记_nodejs和PHP在基础架构上的区别--共享状态的并发

绝大多数对于Node.js的讨论都把关注点放在了处理高并发能力上,做开发的时候一定要明白node内部做出的权衡,以及node应用性能好的原因. node 为javascript引入了一个复杂的概念,:共享状态的并发. node采用一个长期运行的进程 而php在apache中会产生多个进程 如下图所示: 代码验证: PHP: <?php $i = 0; $i++; echo $i nodejs: var http = require('http'); var i=0; http.createSer

关于hive和spark日志问题

在用控制台学习hive和spark的时候,总是打印出来的各种日志烦得不行(对我而言).所以就想把着写我不关心的信息屏蔽掉,只保留错误信息.其实输出的日志信息还是很有用的,因为里面的日志信息可以清楚的描述任务执行的过程和细节.对hive和spark基本运行原理有了了解之后,我觉得完全可以屏蔽掉这些信息. 于是寻找教程,搞了一把.主要就是修改log4j.properties这个文件 没处理之前是这个样子的: 处理之后是这个样子的: 具体的怎么设置,看下面的博文吧,很详细. http://blog.c

hive on spark 编译

前置条件说明 Hive on Spark是Hive跑在Spark上,用的是Spark执行引擎,而不是MapReduce,和Hive on Tez的道理一样. 从Hive 1.1版本开始,Hive on Spark已经成为Hive代码的一部分了,并且在spark分支上面,可以看这里https://github.com/apache/hive/tree/spark,并会定期的移到master分支上面去. 关于Hive on Spark的讨论和进度,可以看这里https://issues.apache

并行编程中的取消任务、共享状态,等等

在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候. 比如说有一个盒子的集合,分别让盒子旋转一定的角度. void RotateBox(IEnumerable<Box> boxes, float degree) { Parallel.ForEach(boxes, box => box.Rotate(degree)); } 如果并行任务中的一个任务出现异常,需要结束出问题的任务呢? Parallel.ForEach为我们提供了一个重载方法,可以控制任务是否继续.