spark sql 之 RDD与DataFrame互相转化

一、RDD转DataFrame

  方法一:通过 case class 创建 DataFrames

  

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object TestDataFrame {

  def main(args: Array[String]): Unit = {

    /**
     * 1、初始化 spark config
     */
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local");
    /**
     * 2、初始化spark context
     */
    val sc = new SparkContext(conf);

    /**
     * 3、初始化spark sql context
     */
    val ssc = new SQLContext(sc);

    /**
     * 4、做spark sql 的df获取工作
     */
    val PeopleRDD = sc.textFile("F:\\input.txt").map(line => People(line.split(" ")(0),line.split(" ")(1).trim.toInt))

    import ssc.implicits._

    var df = PeopleRDD.toDF

    //将DataFrame注册成临时的一张表,这张表相当于临时注册到内存中,是逻辑上的表,不会物化到磁盘  这种方式用的比较多
    df.registerTempTable("peopel")

    var df2 =ssc.sql("select * from peopel where age > 23")show()

    /**
     * 5、spark context 结束工作
     */
    sc.stop();

  }
}
case class People(var name:String ,var age : Int)

  方法二:通过 structType创建 DataFrames

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}

object TestDataFrame2{
  def test2(): Unit = {
    /**
     * 1、初始化 spark config
     */
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local");
    /**
     * 2、初始化spark context
     */
    val sc = new SparkContext(conf);

    /**
     * 3、初始化spark sql context
     */
    val ssc = new SQLContext(sc);

    /**
     * 4、做spark sql 的df获取工作
     */
    val peopleRDD = sc.textFile("F:\\input.txt")map(line =>
      Row(line.split(" ")(0),line.split(" ")(1).trim().toInt))

    // 创建 StructType 来定义结构
    val structType : StructType = StructType(
        StructField("name",StringType,true)::
        StructField("age",IntegerType,true) ::Nil
    );

    val df : DataFrame = ssc.createDataFrame(peopleRDD, structType);
    df.registerTempTable("peopel");

    ssc.sql("select * from peopel").show();

     /**
     * 5、spark context 结束工作
     */
    sc.stop();
  }
}

  方法三:通过json创建 DataFream

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}
import org.apache.spark.sql.DataFrame

object TestDataFrame2{
  def test3() : Unit={
    /**
     * 1、初始化 spark config
     */
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local");
    /**
     * 2、初始化spark context
     */
    val sc = new SparkContext(conf);

    /**
     * 3、初始化spark sql context
     */
    val ssc = new SQLContext(sc);

    /**
     * 4、做spark sql 的df获取工作
     */
    val df :DataFrame = ssc.read.json("F:\\json.json")
    df.registerTempTable("people")
    ssc.sql("select * from people").show();

     /**
     * 5、spark context 结束工作
     */
    sc.stop();
  }
}

二、RDD转DataFrame

df.rdd

原文地址:https://www.cnblogs.com/ddaifenxiang/p/11488025.html

时间: 2024-11-05 16:27:25

spark sql 之 RDD与DataFrame互相转化的相关文章

Spark SQL初始化和创建DataFrame的几种方式

一.前述       1.SparkSQL介绍 Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制. SparkSQL支持查询原生的RDD. RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础. 能够在Scala中写SQL语句.支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用.     2.Spark on Hive和Hive on Spa

11.spark sql之RDD转换DataSet

简介 ??Spark SQL提供了两种方式用于将RDD转换为Dataset. 使用反射机制推断RDD的数据结构 ??当spark应用可以推断RDD数据结构时,可使用这种方式.这种基于反射的方法可以使代码更简洁有效. 通过编程接口构造一个数据结构,然后映射到RDD上 ??当spark应用无法推断RDD数据结构时,可使用这种方式. 反射方式 scala // For implicit conversions from RDDs to DataFrames import spark.implicits

Spark SQL的介绍和DataFrame的建立及使用

1. Spark SQL定位处理结构化数据的模块.SparkSQL提供相应的优化机制,并支持不同语言的开发API. java.scala.Python,类SQL的方法调用(DSL) 2. RDD与Spark SQL的比较说明: 使用Spark SQL的优势:a.面向结构化数据:b.优化机制: RDD缺点:a.没有优化机制,如对RDD执行Filter操作: b.RDD类型转换后无法进行模式推断 3. DataFrame/SchemaRDD DataFrame是一个分布式的数据集合,该数据集合以命名

Spark中的RDD和DataFrame

什么是DataFrame 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格. RDD和DataFrame的区别 DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型.使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标. RDD,

spark RDD,DataFrame,DataSet 介绍

弹性分布式数据集(Resilient Distributed Dataset,RDD) RDD是Spark一开始就提供的主要API,从根本上来说,一个RDD就是你的数据的一个不可变的分布式元素集合,在集群中跨节点分布,可以通过若干提供了转换和处理的底层API进行并行处理.每个RDD都被分为多个分区,这些分区运行在集群不同的节点上. RDD支持两种类型的操作,转化操作(transform)和行动操作(action).转化操作会有一个RDD生成一个新的RDD,行动操作则要计算出来一个结果.spark

第56课:Spark SQL和DataFrame的本质

一.Spark SQL与Dataframe Spark SQL之所以是除Spark core以外最大和最受关注的组件的原因: a) 能处理一切存储介质和各种格式的数据(你同时可以方便的扩展Spark SQL的功能来支持更多的数据类型,例如KUDO) b)Spark SQL 把数据仓库的计算能力推向了一个新的高度.不仅是无敌的计算速度(Spark SQL比Shark快了一个数量级,Shark比Hive快了一个数量级),尤其是在tungsten成熟以后会更加无可匹敌.更为重要的是把数据仓库的计算复杂

Spark SQL, DataFrames and Datasets 指南

概述 Spark SQL 是 Spark 处理结构化数据的模块; 与基础的 Spark RDD API 不同, Spark SQL 提供的接口提供给 Spark 更多的关于数据和执行计算的结; 内在的, Spark SQL 使用这些额外的信息去执行额外的优化; 这里有几种包括 SQL 和 Datasets API 在内的与 Spark SQL 交互的方法; 当计算结果使用相同的执行引擎, 独立于你使用的表达计算的 API/语言; 这种统一意味着开发者可以依据哪种 APIs 对于给定的表达式提供了

Spark SQL数据加载和保存实战

一:前置知识详解: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRD

RDD、DataFrame和DataSet的区别

原文链接:http://www.jianshu.com/p/c0181667daa0 RDD.DataFrame和DataSet是容易产生混淆的概念,必须对其相互之间对比,才可以知道其中异同. RDD和DataFrame RDD-DataFrame 上图直观地体现了DataFrame和RDD的区别.左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构.而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数