Spark学习之路 (十八)SparkSQL简单使用

讨论QQ:1586558083

目录

正文

回到顶部

一、SparkSQL的进化之路

1.0以前:

Shark

1.1.x开始:

SparkSQL(只是测试性的)  SQL

1.3.x:

SparkSQL(正式版本)+Dataframe

1.5.x:

SparkSQL 钨丝计划

1.6.x:

SparkSQL+DataFrame+DataSet(测试版本)

  1. x:

SparkSQL+DataFrame+DataSet(正式版本)

SparkSQL:还有其他的优化

StructuredStreaming(DataSet)

回到顶部

二、认识SparkSQL

2.1 什么是SparkSQL?

spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

2.2 SparkSQL的作用

提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

2.3 运行原理

将 Spark SQL 转化为 RDD, 然后提交到集群执行

2.4 特点

(1)容易整合

(2)统一的数据访问方式

(3)兼容 Hive

(4)标准的数据连接

2.5 SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。    在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。        SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

特点:

   ---- 为用户提供一个统一的切入点使用Spark 各项功能

        ---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

        ---- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

        ---- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

2.7 DataFrames   

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

回到顶部

三、RDD转换成为DataFrame

使用spark1.x版本的方式

测试数据目录:/home/hadoop/apps/spark/examples/src/main/resources(spark的安装目录里面)

people.txt

3.1 方式一:通过 case class 创建 DataFrames(反射)

//定义case class,相当于表结构
case class People(var name:String,var age:Int)
object TestDataFrame1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val context = new SQLContext(sc)
    // 将本地的数据读入 RDD, 并将 RDD 与 case class 关联
    val peopleRDD = sc.textFile("E:\\666\\people.txt")
      .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
    import context.implicits._
    // 将RDD 转换成 DataFrames
    val df = peopleRDD.toDF
    //将DataFrames创建成一个临时的视图
    df.createOrReplaceTempView("people")
    //使用SQL语句进行查询
    context.sql("select * from people").show()
  }
}

运行结果

3.2 方式二:通过 structType 创建 DataFrames(编程接口)

object TestDataFrame2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val fileRDD = sc.textFile("E:\\666\\people.txt")
    // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
    val rowRDD: RDD[Row] = fileRDD.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).trim.toInt)
    })
    // 创建 StructType 来定义结构
    val structType: StructType = StructType(
      //字段名,字段类型,是否可以为空
      StructField("name", StringType, true) ::
        StructField("age", IntegerType, true) :: Nil
    )
    /**
      * rows: java.util.List[Row],
      * schema: StructType
      * */
    val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}

运行结果

3.3 方式三:通过 json 文件创建 DataFrames

object TestDataFrame3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df: DataFrame = sqlContext.read.json("E:\\666\\people.json")
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}

回到顶部

四、DataFrame的read和save和savemode

4.1 数据的读取

object TestRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //方式一
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
    //方式二
    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
    //方式三,默认是parquet格式
    val df5 = sqlContext.load("E:\\666\\users.parquet")
  }
}

4.2 数据的保存

object TestSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    //方式一
    df1.write.json("E:\\111")
    df1.write.parquet("E:\\222")
    //方式二
    df1.write.format("json").save("E:\\333")
    df1.write.format("parquet").save("E:\\444")
    //方式三
    df1.write.save("E:\\555")

  }
}

4.3 数据的保存模式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

回到顶部

五、数据源

5.1 数据源只json

参考4.1

5.2 数据源之parquet

参考4.1

5.3 数据源之Mysql

object TestMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestMysql").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "dbs"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)
    val df = sqlContext.read.jdbc(url,table,properties)
    df.createOrReplaceTempView("dbs")
    sqlContext.sql("select * from dbs").show()

  }
}

运行结果

5.4 数据源之Hive

(1)准备工作

在pom.xml文件中添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

<configuration>
        <property>
                <name>javax.jdo.option.ConnectionURL</name>
                <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
                <description>JDBC connect string for a JDBC metastore</description>
                <!-- 如果 mysql 和 hive 在同一个服务器节点,那么请更改 hadoop02 为 localhost -->
        </property>
        <property>
                <name>javax.jdo.option.ConnectionDriverName</name>
                <value>com.mysql.jdbc.Driver</value>
                <description>Driver class name for a JDBC metastore</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionUserName</name>
                <value>root</value>
                <description>username to use against metastore database</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionPassword</name>
                <value>root</value>
        <description>password to use against metastore database</description>
        </property>
    <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/hive/warehouse</value>
                <description>hive default warehouse, if nessecory, change it</description>
        </property>
</configuration>

(2)测试代码

object TestHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show()
  }
}

运行结果

原文地址:https://www.cnblogs.com/liuys635/p/11002788.html

时间: 2024-10-16 06:37:41

