Spark RDDRelation

package main.asiainfo.coc.sparksql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

case class Record(key: Int, value: String)
object RDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("RDDRelation").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
  //引入SQL context 提供所有的SQL 与 隐式转换方法
    import sqlContext.implicits._
    //生成1到100的数字,并做成key value形式的DateFrame
    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()  //df: "[key:int, value : String]"
    // 将DF做成以case class的临时表 L
    df.registerTempTable("records")

    // 随后便可以调用sqlContext查询这个临时表
    println("Result of SELECT *:")
    sqlContext.sql("SELECT * FROM records").collect().foreach(println)

    // 聚合查询
    val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0) //count: 100
    println(s"COUNT(*): $count")

    // 查询的结果是一个普通的RDD,所以可以根据条件筛选你想要的数据哪一列数据
    val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10") //rddFromSql:"[key : int, value String]"

    println("Result of RDD.map:")
    rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)

    df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

    // 将文件存成parquet格式
    df.write.parquet("pair.parquet")

    // 读取parquet格式文件
    val parquetFile = sqlContext.read.parquet("pair.parquet")

    parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println)

    // parquetFile也可以做成临时表
    parquetFile.registerTempTable("parquetFile")
    sqlContext.sql("SELECT * FROM parquetFile").collect().foreach(println)

    sc.stop()
  }
}

注意 这里声明的是 sqlContext = new SQLContext(sc)  如果要存成hive 表 需用hivecontext.

时间: 2024-10-13 06:21:29

Spark RDDRelation的相关文章

spark执行源码中的例子时报错

在运行spark源码时报错: Error:(45, 66) not found: type SparkFlumeProtocol  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {                                                                 ^ ... Error:(25, 27) no

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

spark性能调优之资源调优

转https://tech.meituan.com/spark-tuning-basic.html spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand

Spark 整合hive 实现数据的读取输出

实验环境: linux centOS 6.7 vmware虚拟机 spark-1.5.1-bin-hadoop-2.1.0 apache-hive-1.2.1 eclipse 或IntelJIDea 本次使用eclipse. 代码: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import o

spark 教程三 spark Map filter flatMap union distinct intersection操作

RDD的创建 spark 所有的操作都围绕着弹性分布式数据集(RDD)进行,这是一个有容错机制的并可以被并行操作的元素集合,具有只读.分区.容错.高效.无需物化.可以缓存.RDD依赖等特征 RDD的创建基础RDD 1.并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行运算 var sc=new SparkContext(conf) var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)); rd

Spark运行命令示例

local单机模式:结果xshell可见:./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[1] ./lib/spark-examples-1.3.1-hadoop2.4.0.jar 100 standalone集群模式:需要的配置项1, slaves文件2, spark-env.shexport JAVA_HOME=/usr/soft/jdk1.7.0_71export SPARK_MASTE

Spark Job具体的物理执行

即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录 2.f(records),f一次性作用于集合的全部数据: Spark采用的是第一种方式,因为: 1.无需等待,可以最大化的使用集群的计算资源 2.减少OOM的产生 3.最大化的有利于并发 4.可以精准的控制每一个Partition本身(Dependency)及其内部的计算(compute) 5.基于lineage的算子流动式函数式计算,可

Dataflow编程模型和spark streaming结合

Dataflow编程模型和spark streaming结合 主要介绍一下Dataflow编程模型的基本思想,后面再简单比较一下Spark  streaming的编程模型 == 是什么 == 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的dataflow model 流式计算框架处理框架很多,也有大量的模型/框架号称能较好的处理流式和批量计算场景,比如Lambda模型,比如Spark等等,那么dataflow模型有什么特别的呢? 这就要要从