Spark学习之数据读取与保存总结(二)

8、Hadoop输入输出格式

  除了 Spark 封装的格式之外,也可以与任何 Hadoop 支持的格式交互。Spark 支持新旧两套Hadoop 文件 API,提供了很大的灵活性。

  要使用新版的 Hadoop API 读入一个文件,需要告诉 Spark 一些东西。 newAPIHadoopFile接收一个路径以及三个类。第一个类是“格式”类,代表输入格式。相似的函数hadoopFile() 则用于使用旧的 API 实现的 Hadoop 输入格式。第二个类是键的类,最后一个类是值的类。如果需要设定额外的 Hadoop 配置属性,也可以传入一个 conf 对象。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hadoop").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 设置日志显示级别
    val inputFile = "pandainfo.json"//读取csv文件
    // 新式API
    val job = new Job()
    val data = sc.newAPIHadoopFile("pandainfo.json", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text], job.getConfiguration)
    data.foreach(println)
    data.saveAsNewAPIHadoopFile("hadoop_json", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], job.getConfiguration)
//    // 旧式API
//    val input = sc.hadoopFile[Text,Text,KeyValueTextInputFormat](inputFile).map{
//      case(x,y) => (x.toString,y.toString)
//    }
//    input.foreach(println)
  }

}

  

  

9、文件压缩

  在大数据工作中,我们经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数 Hadoop 输出格式来说,我们可以指定一种压缩编解码器来压缩数据。我们已经提过,Spark 原生的输入方式( textFile 和 sequenceFile )可以自动处理一些类型的压缩。在读取压缩后的数据时,一些压缩编解码器可以推测压缩类型。

  这些压缩选项只适用于支持压缩的 Hadoop 格式,也就是那些写出到文件系统的格式。写入数据库的 Hadoop 格式一般没有实现压缩支持。如果数据库中有压缩过的记录,那应该是数据库自己配置的。表列出了可用的压缩选项。

   

三、文件系统

  Spark 支持读写很多种文件系统,可以使用任何我们想要的文件格式。

  本地/“常规”文件系统:Spark 支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。

  Amazon S3:将一个以s3n://开头的路径以s3n://bucket/path-within-bucket的形式传给Spark的输入方法。

  HDFS:在Spark中使用HDFS只需要将输入路径输出路径指定为hdfs://master:port/path就可以了。

四、Spark SQL中的结构化数据

1、Apache Hive

  Apache Hive 是 Hadoop 上的一种常见的结构化数据源。Hive 可以在 HDFS 内或者在其他存储系统上存储多种格式的表。这些格式从普通文本到列式存储格式,应有尽有。SparkSQL 可以读取 Hive 支持的任何表。

  要把 Spark SQL 连接到已有的 Hive 上,你需要提供 Hive 的配置文件。你需要将 hive-site.xml 文件复制到 Spark 的 ./conf/ 目录下。这样做好之后,再创建出 HiveContext 对象,也就是 Spark SQL 的入口,然后你就可以使用 Hive 查询语言(HQL)来对你的表进行查询,并以由行组成的 RDD 的形式拿到返回数据。

     //用scala创建HiveContext并查询数据
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 设置日志显示级别
    val hiveCtx = new HiveContext(sc)
    val rows = hiveCtx.sql("SELECT name,age FROM users")
    val firstRow = rows.first()
    println(firstRow.getString(0)) // 字段0是name字段

2、JSON

  如果你有记录间结构一致的 JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。要读取 JSON 数据,首先需要和使用 Hive 一样创建一个 HiveContext 。(不过在这种情况下我们不需要安装好 Hive,也就是说你也不需要 hive-site.xml 文件。)然后使用 HiveContext.jsonFile 方法来从整个文件中获取由 Row 对象组成的 RDD。除了使用整个 Row 对象,你也可以将 RDD数据注册为一张表,然后从中选出特定的字段。例如:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

object Test {
  def main(args: Array[String]): Unit = {
//     再Scala中使用SparkSQL读取json数据
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 设置日志显示级别
    val hiveCtx = new HiveContext(sc)
    val input = hiveCtx.jsonFile("tweets.json")
    input.registerTempTable("tweets")
    val results = hiveCtx.sql("SELECT user.name,text FROM tweets")
    results.collect().foreach(println)
  }

}

  

  