Spark学习之路 (十八)SparkSQL简单使用的相关文章

Spark学习之路 (八)SparkCore的调优之开发调优

讨论QQ:1586558083 目录 调优概述 原则一:避免创建重复的RDD 一个简单的例子 原则二:尽可能复用同一个RDD 一个简单的例子 原则三:对多次使用的RDD进行持久化 对多次使用的RDD进行持久化的代码示例 Spark的持久化级别 如何选择一种最合适的持久化策略 原则四:尽量避免使用shuffle类算子 Broadcast与map进行join代码示例 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 使用reduceByKey/aggregateByK

Spark学习之路 (八)SparkCore的调优之开发调优[转]

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团?大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

【Unity 3D】学习笔记二十八:unity工具类

unity为开发者提供了很多方便开发的工具,他们都是由系统封装的一些功能和方法.比如说:实现时间的time类,获取随机数的Random.Range( )方法等等. 时间类 time类,主要用来获取当前的系统时间. using UnityEngine; using System.Collections; public class Script_04_13 : MonoBehaviour { void OnGUI() { GUILayout.Label("当前游戏时间:" + Time.t

Android学习路线(十八)支持不同设备——支持不同的屏幕

Android系统使用两个普通属性:尺寸和密度,来对设备屏幕进行分类.你需要先预测你的应用将会在什么样屏幕的设备上安装,包括屏幕尺寸和密度.这样的话,你就需要提供一些可选的资源类让你的应用在不同屏幕的设备上有最佳的展示. 有四种普遍的尺寸:small, normal, large, xlarge 还有四种普遍的密度:low (ldpi), medium (mdpi), high (hdpi), extra high (xhdpi) 要为不同的屏幕声明不同的布局和图片,你需要让这些可选的资源放在不

《Javascript权威指南》学习笔记之十八:BOM新成就(1)--客户端存储数据(Web SQL DataBase实现)

使用本地存储和会话存储可以实现简单的对象持久化,可以对简单的键值对或对象进行存储.但是,对于比较复杂的关系数据进行处理时,就要用Web SQL Database.浏览器对Web SQL Database的支持情况如图: 一.如何使用Web SQL Database <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-t

JavaWeb学习总结(四十八)——模拟Servlet3.0使用注解的方式配置Servlet

JavaWeb学习总结(四十八)——模拟Servlet3.0使用注解的方式配置Servlet 一.Servlet的传统配置方式 在JavaWeb开发中, 每次编写一个Servlet都需要在web.xml文件中进行配置,如下所示: 1 <servlet> 2 <servlet-name>ActionServlet</servlet-name> 3 <servlet-class>me.gacl.web.controller.ActionServlet</s

马哥学习笔记二十八——nginx反向代理,负载均衡,缓存,URL重写及读写分离

Nginx反向代理 Nginx通过proxy模块实现反向代理功能.在作为web反向代理服务器时,nginx负责接收客户请求,并能够根据URI.客户端参数或其它的处理逻辑将用户请求调度至上游服务器上(upstream server).nginx在实现反向代理功能时的最重要指令为proxy_pass,它能够将location定义的某URI代理至指定的上游服务器(组)上.如下面的示例中,location的/uri将被替换为上游服务器上的/newuri. location /uri { proxy_pa

javaweb学习总结(二十八)——JSTL标签库之核心标签【转】

原文地址:javaweb学习总结(二十八)——JSTL标签库之核心标签 一.JSTL标签库介绍 JSTL标签库的使用是为弥补html标签的不足,规范自定义标签的使用而诞生的.使用JSLT标签的目的就是不希望在jsp页面中出现java逻辑代码 二.JSTL标签库的分类 核心标签(用得最多) 国际化标签(I18N格式化标签) 数据库标签(SQL标签,很少使用) XML标签(几乎不用) JSTL函数(EL函数) 三.核心标签库使用说明 JSTL的核心标签库标签共13个,使用这些标签能够完成JSP页面的

angular学习笔记(二十八)-$http(6)-使用ngResource模块构建RESTful架构

ngResource模块是angular专门为RESTful架构而设计的一个模块,它提供了'$resource'模块,$resource模块是基于$http的一个封装.下面来看看它的详细用法 1.引入angular-resource.min.js文件 2.在模块中依赖ngResourece,在服务中注入$resource var HttpREST = angular.module('HttpREST',['ngResource']); HttpREST.factory('cardResource

Android学习笔记(十八)——使用意图筛选器和实现浏览网页(附源码)

使用意图筛选器 点击下载源码 1.创建一个Intents项目,给该项目添加一个新类,命名为MyBrowserActivity,在res/layout文件夹下新增一个browser.xml: 2.在AndroidManifest.xml文件中添加如下代码: 添加权限: <uses-permission android:name="android.permission.CALL_PHONE" /> <uses-permission android:name="a