spark-sql的概述以及编程模型的介绍

1、spark sql的概述

(1)spark sql的介绍:

  Spark SQL 是 Spark 用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通 过 RDD 获取)的一个模块,它提供了一个编程抽象叫做 DataFrame 并且作为分布式 SQL 查 询引擎的作用。
  外部的结构化数据源包括 JSON、Parquet(默认)、RMDBS、Hive 等。当前 Spark SQL 使用 Catalyst 优化器来对 SQL 进行优化,从而得到更加高效的执行方案。并且可以将结果存储到外部系统。

(2)spark sql的特点:

   - 容易整合
   - 统一的数据访问方式
   - 兼容hive
   - 标准的数据连接

(3)关于spark sql的版本迭代:

   - spark sql 的前身是shark。但是spark sql抛弃了原有shark的代码,汲取了shark的一些优点,如:列存储(In-Memory Columnar Storage)、Hive 兼容性等,重新开发 SparkSQL。
   - spark -1.1 2014 年 9 月 11 日,发布 Spark1.1.0。Spark 从 1.0 开始引入 SparkSQL(Shark 不再支持升级与维护)。Spark1.1.0 变化较大是 SparkSQL 和 MLlib
   - spark -1.3 增加了dataframe新
   - spark -1.4 增加了窗口分析函数
   - spark - 1.5 钨丝计划。Hive 中有 UDF 与 UDAF,Spark 中对 UDF 支持较早
   - spark 1.6 执行的 sql 中可以增加"--"注释,Spark-1.5/1.6 的新特性,引入 DataSet 的概念
   - spark 2.x SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入 SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口

2、spark sql的编程模型

(1)sparkSession的介绍:

  SparkSession 是 Spark-2.0 引如的新概念。SparkSession 为用户提供了统一的切入点,来让用户学习 Spark 的各项功能。
  随着 DataSet 和 DataFrame 的 API 逐渐成为标准的 API,SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession 封装了 SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext 也被保存下来。
  特点:
   - 为用户提供一个统一的切入点使用 Spark 各项功能
   - 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
   - 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
   - 与 Spark 交互之时不需要显示的创建 SparkConf、SparkContext 以及 SQlContext,这些对 象已经封闭在 SparkSession 中
   - SparkSession 提供对 Hive 特征的内部支持:用 HiveQL 写 SQL 语句,访问 Hive UDFs,从 Hive 表中读取数据
   SparkSession的创建
  在spark-shell中SparkSession 会被自动初始化一个对象叫做 spark,为了向后兼容,Spark-Shell 还提供了一个 SparkContext 的初始化对象,方便用户操作:

  在代码开发的时候创建

val conf = new SparkConf()
val spark: SparkSession = SparkSession.builder()
  .appName("_01spark_sql")
  .config(conf)
  .getOrCreate()

(2)RDD:

这里主要说的是RDD的局限性:
  - RDD是不支持spark-sql的
   - RDD 仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义
   - RDD 需要用户自己优化程序,对程序员要求较高
   - 从不同数据源读取数据相对困难,读取到不同格式的数据都必须用户自己定义转换方式 合并多个数据源中的数据也较困难

(3)DataFrame:

  DataFrame 被称为 SchemaRDD。以行为单位构成的分布式数据集合,按照列赋予不同的名称。对 select、fileter、aggregation 和 sort 等操作符的抽象。其中 Schema 是就是元数据,是语义描述信息。DataFrame是分布式的Row对象的集合.
  DataFrame = RDD+Schema = SchemaRDD
   优势
   - DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame
   - DataFrame 自带优化器 Catalyst,可以自动优化程序
   - DataFrame 提供了一整套的 Data Source API
   特点
   - 支持 单机 KB 级到集群 PB 级的数据处理
   - 支持多种数据格式和存储系统
   - 通过 Spark SQL Catalyst 优化器可以进行高效的代码生成和优化
   - 能够无缝集成所有的大数据处理工具
   - 提供 Python, Java, Scala, R 语言 API

(4)DataSet:

   由于 DataFrame 的数据类型统一是 Row,所以 DataFrame 也是有缺点的。Row 运行时类型检查,比如 salary 是字符串类型,下面语句也只有运行时才进行类型检查。 dataframe.filter("salary>1000").show()

   Dataset扩展了 DataFrame API,提供了编译时类型检查,面向对象风格的 API。
   Dataset 可以和 DataFrame、RDD 相互转换。DataFrame=Dataset[Row],可见 DataFrame 是一种特殊的 Dataset。

(5)DataSet和DataFrame的区别?

   这里小编要重点强调一下二者的区别,但是在学习spark-sql的时候就对二者的关系不太清楚,而且在面试的时候也问到了这个问题,真的是一番血泪史啊。
   通过查看多个前辈对二者的总结我大概的总结一下二者的区别:
   - Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row
   - DataSet可以在编译时检查类型,而DataFrame只有在正真运行的时候才会检查
   - DataFrame每一行的类型都是Row,不解析我们就无法知晓其中有哪些字段,每个字段又是什么类型。我们只能通过getAs[类型]或者row(i)的方式来获取特定的字段内容(超级大弊端);而dataSet每一行的类型是不一定的,在自定义了case class之后就可以很自由的获取每一行的信息。

