spark复习总结03

1.DataFrame的创建方式

  1.1 通过加载外部文件创建

//通过sqlContext读取json文件创建DataFrame
DataFrame dataFrame=sqlContext.read().json("src/main/resources/datafromcreate.txt");//通过两种方式加载json文件//sqlContext.read().json("src/main/resources/datafromcreate.txt");sqlContext.read().format("json").load("src/main/resources/datafromcreate.txt");

  1.2 通过RDD和元数据进行转换

    1.2.1 通过使用动态构建的元数据的方式创建DataFrame

//创建sqlContext
SQLContext sqlContext=new SQLContext(context);
//使用程序构建DataFrame的元数据
StructType structType=new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true)
        });

//创建studentsRdd
JavaRDD<Row> studentsRdd=context.textFile("src/main/resources/students.txt").map(new Function<String, Row>() {

            private static final long serialVersionUID = 1L;

            public Row call(String line) throws Exception {
                String[] words=line.split(" ");
                return RowFactory.create(Integer.parseInt(words[0]),words[1],Integer.parseInt(words[2]));
            }
        });

//使用动态构建的元数据创建DataFrame
DataFrame studentDataFrame= sqlContext.createDataFrame(studentsRdd, structType);

    1.2.2 通过反射的方式,使用javabean的属性作为DataFrame的元数据进行创建DataFrame

//封装为Student JavaRDD
JavaRDD<Student> students=context.textFile("src/main/resources/students.txt").map(new Function<String, Student>() {

            private static final long serialVersionUID = 1L;

            public Student call(String line) throws Exception {
                String[] words=line.split(" ");
                return new Student(Integer.parseInt(words[0]), words[1], Integer.parseInt(words[2]));
            }
        });

//使用反射技术,将javaRdd转换为DataFrame,使用javabean的属性定义DataFrame的元数据
DataFrame studentDataFrame= sqlContext.createDataFrame(students, Student.class);

    1.2.3 使用hiveContext.table方法将hive表中的数据装换为DataFrame

DataFrame goodStudentDF=hiveContext.table("sqark.good_student_info");

  1.3 加载分区表的parquet文件,自动推断分区字段

//加载parquet文件为DataFrame
 DataFrame usersDF=sqlContext.read().parquet("src/main/resources/parquet/users.parquet");

/**
 * root
    |-- name: string (nullable = true)
    |-- age: long (nullable = true)
*/
usersDF.printSchema();   

/**
* 加载区别表中的数据是会自动推断分区列,  users.parquet只有两个字段name,age;  female和coutry为分区字段
*/
usersDF=sqlContext.read().parquet("src/main/resources/parquet/female=male/coutry=US/users.parquet");

/**
* root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- female: string (nullable = true)
|-- coutry: string (nullable = true)
*/
usersDF.printSchema();

  1.4 合并分区

    开启合并元数据的两种方式:
      1) sqlContext.read().option("mergeSchema", "true")
       2) SparkConf().set("spark.sql.parquet.mergeSchema", "true")

/**
* megerschema/idandage.txt 中的内容只有id和age两个属性
* megerschema/idandname.txt 中的内容只有id和name两个属性
* 合并以后的元素为id,name,age三个属性
*/
DataFrame personDF=sqlContext.read().option("mergeSchema", "true").format("json").load("src/main/resources/megerschema");
personDF.printSchema();

2.将DataFrame进行保存到外部文件系统

//将DataFrame,默认以parquet类型进行保存,可以使用format修改保存的文件格式
personDF.write().save("src/main/resources/output/persons");
//将DataFrame使用json格式保存personDF.write().format("json").save("src/main/resources/output/persons");
时间: 2024-10-29 10:46:16

spark复习总结03的相关文章

【spark 深入学习 03】Spark RDD的蛮荒世界

RDD真的是一个很晦涩的词汇,他就是伯克利大学的博士们在论文中提出的一个概念,很抽象,很难懂:但是这是spark的核心概念,因此有必要spark rdd的知识点,用最简单.浅显易懂的词汇描述.不想用学术话的语言来阐述RDD是什么,用简单.容易理解的方式来描述. 一.什么是RDD,RDD出现的背景 Mapreduce计算模型的出现解决了分布式计算的诸多难题,但是由于MR对数据共享的解决方案比较低效,导致MR编程模型效率不高,将数据写到一个稳定的外部存储系统,如HDFS,这个会引起数据复写.磁盘IO

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

Apache Kafka-0.8.1.1源代码编译

作者:过往记忆 | 新浪微博:左手牵右手TEL | 能够转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明博客地址:http://www.iteblog.com/文章标题:<Apache Kafka-0.8.1.1源代码编译>本文链接:http://www.iteblog.com/archives/1044Hadoop.Hive.Hbase.Flume等QQ交流群:138615359(已满),请增加新群:149892483本博客的微信公共帐号为:iteblog_hadoop.欢迎大

开发系列:03、Spark Streaming Custom Receivers(译)

Spark Streaming can receive streaming data from any arbitrary data source beyond the one’s for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.). This requires the developer to implement a receiver that is customized

.Net学习笔记----2015-07-21(C#基础复习03,简单工厂和抽象类)

static void Main(string[] args) { //使用进程打开指定文件 ProcessStartInfo psi = new ProcessStartInfo(@"C:\Users\Administrator\Desktop\Adobe注册机使用说明.txt"); Process p = new Process(); p.StartInfo = psi; p.Start(); } 会和前面的笔记重复,但是还是复习一遍吧,上次就没太整明白 模拟控制台打开文件:(复习

03. 搭建Spark集群(CentOS7+Spark2.1.1+Hadoop2.8.0)

一.下载安装scala 1.官网下载 2.spar01和02都建立/opt/scala目录,解压tar -zxvf scala-2.12.8.tgz 3.配置环境变量 vi /etc/profile 增加一行 export    SCALA_HOME=/opt/scala/scala-2.12.8 同时把hadoop的环境变量增加进去,完整版是: export JAVA_HOME=/opt/java/jdk1.8.0_191export HADOOP_HOME=/opt/hadoop/hadoo

2015 03 03 复习 上课笔记(一)

1 微信APP 2 ps 切片 3 c语言(编程的思维) 4 html html5 (1 语义化 : 1 本身是机器语言 2针对我们的信息选择,采用对应的标签) 4.1 新增元素 5 Doctype :模式 标准 严谨 过度和怪异 5.1 为什么要声明Doctype 这三种模式? 网景 怪 支持以前的版本 过度 标准 支持现在的版本 6 lable for 获取焦点 7 html5 新增的功能 新的语义化标签(日期 date) 画布 canvas 托拽接口 地理定位 本地存储(移动终端) 声音和

【JS复习笔记】03 继承

关于继承 好吧,说到底JS还是原型继承的,而不是类继承.所以在这个上面要经常用到prototype去继承另一个对象. 所有的构造器函数都约定命名为首字母大写的形式,并且不以首字母大写的形式拼写任何其它的东西.当然一个更好的备选方案是根本就不用new.(太棒了,你说什么就是什么咯,我就当JS没这个东西了) 为什么呢,因为伪类模式就是一种画虎不成反类犬的模式,它试图去迎合我们这些玩类继承的程序员,但是其实它还有更多更好的选择. 那就是原型继承,然后创建对象的时候函数化,这样就可以对创建有私有变量的对

NIO复习03

SocketChannel: 1. Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道.可以通过以下2种方式创建SocketChannel: 打开一个SocketChannel并连接到互联网上的某台服务器. 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel. 2. 打开 SocketChannel(发送端channel?): SocketChannel socketChannel = SocketChannel.open(