Spark SQL数据载入和保存实战

一:前置知识具体解释:

Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作。

Load:能够创建DataFrame。

Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。

二:Spark SQL读写数据代码实战:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkSQLLoadSaveOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext = new SQLContext(sc);
        /**
         * read()是DataFrameReader类型,load能够将数据读取出来
         */
        DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");

        /**
         * 直接对DataFrame进行操作
         * Json: 是一种自解释的格式。读取Json的时候怎么推断其是什么格式?
         * 通过扫描整个Json。扫描之后才会知道元数据
         */
        //通过mode来指定输出文件的是append。创建新文件来追加文件
   peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
    }
}

读取过程源代码分析例如以下:

1. read方法返回DataFrameReader,用于读取数据。

/**
 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *   sqlContext.read.parquet("/path/to/file.parquet")
 *   sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)
2.  然后再调用DataFrameReader类中的format,指出读取文件的格式。
/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
3.  通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
  option("path", path).load()
}

至此。数据的读取工作就完毕了,以下就对DataFrame进行操作。

以下就是写操作!

!。

1. 调用DataFrame中select函数进行对列筛选

/**
 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 *
 * {{{
 *   // The following two are equivalent:
 *   df.select("colA", "colB")
 *   df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
2.  然后通过write将结果写入到外部存储系统中。

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
3.   在保持文件的时候mode指定追加文件的方式
/**
 * Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆盖
 *   - `SaveMode.Overwrite`: overwrite the existing data.
//创建新的文件,然后追加
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
4.   最后,save()方法触发action。将文件输出到指定文件里。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}

三:Spark SQL读写整个流程图例如以下:

四:对于流程中部分函数源代码具体解释:

DataFrameReader.Load()

1. Load()返回DataFrame类型的数据集合。使用的数据是从默认的路径读取。

/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此时的read就是DataFrameReader
  read.load(path)
}
2.  追踪load源代码进去,源代码例如以下:

在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
  option("path", path).load()
}
3.  追踪load源代码例如以下:
/**
 * Loads input in as a [[DataFrame]], for data sources that don‘t require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
//对传入的Source进行解析
  val resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

DataFrameReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启发是:假设是Json文件格式能够保持为Parquet等此类操作。

Spark SQL在读取文件的时候能够指定读取文件的类型。比如,Json,Parquet.

/**
 * Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
  this.source = source //FileType
  this
}

DataFrame.write()

1. 创建DataFrameWriter实例

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
2.  追踪DataFrameWriter源代码例如以下:

以DataFrame的方式向外部存储系统中写入数据。

/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。

Append:是追加。对于普通文件是在一个文件里进行追加。可是对于parquet格式的文件则创建新的文件进行追加。

/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默认操作
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
2.  通过模式匹配接收外部參数
/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `overwrite`: overwrite the existing data.
 *   - `append`: append the data.
 *   - `ignore`: ignore the operation (i.e. no-op).
 *   - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
  this.mode = saveMode.toLowerCase match {
    case "overwrite" => SaveMode.Overwrite
    case "append" => SaveMode.Append
    case "ignore" => SaveMode.Ignore
    case "error" | "default" => SaveMode.ErrorIfExists
    case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      "Accepted modes are ‘overwrite‘, ‘append‘, ‘ignore‘, ‘error‘.")
  }
  this
}

DataFrameWriter.save()

1. save将结果保存传入的路径。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}
2.  追踪save方法。
/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
  ResolvedDataSource(
    df.sqlContext,
    source,
    partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
    mode,
    extraOptions.toMap,
    df)
}
3.  当中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName

当中DEFAULT_DATA_SOURCE_NAME默认參数是parquet。

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
  defaultValue = Some("org.apache.spark.sql.parquet"),
  doc = "The default data source to use in input/output.")

DataFrame.scala中部分函数具体解释:

1. toDF函数是将RDD转换成DataFrame

/**
 * Returns the object itself.
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this
2.  show()方法:将结果显示出来
/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *   year  month AVG(‘Adj Close) MAX(‘Adj Close)
 *   1980  12    0.503218        0.595103
 *   1981  01    0.523289        0.570307
 *   1982  02    0.436504        0.475256
 *   1983  03    0.410516        0.442194
 *   1984  04    0.450090        0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *              be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0
 */
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println

追踪showString源代码例如以下:showString中触发action收集数据。

/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
 */
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
  val numRows = _numRows.max(0)
  val sb = new StringBuilder
  val takeResult = take(numRows + 1)
  val hasMoreData = takeResult.length > numRows
  val data = takeResult.take(numRows)
  val numCols = schema.fieldNames.length

原文地址:https://www.cnblogs.com/zhchoutai/p/8571489.html

时间: 2024-08-02 19:37:05

Spark SQL数据载入和保存实战的相关文章

Spark视频第5期:Spark SQL架构和案例深入实战

Spark SQL架构和案例深入实战 视频地址:http://pan.baidu.com/share/link?shareid=3629554384&uk=4013289088&fid=977951266414309 王家林老师(邮箱:[email protected] QQ: 1740415547) Spark亚太研究院院长和首席专家,中国目前唯一的移动互联网和云计算大数据集大成者. 在Spark.Hadoop.Android等方面有丰富的源码.实务和性能优化经验.彻底研究了Spark从

spark视频-Spark SQL架构和案例深入实战

Spark亚太研究院决胜大数据时代公益大讲坛第五期:Spark SQL架构和案例深入实战,视频地址:http://pan.baidu.com/share/link?shareid=3629554384&uk=4013289088&fid=977951266414309 王家林老师(邮箱:[email protected] QQ: 1740415547) Spark亚太研究院院长和首席专家,中国目前唯一的移动互联网和云计算大数据集大成者. 在Spark.Hadoop.Android等方面有丰

Spark SQL数据加载和保存实战

一:前置知识详解: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRD

第57课:Spark SQL on Hive配置及实战

1,首先需要安装hive,参考http://lqding.blog.51cto.com/9123978/1750967 2,在spark的配置目录下添加配置文件,让Spark可以访问hive的metastore. [email protected]:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/conf# vi hive-site.xml <configuration> <property>   <name>hive.metast

整理对Spark SQL的理解

Catalyst Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架. 目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail. 以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程. Catalyst定位 其他系统如果想基于Spark做一些类sql.标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器.执行计划树结构.逻辑执行计划的处理规则体系等类体系来实现执行计划的解析.生成.优化

day61-Spark SQL数据加载和保存内幕深度解密实战

Spark SQL加载数据 SparkSQl 数据输入输入输出主要是DataFrame,DataFrame提供了一些通用的load和save操作. 通过load可以创建出DataFrame:通过save可以将DataFrame数据保存到文件中或者说以具体的格式来指明要读取的文件是什么格式或者输出的数据是什么格式:直接读取 文件的指定类型: SQLContext源码: load 和save方法 @deprecated("Use read.load(path). This will be remov

spark sql中保存数据的几种方式

从官网来copy过来的几种模式描述: Scala/Java Python Meaning SaveMode.ErrorIfExists(default) "error"(default) When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. SaveMode.Append "append" When saving

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

以慕课网日志分析为例 进入大数据 Spark SQL 的世界

详情请交流  QQ  709639943 01.以慕课网日志分析为例 进入大数据 Spark SQL 的世界 02.漫谈spring cloud分布式服务架构 03.Spring Cloud微服务实战视频课程 04.漫谈spring cloud 与 spring boot 基础架构 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.快速上手Ionic3 多平台开发企业级问答社区 09.Java Sprin