好了 废话说了一堆,不如直接上代码:

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val conf: SparkConf = new SparkConf()
        conf.setMaster("local[2]")
            .setAppName("SparkSqlTest")
            //设置spark的序列化器
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //将自定义的对象,加入序列化器中
            .registerKryoClasses(Array(classOf[Person]))
        //构建SparkSession对象
        val spark: SparkSession = SparkSession.builder()
            .config(conf).getOrCreate()
        //创建sparkContext对象
        val sc: SparkContext = spark.sparkContext

        val list = List(
            new Person("委xx", 18),
            new Person("吴xx", 20),
            new Person("戚xx", 30),
            new Person("王xx", 40),
            new Person("薛xx", 18)
        )
        //创建DataFrame
        //构建元数据
        val schema = StructType(List(
            StructField("name", DataTypes.StringType),
            StructField("age", DataTypes.IntegerType)
        ))
        //构建RDD
        val listRDD: RDD[Person] = sc.makeRDD(list)
        val RowRDD: RDD[Row] = listRDD.map(field => {
            Row(field.name, field.age)
        })
        val perDF: DataFrame = spark.createDataFrame(RowRDD,schema)

        //创建DataSet
        import spark.implicits._  //这句话一定要加
        val perDS: Dataset[Person] = perDF.as[Person]

        /**
          * 这里主要介绍DF 和 DS的区别
          */
        perDF.foreach(field=>{
            val name=field.get(0)  //根据元素的index,取出相应的元素的值
            val age=field.getInt(1)  //根据元素的index和元素的类型取出元素的值
            field.getAs[Int]("age")  //根据元素的类型和元素的名称取出元素的值
            println(s"${age},${name}")
        })
        perDS.foreach(field=>{
            //直接根据上面定义的元素的名称取值
            val age=field.age
            val name=field.name
            println(s"${age},${name}")
        })
    }
}
case class Person(name: String, age: Int)

个人感觉,就是DataFrame虽然集成和很多优点,但是,如果想从DataFrame中取出具体的某个对象的某个属性,是不能确定的,步骤比较繁琐,而且类型不确定。但是使用DataSet则有效额的避免了所有的问题。

原文地址:http://blog.51cto.com/14048416/2339105

时间: 2024-12-14 23:15:09

spark-sql的概述以及编程模型的介绍的相关文章

概述异步编程模型

回调函数 事件监听 观察者模式 Promise async.await 原文地址:https://www.cnblogs.com/wuqilang/p/11366992.html

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

异步编程模型(APM)

一.概念 APM即异步编程模式的简写(Asynchronous Programming Model).大家在写代码的时候或者查看.NET 的类库的时候肯定会经常看到和使用以BeginXXX和EndXXX类似的方法,其实你在使用这些方法的时候,你就再使用异步编程模型来编写程序.NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式,(也就是在自定义的类中实现返回类型为IAsyncResult接口的BeginXXX方法和EndXXX方法),另外委托类型也定义了BeginI

DataFrame编程模型初谈与Spark SQL

Spark SQL在Spark内核基础上提供了对结构化数据的处理,在Spark1.3版本中,Spark SQL不仅可以作为分布式的SQL查询引擎,还引入了新的DataFrame编程模型. 在Spark1.3版本中,Spark SQL不再是Alpha版本,除了提供更好的SQL标准兼容之外,还引进了新的组件DataFrame.同时,Spark SQL数据源API也实现了与新组件DataFrame的交互,允许用户直接通过Hive表.Parquet文件以及一些其他数据源生成DataFrame.用户可以在

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

Spark机器学习:Spark 编程模型及快速入门

http://blog.csdn.net/pipisorry/article/details/52366356 Spark编程模型 SparkContext类和SparkConf类 我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象.val sc = new SparkContext("local[4]", "Test Spark App") 这段代码会创建一个4线程的 SparkContext 对象,并将其相应的任务命

Spark入门实战系列--3.Spark编程模型(上)--概念及SparkShell实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark编程模型 1.1 术语定义 应用程序(Application): 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor: 驱动程序(Driver Program):运行Application的main()函数并且创建SparkContext,通常用SparkContext代表Driver Program: 执行单元(Executor): 是为某

Spark入门实战系列--3.Spark编程模型(下)--IDEA搭建及实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 安装IntelliJ IDEA IDEA 全称 IntelliJ IDEA,是java语言开发的集成环境,IntelliJ在业界被公认为最好的java开发工具之一,尤其在智能代码助手.代码自动提示.重构.J2EE支持.Ant.JUnit.CVS整合.代码审查. 创新的GUI设计等方面的功能可以说是超常的.IDEA是JetBrains公司的产品,这家公司总部位于捷克共和国的首都布拉格,开发人员以严谨

Spark SQL 编程初级实践

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json. { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name"