spark sql 小样

package dev.spark.sql

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext, SaveMode}import org.apache.spark.sql.hive.HiveContextimport org.apache.spark.sql.types.{IntegerType, StructField, StructType}import org.apache.spark.{SparkConf, SparkContext}

object DataFrame {

  val num = 0  val map = scala.collection.immutable.Map("url" -> "jdbc:mysql://192.168.0.1:3306/spark",    "dbtable"-> "tmp_table3",    "user"-> "spark",    "password"->"spark")  def main(args: Array[String]): Unit = {    val conf = new SparkConf()    conf.setMaster("local")    conf.setAppName("dataFrame")    val sc = new SparkContext(conf)    val ssc = new SQLContext(sc)    val df = ssc.read.json()    ssc.read.format("json").load(".json")    // dataFrame.show 直接查看数据集 按条件查看数据集    df.show()    df.filter(df.col("col")<= num).show()

    // 将dataFrame注册为临时表 按照SQL方式访问数据集    df.registerTempTable("tmp_table0")    // 返回的结果是将每行包装为ROW的数据集集    val dataSet0 = ssc.sql("SELECT col FROM tmp_table WHERE col <="+ num)    // dataSet属性方法很多    dataSet0.collect()foreach(println)    dataSet0.columns.foreach(println)    dataSet0.rdd.foreach(println)    dataSet0.explain()    dataSet0.alias("")    dataSet0.cache()    dataSet0.na

    // SQLContext格式化读取文件    // parquet    val pssc = new SQLContext(sc)    pssc.read.format("parquet")load(".parquet")    // jdbc    val dataSet3 = ssc.read.format("jdbc").options(map).load()

    dataSet3.write.jdbc("jdbc:mysql://192.168.0.1:3306/spark","tmp_table3",new Properties())    // HiveSQLContext在resources中配置hive-site.xml后对hive仓库进行查询 注意:优先从临时表中查询,可以通过数据库.表名的方式完全限定避免歧义,默认仓库是default    val hssc = new HiveContext(sc)

    val dataSet1 = hssc.sql("SELECT col FROM database.table")    dataSet1.registerTempTable("tmp_table1")    // 相同sparkContext上下文可以进行联表操作    hssc.sql("SELECT * FROM tmp_table0 t0 inner join tmp_table1 t1 on t0.col = t1.col")

    // 数据映射为表    dataSet0.rdd.map(line=>Row(line.size))    val rowkeyStructField = new StructField("rowkey", IntegerType,true)    val tableStructType = new StructType(Array(rowkeyStructField))    val dataSet2 = hssc.createDataFrame(dataSet0.rdd, tableStructType)    dataSet2.registerTempTable("tmp_table2")    dataSet2.write.mode(SaveMode.Append).saveAsTable("hive_spark.tmp_table2")

    // rdd转dataframe需要隐式转换    import ssc.implicits._    case class RowKeyClass (rowkey:Int)    dataSet0.rdd.map(x => new RowKeyClass(x.size)).toDF()  }}

原文地址:https://www.cnblogs.com/mrerror/p/10853264.html

时间: 2024-12-01 19:27:39

spark sql 小样的相关文章

详解Spark sql用户自定义函数:UDF与UDAF

UDAF = USER DEFINED AGGREGATION FUNCTION Spark sql提供了丰富的内置函数供猿友们使用,辣为何还要用户自定义函数呢?实际的业务场景可能很复杂,内置函数hold不住,所以Spark sql提供了可扩展的内置函数接口:哥们,你的业务太变态了,我满足不了你,自己按照我的规范去定义一个sql函数,该怎么折腾就怎么折腾! 例如,MySQL数据库中有一张task表,共两个字段taskid (任务ID)与taskParam(JSON格式的任务请求参数).简单起见,

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

Spark sql 在yarn-cluster模式下找不到表

在hive里建一个数据库test,在数据库里建了一张表user,然后在Spark程序中使用Spark sql读取这张表 "select * form test.user" 当部署模式是spark stand模式和yarn-client模式时,程序可以正常运行,但yarn-cluster模式就报了找不到"test.user"表的错误. 解决办法: spark和hive整合,把hive-site.xml加到spark根目录的conf下,所以,要在提交Spark任务的时候

spark SQL概述

Spark SQL是什么? 何为结构化数据 sparkSQL与spark Core的关系 Spark SQL的前世今生:由Shark发展而来 Spark SQL的前世今生:可以追溯到Hive Spark SQL的前世今生:Hive 到Shark(在Hive上做改进) Spark SQL的前世今生:Shark 到Spark SQL(彻底摆脱但是兼容Hive) Spark SQL的前世今生:Hive 到Hive on Spark

Spark SQL数据加载和保存实战

一:前置知识详解: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRD

spark sql 优化心得

本篇文章主要记录最近在使用spark sql 时遇到的问题已经使用心得. 1 spark 2.0.1 中,启动thriftserver 或者是spark-sql时,如果希望spark-sql run on hdfs,那样需要增加参数 "--conf spark.sql.warehouse.dir=hdfs://HOSTNAME:9000/user/hive/warehouse" 例如启动thriftserver: bin/start-thriftserver.sh --master s

Spark视频第5期:Spark SQL架构和案例深入实战

Spark SQL架构和案例深入实战 视频地址:http://pan.baidu.com/share/link?shareid=3629554384&uk=4013289088&fid=977951266414309 王家林老师(邮箱:[email protected] QQ: 1740415547) Spark亚太研究院院长和首席专家,中国目前唯一的移动互联网和云计算大数据集大成者. 在Spark.Hadoop.Android等方面有丰富的源码.实务和性能优化经验.彻底研究了Spark从

Spark SQL UDF

目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst . 在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找 EmptyFunctionRegistry 中lookup 只是抛出一个异常. 所以自定义了一个 FunctionRegistry ,SqlContext @transient protected[sql]lazyva

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种