SparkContext、SparkConf和SparkSession的初始化

SparkContextSparkConf

  任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数。

初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。

val conf = new SparkConf().setMaster("master").setAppName("appName")
val sc = new SparkContext(conf)
或者
val sc = new SparkContext("master","appName")

  Note:

  Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.

  也就是说一旦设置完成SparkConf,就不可被使用者修改

  对于单元测试,您也可以调用SparkConf(false)来跳过加载外部设置,并获得相同的配置,无论系统属性如何。

  咱们再看看setMaster()和setAppName()源码:

  根据上面的解释,setMaster主要是连接主节点,如果参数是"local",则在本地用单线程运行spark,如果是 local[4],则在本地用4核运行,如果设置为spark://master:7077,就是作为单节点运行,而setAppName就是在web端显示应用名而已,它们说到底都调用了set()函数,让我们看看set()是何方神圣

  logDeprecation(key)是日志输出函数,防止输入参数名无效, 看看settings,是个HashMap结构,追溯一下:

  果然,是个ConcurrentHashMap对象,ConcurrentHashMap主要作用是解决多线程并发下数据段访问效率,该类相对于hashMap而言具有同步map中的数据,对于hashTable而言,该同步数据对于并发程序提高了极高的效率,所以在使用缓存机制的时候如果对map中的值具有高并发的情况的话,那么我们就需要使用ConcurrentHashMap,ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)

,CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,这也是为什么SparkConf配置好了就无法更改的原因。

  ConcurrentHashMap应用了锁分段技术,HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

  另外,如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap是不会增加Segment的数量的,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

SparkSession: SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

val sparkSession = SparkSession.builder
                    .master("master")
                    .appName("appName")
                    .getOrCreate()
或者
SparkSession.builder.config(conf=SparkConf())

  上面代码类似于创建一个SparkContext,master设置为"xiaojukeji",然后创建了一个SQLContext封装它。如果你想创建hiveContext,可以使用下面的方法来创建SparkSession,以使得它支持Hive(HiveContext):

val sparkSession = SparkSession.builder
                    .master("master")
                    .appName("appName")
                    .enableHiveSupport()
                    .getOrCreate()
//sparkSession 从csv读取数据:

val dq = sparkSession.read.option("header", "true").csv("src/main/resources/scala.csv")

getOrCreate():有就拿过来,没有就创建,类似于单例模式:

s1 = SparkSession().builder.config("k1", "v1").getORCreat()
s2 = SparkSession().builder.config("k2", "v2").getORCreat()
return s1.conf.get("k1") == s2.conf.get("k2")

True
时间: 2025-01-05 03:27:50

SparkContext、SparkConf和SparkSession的初始化的相关文章

es-09-spark集成

es和spark的集成比较简单, 直接使用内部封装的一些方法即可 版本设置说明: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html maven依赖说明: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html 1, maven配置: <?xml version="1.0" encod

SparkConf加载与SparkContext创建(源码)

即日起开始spark源码阅读之旅,这个过程是相当痛苦的,也许有大量的看不懂,但是每天一个方法,一点点看,相信总归会有极大地提高的.那么下面开始: 创建sparkConf对象,那么究竟它干了什么了类,从代码层面,我们可以看到我们需要setMaster啊,setAppName啊,set blabla啊...等等~ val sparkConf = new SparkConf().setMaster("local").setAppName("TopActiveLocations&qu

spark快速大数据分析学习笔记*初始化sparkcontext(一)

初始化SparkContext 1// 在java中初始化spark 2 import org.apache.spark.SparkConf; 3 import org.apache.spark.api.java.JavaSparkContext; 4 5 SparkConf conf=new SparkConf().setMaster("local").setAppName("my app"); 6 //集群url:本例是运行在本地单机local:应用名,可以在集

Apache Spark-1.0.0代码浅析(二):Spark初始化

LocalWordCount中,需要首先创建SparkConf配置Master.AppName等环境参数,如果程序中没有设置,则会读取系统参数.然后,以SparkConf作为参数创建SparkContext,初始化Spark环境. val sparkConf = new SparkConf().setMaster("local").setAppName("Local Word Count") val sc = new SparkContext(sparkConf)

【Spark】SparkContext源码解读

SparkContext的初始化 SparkContext是应用启动时创建的Spark上下文对象,是进行Spark应用开发的主要接口,是Spark上层应用与底层实现的中转站(SparkContext负责给executors发送task). SparkContext在初始化过程中,主要涉及一下内容: SparkEnv DAGScheduler TaskScheduler SchedulerBackend SparkUI 生成SparkConf SparkContext的构造函数中最重要的入参是Sp

3 pyspark学习---sparkContext概述

1 Tutorial Spark本身是由scala语言编写,为了支持py对spark的支持呢就出现了pyspark.它依然可以通过导入Py4j进行RDDS等操作. 2 sparkContext (1)sparkContext是spark运用的入口点,当我们运行spark的时候,驱动启动同时上下文也开始初始化. (2)sparkContext使用py4j调用JVM然后创建javaSparkContext,默认为'sc',所以如果在shell下就直接用sc.方法就可以.如果你再创建上下文,将会报错c

spark教程(四)-SparkContext 和 RDD 算子

SparkContext SparkContext 是在 spark 库中定义的一个类,作为 spark 库的入口点: 它表示连接到 spark,在进行 spark 操作之前必须先创建一个 SparkContext 的实例,并且只能创建一个: 利用 SparkContext 实例创建的对象都是 RDD,这是相对于 SparkSession 说的,因为 它创建的对象都是 DataFrame: 创建 sc class SparkContext(__builtin__.object): def __i

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

HBase连接的几种方式(二)

1. HBase连接的方式概况 主要分为: 纯Java API连接HBase的方式: Spark连接HBase的方式: Flink连接HBase的方式: HBase通过Phoenix连接的方式: 第一种方式是HBase自身提供的比较原始的高效操作方式,而第二.第三则分别是Spark.Flink集成HBase的方式,最后一种是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark.Flink中调用. 注意: 这里我们使用HBase2.1.2版本,以下代码都