10.spark sql之快速入门

前世今生

Hive&Shark

??随着大数据时代的来临,Hadoop风靡一时。为了使熟悉RDBMS但又不理解MapReduce的技术人员快速进行大数据开发,Hive应运而生。Hive是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。

??但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率。为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:

  • MapR的Drill
  • Cloudera的Impala
  • Shark

??Shark是伯克利实验室Spark生态的组件之一,它修改了Hive Driver的内存管理、物理计划、执行三个模块,使之能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升。

Shark&Spark SQL

??Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的One Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。

??SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码。由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大地提升。

  • 数据兼容方面

??不但兼容Hive,还可以从RDD、parquet文件、JSON文件中获取数据,也支持获取RDBMS数据以及cassandra等NOSQL数据。

  • 性能优化方面

??除了采取In-Memory Columnar Storage、byte-code generation等优化技术外,引进Cost Model对查询进行动态评估、获取最佳物理计划等。

  • 组件扩展方面

??无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。

??2014年Shark停止开发,团队将所有资源放SparkSQL项目上,至此,Shark的发展画上了句号,但也因此发展出两条线:SparkSQL和Hive on Spark。

??其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

简介

??Spark SQL是一个用于结构化数据处理的模块。Spark SQL赋予待处理数据一些结构化信息,可以使用SQL语句或DataSet API接口与Spark SQL进行交互。

  • SQL

??Spark SQL可以使用sql读写Hive中的数据;也可以在编程语言中使用sql,返回Dataset/DataFrame结果集。

  • DataSets&DataFrames

??Dataset是一个分布式数据集,它结合了RDD与SparkSQL执行引擎的优点。Dataset可以通过JVM对象构造,然后使用算子操作进行处理。Java和Scala都有Dataset API;Python和R本身支持Dataset特性。

??DataFrame是一个二维结构的DataSet,相当于RDBMS中的表。DataFrame可以有多种方式构造,比如结构化数据文件、hive表、外部数据库、RDD等。在Scala、Java、Python及R中都有DataFrame API。

DataFrame与DataSet

DataFrame创建及操作

  • scala
import org.apache.spark.sql.SparkSession

// 构造SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// 创建DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// DataFrame操作
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • java
import org.apache.spark.sql.SparkSession;

//构造SparkSession
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

//创建DataFrame
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

//DataFrame操作
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
  • python
from pyspark.sql import SparkSession

# 构造SparkSession
spark = SparkSession     .builder     .appName("Python Spark SQL basic example")     .config("spark.some.config.option", "some-value")     .getOrCreate()

# 创建DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# DataFrame操作
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df[‘name‘], df[‘age‘] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df[‘age‘] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

DataSet创建及操作

??Datasets和RDD类似,但使用专门的Encoder编码器来序列化需要经过网络传输的数据对象,而不用RDD使用的Java序列化或Kryo库。Encoder编码器是动态生成的代码,允许直接执行各种算子操作,而不用反序列化。

  • scala
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • java
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

SQL操作

  • scala
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
//df.createGlobalTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
//df.createGlobalTempView("people")

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
  • python
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# df.createGlobalTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

原文地址:http://blog.51cto.com/12967015/2172864

时间: 2024-10-08 20:01:59

10.spark sql之快速入门的相关文章

13.spark streaming之快速入门

简介   Spark Streaming是Spark核心API的扩展,可以实现可伸缩.高吞吐量.具备容错机制的实时流时数据的处理.支持多种数据源,比如Kafka.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets.   可以使用诸如map.reduce.join和window等高级函数进行复杂算法(比如,机器学习和图计算)的处理.最后还可以将处理结果存储到文件系统,数据库和仪表盘. 架构与抽象 抽象   Spark Streaming接收实时流的数据,并根据一定

14.spark mllib之快速入门

