Spark学习笔记4:数据读取与保存

Spark对很多种文件格式的读取和保存方式都很简单。Spark会根据文件扩展名选择对应的处理方式。

Spark支持的一些常见文件格式如下:

 1、文本文件

   使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件。也可以指定minPartitions控制分区数。传递目录作为参数,会把目录中的各部分都读取到RDD中。例如:

val input = sc.textFile("E:\\share\\new\\chapter5")
input.foreach(println)

 chapter目录有三个txt文件,内容如下:

输出结果:

用SparkContext.wholeTextFiles()也可以处理多个文件,该方法返回一个pair RDD,其中键是输入文件的文件名。

例如:

    val input = sc.wholeTextFiles("E:\\share\\new\\chapter5")
    input.foreach(println)

  输出结果:

保存文本文件用saveAsTextFile(outputFile)

  •  JSON

JSON是一种使用较广的半结构化数据格式,这里使用json4s来解析JSON文件。

如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization

object TestJson {

  case class Person(name:String,age:Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("JSON")
    val sc = new SparkContext(conf)
    implicit val formats = Serialization.formats(ShortTypeHints(List()))
    val input = sc.textFile("E:\\share\\new\\test.json")
    input.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.age)})

  }

}

 json文件内容:

输出结果:

保存JSON文件用saveASTextFile(outputFile)即可

如下:

val datasave = input.map { myrecord =>
      implicit val formats = DefaultFormats
      val jsonObj = parse(myrecord)
      jsonObj.extract[Person]
    }
datasave.saveAsTextFile("E:\\share\\spark\\savejson")

输出结果:

  • CSV文件

读取CSV文件和读取JSON数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。

如下:

import org.apache.spark.{SparkConf, SparkContext}
import java.io.StringReader

import au.com.bytecode.opencsv.CSVReader

object DataReadAndSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("CSV")
    val sc = new SparkContext(conf)

    val input = sc.textFile("E:\\share\\spark\\test.csv")
    input.foreach(println)
    val result = input.map{
      line =>
        val reader = new CSVReader(new StringReader(line))
        reader.readNext()
    }
    for(res <- result){
      for(r <- res){
        println(r)
      }
    }
  }

}

test.csv内容:

输出结果:

保存csv

如下:

val inputRDD = sc.parallelize(List(Person("Mike", "yes")))
        inputRDD.map(person  => List(person.name,person.favoriteAnimal).toArray)
        .mapPartitions { people =>
          val stringWriter = new StringWriter()
          val csvWriter = new CSVWriter(stringWriter)
          csvWriter.writeAll(people.toList)
          Iterator(stringWriter.toString)
        }.saveAsTextFile("E:\\share\\spark\\savecsv")
  • SequenceFile

SequenceFile是由没有相对关系结构的键值对文件组成的常用Hadoop格式。是由实现Hadoop的Writable接口的元素组成,常见的数据类型以及它们对应的Writable类如下:

读取SequenceFile

调用sequenceFile(path , keyClass , valueClass , minPartitions)

保存SequenceFile

调用saveAsSequenceFile(outputFile)

  • 对象文件

对象文件使用Java序列化写出,允许存储只包含值的RDD。对象文件通常用于Spark作业间的通信。

保存对象文件调用 saveAsObjectFile    读取对象文件用SparkContext的objectFile()函数接受一个路径,返回对应的RDD

  • Hadoop输入输出格式

Spark可以与任何Hadoop支持的格式交互。

读取其他Hadoop输入格式,使用newAPIHadoopFile接收一个路径以及三个类,第一个类是格式类,代表输入格式,第二个类是键的类,最后一个类是值的类。

hadoopFile()函数用于使用旧的API实现的Hadoop输入格式。

KeyValueTextInputFormat 是最简单的 Hadoop 输入格式之一,可以用于从文本文件中读取键值对数据。每一行都会被独立处理,键和值之间用制表符隔开。

例子:

import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

object HadoopFile {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("hadoopfile").setMaster("local")
    val sc = new SparkContext(conf)

    val job = new Job()
    val data = sc.newAPIHadoopFile("E:\\share\\spark\\test.json" ,
      classOf[KeyValueTextInputFormat],
      classOf[Text],
      classOf[Text],
      job.getConfiguration)
    data.foreach(println)

    data.saveAsNewAPIHadoopFile(
      "E:\\share\\spark\\savehadoop",
      classOf[Text],
      classOf[Text],
      classOf[TextOutputFormat[Text,Text]],
      job.getConfiguration)

  }
}

  输出结果:

读取

保存

若使用旧API如下:

