Spark-Sql之DataFrame实战详解

1、DataFrame简介:

在Spark中,DataFrame是一种以RDD为基础的分布式数据据集,类似于传统数据库听二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

类似这样的

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)  

2、准备测试结构化数据集

people.json

{"id":1, "name":"Ganymede", "age":32}
{"id":2, "name":"Lilei", "age":19}
{"id":3, "name":"Lily", "age":25}
{"id":4, "name":"Hanmeimei", "age":25}
{"id":5, "name":"Lucy", "age":37}
{"id":6, "name":"Tom", "age":27}  

3、通过编程方式理解DataFrame

1)  通过DataFrame的API来操作数据

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger  

object DataFrameTest {
  def main(args: Array[String]): Unit = {
    //日志显示级别
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)  

    //初始化
    val conf = new SparkConf().setAppName("DataFrameTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("people.json")  

    //查看df中的数据
    df.show()
    //查看Schema
    df.printSchema()
    //查看某个字段
    df.select("name").show()
    //查看多个字段,plus为加上某值
    df.select(df.col("name"), df.col("age").plus(1)).show()
    //过滤某个字段的值
    df.filter(df.col("age").gt(25)).show()
    //count group 某个字段的值
    df.groupBy("age").count().show()  

    //foreach 处理各字段返回值
    df.select(df.col("id"), df.col("name"), df.col("age")).foreach { x =>
      {
        //通过下标获取数据
        println("col1: " + x.get(0) + ", col2: " + "name: " + x.get(2) + ", col3: " + x.get(2))
      }
    }  

    //foreachPartition 处理各字段返回值,生产中常用的方式
    df.select(df.col("id"), df.col("name"), df.col("age")).foreachPartition { iterator =>
      iterator.foreach(x => {
        //通过字段名获取数据
        println("id: " + x.getAs("id") + ", age: " + "name: " + x.getAs("name") + ", age: " + x.getAs("age"))  

      })
    }  

  }
}  

2)通过注册表,操作sql的方式来操作数据

  1. import org.apache.spark.sql.SQLContext
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.log4j.Level
    import org.apache.log4j.Logger  
    
    /**
     * @author Administrator
     */
    object DataFrameTest2 {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  
    
        val conf = new SparkConf().setAppName("DataFrameTest2")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val df = sqlContext.read.json("people.json")  
    
        df.registerTempTable("people")  
    
        df.show();
        df.printSchema();  
    
        //查看某个字段
        sqlContext.sql("select name from people ").show()
        //查看多个字段
        sqlContext.sql("select name,age+1 from people ").show()
        //过滤某个字段的值
        sqlContext.sql("select age from people where age>=25").show()
        //count group 某个字段的值
        sqlContext.sql("select age,count(*) cnt from people group by age").show()  
    
        //foreach 处理各字段返回值
        sqlContext.sql("select id,name,age  from people ").foreach { x =>
          {
            //通过下标获取数据
            println("col1: " + x.get(0) + ", col2: " + "name: " + x.get(2) + ", col3: " + x.get(2))
          }
        }  
    
        //foreachPartition 处理各字段返回值,生产中常用的方式
        sqlContext.sql("select id,name,age  from people ").foreachPartition { iterator =>
          iterator.foreach(x => {
            //通过字段名获取数据
            println("id: " + x.getAs("id") + ", age: " + "name: " + x.getAs("name") + ", age: " + x.getAs("age"))  
    
          })
        }  
    
      }
    }  

两种方式运行结果是一样的,第一种适合程序员,第二种适合熟悉sql的人员。

4、对于非结构化的数据

people.txt

  1. 1,Ganymede,32
    2, Lilei, 19
    3, Lily, 25
    4, Hanmeimei, 25
    5, Lucy, 37
    6, wcc, 4  

1)  通过字段反射来映射注册临时表

     import org.apache.spark.sql.SQLContext  

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row  

/**
 * @author Administrator
 */
object DataFrameTest3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  

    val conf = new SparkConf().setAppName("DataFrameTest3")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val people = sc.textFile("people.txt")  

    val peopleRowRDD = people.map { x => x.split(",") }.map { data =>
      {
        val id = data(0).trim().toInt
        val name = data(1).trim()
        val age = data(2).trim().toInt
        Row(id, name, age)
      }
    }  

    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)));  

    val df = sqlContext.createDataFrame(peopleRowRDD, structType);  

    df.registerTempTable("people")  

    df.show()
    df.printSchema()  

  }
}  

2)   通过case class反射来映射注册临时表


import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row  

/**
 * @author Administrator
 */
object DataFrameTest4 {
  case class People(id: Int, name: String, age: Int)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  

    val conf = new SparkConf().setAppName("DataFrameTest4")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val people = sc.textFile("people.txt")  

    val peopleRDD = people.map { x => x.split(",") }.map { data =>
      {
        People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt)
      }
    }  

    //这里需要隐式转换一把
    import sqlContext.implicits._
    val df = peopleRDD.toDF()
    df.registerTempTable("people")  

    df.show()
    df.printSchema()  

  }
}  

5、总结:

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。

DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

时间: 2024-10-23 20:00:27

Spark-Sql之DataFrame实战详解的相关文章

Scala 深入浅出实战经典 第60讲:Scala中隐式参数实战详解以及在Spark中的应用源码解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/IVN4EuFlmKk/优酷:http://v.youku.com/v_show/id_

机器学习Spark Mllib算法源码及实战详解进阶与提高视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

Scala 深入浅出实战经典 第78讲:Type与Class实战详解

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载: 百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/2vZ06RMcD6I/优酷:http://v.youku.com/v_show/id

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

第131讲:Hadoop集群管理工具均衡器Balancer 实战详解学习笔记

第131讲:Hadoop集群管理工具均衡器Balancer 实战详解学习笔记 为什么需要均衡器呢? 随着集群运行,具体hdfs各个数据存储节点上的block可能分布得越来越不均衡,会导致运行作业时降低mapreduce的本地性. 分布式计算中精髓性的一名话:数据不动代码动.降低本地性对性能的影响是致使的,而且不能充分利用集群的资源,因为导致任务计算会集中在部分datanode上,更易导致故障. balancer是hadoop的一个守护进程.会将block从忙的datanode移动到闲的datan

第130讲:Hadoop集群管理工具DataBlockScanner 实战详解学习笔记

第130讲:Hadoop集群管理工具DataBlockScanner 实战详解学习笔记 DataBlockScanner在datanode上运行的block扫描器,定期检测当前datanode节点上所有的block,从而在客户端读到有问题的块前及时检测和修复有问题的块. 它有所有维护的块的列表,通过对块的列表依次的扫描,查看是否有校验问题或错误问题,它还有截流机制. 什么叫截流机制?DataBlockScanner扫描时会消耗大量的磁盘带宽,如果占用磁盘带宽太大,会有性能问题.所以它会只占用一小

Scala 深入浅出实战经典 第58讲:Scala中Abstract Types实战详解

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载: 百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/IVN4EuFlmKk/优酷:http://v.youku.com/v_show/id

Scala 深入浅出实战经典 第55讲:Scala中Infix Type实战详解

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-64讲)完整视频.PPT.代码下载: 百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/9JKSqMiQuBE/优酷:http://v.youku.com/v_show/id

Scala 深入浅出实战经典 第53讲:Scala中结构类型实战详解

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-64讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/pR_4sY0cJLs/优酷:http://v.youku.com/v_show/id_