3、数据库

  通过数据库提供的 Hadoop 连接器或者自定义的 Spark 连接器,Spark 可以访问一些常用的数据库系统。常见的有四种常见的连接器:Java数据库连接、Cassandra、HBase、Elasticsearch。下面演示如何使用 jdbcRDD 连接 MySQL 数据库。

  首先我们有一个名为teachdb的数据库,里面有一张名为student1的表,有如下数据:

  

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 设置日志显示级别
    val sqlContext = new SQLContext(sc)
    val mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/teachdb")
                .option("dbtable", "student1").option("driver", "com.mysql.jdbc.Driver") // 注意这儿需要导入"mysql-connector-java-5.1.40-bin"包
                .option("user", "root").option("password", "0000").load()
    mysql.registerTempTable("student1")
    mysql.sqlContext.sql("select * from student1").collect().foreach(println)
  }

}

  

   这篇博文主要来自《Spark快速大数据分析》这本书里面的第五章,内容有删减,还有本书的一些代码的实验结果。

原文地址:https://www.cnblogs.com/xiaoyh/p/10712225.html

时间: 2024-11-10 10:18:48

Spark学习之数据读取与保存总结(二)的相关文章

Spark学习笔记——数据读取和保存

spark所支持的文件格式 1.文本文件 在 Spark 中读写文本文件很容易. 当我们将一个文本文件读取为 RDD 时,输入的每一行 都会成为 RDD 的 一个元素. 也可以将多个完整的文本文件一次性读取为一个 pair RDD, 其中键是文件名,值是文件内容.

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

Spark学习笔记4:数据读取与保存

Spark对很多种文件格式的读取和保存方式都很简单.Spark会根据文件扩展名选择对应的处理方式. Spark支持的一些常见文件格式如下: 1.文本文件 使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件.也可以指定minPartitions控制分区数.传递目录作为参数,会把目录中的各部分都读取到RDD中.例如: val input = sc.textFile("E:\\share\\new\\chapter5") input.fore

从相册读取本地保存的二维码并跳转h5链接

因公司业务需求,在扫描二维码基础的前提下,也需要满足用户点击相册按妞,从相册获取本地保存二维码实现签到功能,在网上查阅相关资料后,整理了下,有以下几种方式: ios8.0以后可以通过使用系统原生的框架实现该功能,即CIDetector,直接上代码,但是通过验证发现,大部分二维码都能够识别,但是对于通过拍照保存的二维码,则出现很大概率无法识别,故此方法限制性比较大,不建议推荐使用,如想使用,可以直接照搬网上相关的代码; 使用先阶段比较流行的zxingObjC框架来扫描相册的二维码,但该框架不好用的

(5)数据读取与保存

5.1 文件格式 5.2.1文本文件 当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素,也可以将多个完整文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容. 在Python中读取一个文本文件 input = sc.textFile("file:///home/holden/repos") 如果多个输入文件以一个包含数据所有部分的目录的形式出现,可以用两种方式来处理: 仍然使用textFile函数,传递目录作为参数,这样他会把各个部分都读取到R

Unity游戏数据用Json保存

(一)关于路径 unity有几个关键的路径 (1).Application.dataPath 只读路径,就是工作目录的Assets路径 (2).Application.streamingAssetsPath 只读路径,在pc可写,程序打包后里面的所有资源都原封不动的打到游戏包里面 (3).Application.persistentDataPath 读写路径,pc端:C:/Users/用户名/AppData/LocalLow/公司名/包名/文件. Android:Android/data/包名/

pandas学习(常用数学统计方法总结、读取或保存数据、缺省值和异常值处理)

pandas学习(常用数学统计方法总结.读取或保存数据.缺省值和异常值处理) 目录 常用数学统计方法总结 读取或保存数据 缺省值和异常值处理 常用数学统计方法总结 count 计算非NA值的数量 describe 针对Series或DataFrame列计算统计 min/max/sum 计算最小值 最大值 总和 argmin argmax 计算能够获取到最小值和最大值的索引位置(整数) idxmin idxmax 计算能够获取到最小值和最大值的索引值 quantile 计算样本的分位数(0到1)

spark源码阅读笔记RDD(七) RDD的创建、读取和保存

Spark支持很多输入和输出源,同时还支持内建RDD.Spark本身是基于Hadoop的生态圈,它可以通过 Hadoop MapReduce所使用的InpoutFormat和OutputFormat接口访问数据.而且大部分的文件格式和存储系统 (HDFS,Hbase,S3等)都支持这种接口.Spark常见的数据源如下: (1) 文件格式和文件系统,也就是我们经常用的TXT,JSON,CSV等這些文件格式 (2)SparkSQL中的结构化数据源 (3)数据库与键值存储(Hbase和JDBC源) 当

Spark SQL数据载入和保存实战

一:前置知识具体解释: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作. Load:能够创建DataFrame. Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.Java