val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("E:\\share\\spark\\test.json
").map { case (x, y) => (x.toString, y.toString) } input.foreach(println)

  

  • 文件压缩

对数据进行压缩可以节省存储空间和网络传输开销,Spark原生的输入方式(textFile和sequenFile)可以自动处理一些类型的压缩。在读取压缩后的数据时,一些压缩编解码器可以推测压缩类型。

时间: 2024-10-13 23:33:17

Spark学习笔记4:数据读取与保存的相关文章

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

spark学习笔记总结-spark入门资料精化

Spark学习笔记 Spark简介 spark 可以很容易和yarn结合,直接调用HDFS.Hbase上面的数据,和hadoop结合.配置很容易. spark发展迅猛,框架比hadoop更加灵活实用.减少了延时处理,提高性能效率实用灵活性.也可以与hadoop切实相互结合. spark核心部分分为RDD.Spark SQL.Spark Streaming.MLlib.GraphX.Spark R等核心组件解决了很多的大数据问题,其完美的框架日受欢迎.其相应的生态环境包括zepplin等可视化方面

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o

Windows phone 8 学习笔记(2) 数据文件操作(转)

Windows phone 8 应用用于数据文件存储访问的位置仅仅限于安装文件夹.本地文件夹(独立存储空间).媒体库和SD卡四个地方.本节主要讲解它们的用法以及相关限制性.另外包括本地数据库的使用方式. 快速导航:一.分析各类数据文件存储方式二.安装文件夹三.本地文件夹(独立存储空间)四.媒体库操作五.本地数据库 一.分析各类数据文件存储方式 1)安装文件夹 安装文件夹即应用安装以后的磁盘根文件夹,它提供只读的访问权限.它在手机中对应的路径为" C:\Data\Programs\{XXXXXXX

Symfony2学习笔记之数据校验

校验在web应用程序中是一个常见的任务.数据输入到表单需要被校验.数据在被写入数据库之前或者传入一个webservice时也需要被校验. Symfony2 配备了一个Validator 组件,它让校验工作变得简单易懂.该组件是基于JSR303 Bean校验规范.一个Java规范用在PHP中. 基本验证理解校验的最好方法是看它的表现.首先,假设你已经创建了一个用于你应用程序某个地方的PHP对象. //src/Acme/BlogBundle/Entity/Author.php namespace A

python 学习笔记 3 -- 数据结构篇上

数据结构是可以处理一些 数据 的 结构 .或者说,它们是用来存储一组相关数据的.在Python中有三种内建的数据结构--列表.元组和字典.本文主要对这三种数据类型以及相关的使用做介绍,以例子的形式演示更加容易理解! 1.列表(List) 列表是处理一组有序项目的数据结构,即你可以在一个列表中存储一个 序列 的项目.在Python中,你在每个项目之间用逗号分割. 列表中的项目应该包括在**方括号**中,这样Python就知道你是在指明一个列表.一旦你创建了一个列表,你可以添加.删除或是搜索列表中的

python 学习笔记 3 -- 数据结构篇下

5.引用 当你创建一个对象并给它赋一个变量的时候,这个变量仅仅 引用 那个对象,而不是表示这个对象本身!也就是说,变量名指向你计算机中存储那个对象的内存.这被称作名称到对象的绑定.eg. [python] view plaincopy # -*- coding: utf-8 -*- shoplist = ['apple', 'mango', 'carrot', 'banana'] print "we copy the shoplist to mylist directly \"with

springmvc学习笔记(15)-数据回显

springmvc学习笔记(15)-数据回显 springmvc学习笔记15-数据回显 pojo数据回显方法 简单类型数据回显 本文介绍springmvc中数据回显的几种实现方法 数据回显:提交后,如果出现错误,将刚才提交的数据回显到刚才的提交页面. pojo数据回显方法 1.springmvc默认对pojo数据进行回显. pojo数据传入controller方法后,springmvc自动将pojo数据放到request域,key等于pojo类型(首字母小写) 使用@ModelAttribute

Android开发学习笔记:数据存取之SQLite浅析

一.SQLite的介绍 1.SQLite简介 SQLite是一款轻型的数据库,是遵守ACID的关联式数据库管理系统,它的设计目标是嵌入 式的,而且目前已经在很多嵌入式产品中使用了它,它占用资源非常的低,在嵌入式设备中,可能只需要几百K的内存就够了.它能够支持 Windows/Linux/Unix等等主流的操作系统,同时能够跟很多程序语言相结合,比如Tcl.PHP.Java.C++..Net等,还有ODBC接口,同样比起 Mysql.PostgreSQL这两款开源世界著名的数据库管理系统来讲,它的

#学习笔记#JSP数据交互

#学习笔记#JSP数据交互 数据库的使用方式: 当用户在第一个页面的查询框输入查询语句点提交的时候我们是用什么样的方式完成这个查询的? 答:我们通过在第一个页面提交表单的形式,真正的数据库查询时在第二个服务器页面进行的,第一个request对象里面放置了查询的内容,我们可以通过request.getParameter()方法获得,在服务器内我们之间把所有的查询直接用html语句输出是很困难的,所以我们把查询的结果赋给request的Attribute,使用request.sex`x`x``tAt