简介 ??MLlib是Spark提供提供机器学习的库,专为在集群上并行运行的情况而设计.MLlib包含很多机器学习算法,可在Spark支持的所有编程语言中使用. ??MLlib设计理念是将数据以RDD的形式表示,然后在分布式数据集上调用各种算法.其实,MLlib就是RDD上一系列可供调用的函数的集合. 数据类型 ??MLlib包含一些特有的数据类型,位于org.apache.spark.mllib包(Java/Scala)或pyspark.mllib(Python)中.主要的几个类有: Vect

Chapter 0.SymmetricDS快速入门指南( Quick Start Guide)

本文档是SymmetricDS3.6.14文档的第一章节Quick Start Guide文档的翻译,的目的是帮助读者快速搭建一个SymmetricDS集群并普及一些基本概念术语. 本文档描述了如何在两个SymmetricDS节点之间同步两个相同schema的数据库.下面的例子构建了一个分销业务模型,有一个中央数据库(我们叫它root或者corp节点)和多个零售商店的数据库(我们叫它client或者store节点).对于本教程,我们将只有一个store(商店)节点,如下图.如果你愿意,可以再教程

AngularJS快速入门指南01:导言

AngularJS使用新的attributes扩展了HTML AngularJS对单页面应用的支持非常好(SPAs) AngularJS非常容易学习 现在就开始学习AngularJS吧! 关于本指南 本指南旨在帮助你尽可能快速而有效地学习AngularJS.通过该指南你会学习到AngularJS的一些基本特性,例如指令.表达式.过滤器.模块和控制器等.以及其它所有你需要知道的有关AngularJS的东西,如事件.DOM节点.表单.用户输入.数据验证.Http对象等. AngularJS快速入门指

(SymmetricDS快速入门指南)Chapter 1. Quick Start Guide

本文档是SymmetricDS3.6.14文档的第一章节Quick Start Guide文档的翻译,的目的是帮助读者快速搭建一个SymmetricDS集群并普及一些基本概念术语. 本文档描述了如何在两个SymmetricDS节点之间同步两个相同schema的数据库.下面的例子构建了一个分销业务模型,有一个中央数据库(我们叫它root或者corp节点)和多个零售商店的数据库(我们叫它client或者store节点).对于本教程,我们将只有一个store(商店)节点,如下图.如果你愿意,可以再教程

Spark1.1.0 Spark SQL Programming Guide

Spark SQL Programming Guide Overview Getting Started Data Sources RDDs Inferring the Schema Using Reflection Programmatically Specifying the Schema Parquet Files Loading Data Programmatically Configuration JSON Datasets Hive Tables Performance Tuning

AngularJS快速入门指南09:SQL

我们可以将之前章节中的代码用来从数据库中读取数据. 通过PHP Server从MySQL数据库中获取数据 <div ng-app="myApp" ng-controller="customersCtrl"> <table> <tr ng-repeat="x in names"> <td>{{ x.Name }}</td> <td>{{ x.Country }}</td&

spark快速入门

  spark框架是用scala写的,运行在Java虚拟机(JVM)上.支持Python.Java.Scala或R多种语言编写客户端应用. 下载Spark   访问http://spark.apache.org/downloads.html选择预编译的版本进行下载. 解压Spark   打开终端,将工作路径转到下载的spark压缩包所在的目录,然后解压压缩包.可使用如下命令: cd ~ tar -xf spark-2.2.2-bin-hadoop2.7.tgz -C /opt/module/ c

Gradle用户指南(章10:Web应用程序快速入门)

Gradle用户指南(章10:Web应用程序快速入门) 本章正在完善中..... 本章介绍gradle对web应用程序的支持.Gradle为web应用程序开发提供了两个插件:War插件和Jetty插件.War插件扩展了java插件并且能为你的项目构建一个war包.Jetty插件扩展了War插件并且提供了发布你的web项目到Jetty容器中的功能. 构建一个War文件 构建War文件,你需要在项目中添加War插件. 注意:你可以在gradle发布库和源文件的samples/webApplicati