5.1 文件格式
5.2.1文本文件
当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素,也可以将多个完整文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容。
在Python中读取一个文本文件
input = sc.textFile("file:///home/holden/repos")
如果多个输入文件以一个包含数据所有部分的目录的形式出现,可以用两种方式来处理:
仍然使用textFile函数,传递目录作为参数,这样他会把各个部分都读取到RDD中;
如果文件足够小,那么可以使用SparkContext.wholeTextFile方法,该方法会返回一个pair RDD,其中键是输入文件的文件名
在Python中将数据保存为文本文件
result.saveAsTextFile(outputFile)
5.2.2JSON
在Python中读取非结构化的JSON
import json data = input.map(lambda x: json.loads(x))
在Python中保存为JSON
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
5.2.3逗号分隔值与制表符分隔值
在Python中使用textFile读取csv
import csv import StringIO def loadRecord(line): """解析一行CSV记录""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames = ["name", "favouriteAnimal"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord)
如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段:
在Python中读取完整CSV
def loadRecords(fileNameContents): """读取给定文件中的所有记录""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldNames = ["name", "favouriateAnimal"] return reader fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
在Python中写CSV
def writeRecords(records): """写出一些CSV记录""" output = StringIO.StringIO() writer = csv.DictWriter(output, fieldsnames = ["name", "favouriateAnimal"]) for record in records: writer.writerow(record) return [output.getvalue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
5.2.4 SequenceFile
5.2 文件系统
5.3 数据库
时间: 2024-10-01 04:00:07