spark sql 操作

DSL风格语法

1、查看DataFrame中的内容

scala> df1.show
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhansgan| 16|
| 2| lisi| 18|
| 3| wangwu| 21|
| 4|xiaofang| 22|
+---+--------+---+

2、查看DataFrame部分列的数据

scala> df1.select(df1.col("name")).show
+--------+
| name|
+--------+
|zhansgan|
| lisi|
| wangwu|
|xiaofang|
+--------+

  

scala> df1.select(col("name"), col("age")).show
+--------+---+
| name|age|
+--------+---+
|zhansgan| 16|
| lisi| 18|
| wangwu| 21|
|xiaofang| 22|
+--------+---+
scala> df1.select("name").show
+--------+
| name|
+--------+
|zhansgan|
| lisi|
| wangwu|
|xiaofang|
+--------+

3、查看DataFrame schema信息

scala> df1.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)

4、查询name和age并将age + 1

scala> df1.select(col("name"), col("age") + 1).show
+--------+---------+
| name|(age + 1)|
+--------+---------+
|zhansgan| 17|
| lisi| 19|
| wangwu| 22|
|xiaofang| 23|
+--------+---------+

  

scala> df1.select(df1("name"), df1("age") + 1).show
+--------+---------+
| name|(age + 1)|
+--------+---------+
|zhansgan| 17|
| lisi| 19|
| wangwu| 22|
|xiaofang| 23|
+--------+---------+

5、过滤年龄大于20的人

scala> df1.filter(col("age") > 20).show
+---+--------+---+
| id| name|age|
+---+--------+---+
| 3| wangwu| 21|
| 4|xiaofang| 22|
+---+--------+---+

  

6、按年龄分组,并统计年龄相同的人数

scala> df1.groupBy("age").count().show
+---+-----+
|age|count|
+---+-----+
| 16| 1|
| 18| 1|
| 21| 1|
| 22| 1|
+---+-----+

  

SQL风格

在使用SQL风格前,首先需要将DataFrame注册成表

df1.registerTempTable("t_person")

1、查询年龄最大的前两个人

scala> sqlContext.sql("select * from t_person order by age desc limit 2").show
+---+--------+---+
| id| name|age|
+---+--------+---+
| 4|xiaofang| 22|
| 3| wangwu| 21|
+---+--------+---+

  

2、显示表的schema信息

scala> sqlContext.sql("desc t_person").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| id| int| |
| name| string| |
| age| int| |
+--------+---------+-------+

  

DataFrame api 操作

package bigdata.spark.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

import scala.reflect.internal.util.TableDef.Column

/**
  * Created by Administrator on 2017/4/27.
  */
object SparkSqlDemo {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkSqlDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.textFile("hdfs://m1:9000/persons.txt").map(_.split(" "))
    val rdd2 = rdd1.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    // 导入隐式转换,里面包含了RDD隐式转换为DataFrame的方法
    import sqlContext.implicits._
    // df1现在已经是DataFrame了
    val df1 = rdd2.toDF
    df1.show

    df1.select("age").show()

    df1.select(col="age").show
    df1.select(df1.col("age")).show

    import df1._
    df1.select(col("age")).show

    df1.select(col("age") > 20).show

    df1.select(col("age") + 1).show

    df1.filter(col("age") > 20).show()

    df1.registerTempTable("t_person")

    sqlContext.sql("select * from t_person").show()

    sqlContext.sql("select * from t_person order by age desc limit 2").show()

    sc.stop()

  }

  // 这个类必须放在main方法外面,不然的话会报错
  case class Person(id:Int, name:String, age:Int)

}

  

StructType指定Schema

package bigdata.spark.sql

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

import scala.reflect.internal.util.TableDef.Column

/**
  * Created by Administrator on 2017/4/27.
  */
object SparkSqlDemo {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SparkSqlDemo")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.textFile("hdfs://m1:9000/persons.txt").map(_.split(" "))
    val rdd2 = rdd1.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // 创建schema
    val schema = StructType(
      List(
        // 名称 类型 是否可以为空
        StructField("id", IntegerType, false),
        StructField("name", StringType, false),
        StructField("age", IntegerType, false)
      )
    )

    // 创建DataFrame
    val df1 = sqlContext.createDataFrame(rdd2, schema)

    df1.registerTempTable("t_person")

    sqlContext.sql("select * from t_person").show()

    sc.stop()

  }

}

  

时间: 2024-07-30 13:28:37

spark sql 操作的相关文章

Spark SQL操作详细讲解

一. Spark SQL和SchemaRDD 关于Spark SQL的前生就不再多说了,我们只关注它的操作.但是,首先要搞明白一个问题,那就是究竟什么是SchemaRDD呢?从Spark的Scala API可以知道org.apache.spark.sql.SchemaRDD和class SchemaRDD extends RDD[Row] with SchemaRDDLike,我们可以看到类SchemaRDD继承自抽象类RDD.官方文档的定义是"An RDD of Row objects tha

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte

Spark SQL之External DataSource外部数据源(二)源码分析

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

Spark SQL下的Parquet使用最佳实践和代码实战

一:Spark SQL下的Parquet使用最佳实践 1,过去整个业界对大数据的分析的技术栈的Pipeline一般分为一下两种方式: A)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL) -> HDFS Parquet -> SparkSQL/impala -> Result Service(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用): B)Data Source -> Real time update

Spark SQL with Hive

前一篇文章是Spark SQL的入门篇Spark SQL初探,介绍了一些基础知识和API,但是离我们的日常使用还似乎差了一步之遥. 终结Shark的利用有2个: 1.和Spark程序的集成有诸多限制 2.Hive的优化器不是为Spark而设计的,计算模型的不同,使得Hive的优化器来优化Spark程序遇到了瓶颈. 这里看一下Spark SQL 的基础架构: Spark1.1发布后会支持Spark SQL CLI , Spark SQL的CLI会要求被连接到一个Hive Thrift Server

Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

Spark版本:1.6.2 概览 Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化.可以通过SQL.DataFrames API.Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理.用户可以根据自己喜好,在不同API中选择合适的进行处理.本章中所有用例均可以在spark-shell.pyspark shel

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

spark sql的简单操作

测试数据 sparkStu.text zhangxs 24 chenxy wangYr 21 teacher wangx 26 teacher sparksql { "name":"zhangxs","age":24,"job":"chengxy", "name":"li","age":21,"job":"teache

Spark SQL中Dataframe join操作含null值的列

当在Spark SQL中对两个Dataframe使用join时,当作为连接的字段的值含有null值.由于null表示的含义是未知,既不知道有没有,在SQL中null值与任何其他值的比较(即使是null)永远不会为真.故在进行连接操作时null == null不为True,所以结果中不会出现该条记录,即左侧表格的这条记录对应右侧的值均为null.示例如下: table_a: date serverId lvSection 2018-03-04 1 10 2018-03-05 null 9 2018