Spark DataFrames入门指南:创建和操作DataFrame

一、从csv文件创建DataFrame

  本文将介绍如何从csv文件创建DataFrame。

如何做?

  从csv文件创建DataFrame主要包括以下几步骤:

  1、在build.sbt文件里面添加spark-csv支持库;

  2、创建SparkConf对象,其中包括Spark运行所有的环境信息;

  3、创建SparkContext对象,它是进入Spark的核心切入点,然后我们可以通过它创建SQLContext对象;

  4、使用SQLContext对象加载CSV文件;

  5、Spark内置是不支持解析CSV文件的,但是Databricks公司开发了一个类库可以支持解析CSV文件。所以我们需要把这个依赖文件加载到依赖文件中(pom.xml或者是build.sbt)

如果你是SBT工程,请加入以下依赖到build.sbt文件中:

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.3.0"

如果你是Maven工程,请加入以下依赖到pom.xml文件中:

<dependency>
	<groupid>com.databricks</groupid>
	<artifactid>spark-csv_2.10</artifactid>
	<version>1.3.0</version>
</dependency>

6、SparkConf持有所有运行Spark程序的信息,在这个实例中,我们将以本地的方式运行这个程序,而且我们打算使用2个核(local[2]),部分代码片段如下:

import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("csvDataFrame").setMaster("local[2]")

7、使用SparkConf初始化SparkContext对象,SparkContext是进入Spark的核心切入点:

val sc = new SparkContext(conf) 

在Spark中查询数据最简单的一种方式就是使用SQL查询,所以我们可以定义一个SQLContext对象:

val sqlContext=new SQLContext(sc) 

8、现在我们就可以加载事先准备好的数据了:

import com.databricks.spark.csv._
val students=sqlContext.csvFile(filePath="StudentData.csv", useHeader=true, delimiter='|') 

其中,students对象的类型是org.apache. spark.sql.DataFrame。

如何工作的

  csvFile方法接收需要加载的csv文件路径filePath,如果需要加载的csv文件有头部信息,我们可以将useHeader设置为true,这样就可以将第一行的信息当作列名称来读;delimiter指定csv文件列之间的分隔符。

  除了使用csvFile函数,我们还可以使用sqlContext里面的load来加载csv文件:

val options = Map("header" -> "true", "path" -> "E:\\StudentData.csv")
val newStudents = sqlContext.read.options(options).format("com.databricks.spark.csv").load()

附录

为了方便大家测试,我提供了StudentData.csv文件的部分数据集:

id|studentName|phone|email
1|Burke|1-300-746-8446|[email protected]
2|Kamal|1-668-571-5046|[email protected]
3|Olga|1-956-311-1686|[email protected]
4|Belle|1-246-894-6340|[email protected]
5|Trevor|1-300-527-4967|[email protected]
6|Laurel|1-691-379-9921|[email protected]
7|Sara|1-608-140-1995|[email protected]
8|Kaseem|1-881-586-2689|[email protected]
9|Lev|1-916-367-5608|[email protected]
10|Maya|1-271-683-2698|[email protected]
11|Emi|1-467-270-1337|[email protected]
12|Caleb|1-683-212-0896|[email protected]
13|Florence|1-603-575-2444|[email protected]
14|Anika|1-856-828-7883|[email protected]
15|Tarik|1-398-171-2268|[email protected]
16|Amena|1-878-250-3129|[email protected]
17|Blossom|1-154-406-9596|[email protected]
18|Guy|1-869-521-3230|[email protected]
19|Malachi|1-608-637-2772|[email protected]
20|Edward|1-711-710-6552|[email protected]

二、从Scala case class中创建DataFrame

  在这篇文章中,你将学到如何从Scala case class中创建DataFrame。

如何做?

  1、我们首先创建一个case class,名为Employee,并且定义id和name两个参数,如下:

case class Employee(id: Int, name: String)

和先前一样,我们分别定义SparkConf、SparkContext以及SQLContext:

val conf = new SparkConf().setAppName("colRowDataFrame"). setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 

2、我们可以通过很多方式来初始化Employee类,比如从关系型数据库中获取数据以此来定义Employee类。但是在本文为了简单起见,我将直接定义一个Employee类的List,如下:

val listOfEmployees = List(Employee(1, "iteblog"), Employee(2, "Jason"), Employee(3, "Abhi"))

3、我们将listOfEmployees列表传递给SQLContext类的createDataFrame 函数,这样我们就可以创建出DataFrame了!然后我们可以调用DataFrame的printuSchema函数,打印出该DataFrame的模式,我们可以看出这个DataFrame主要有两列:name和id,这正是我们定义Employee的两个参数,并且类型都一致。

val empFrame = sqlContext.createDataFrame(listOfEmployees)
empFrame.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)

之所以DataFrame打印出的模式和Employee类的两个参数一致,那是因为DataFrame内部通过反射获取到的。

4、如果你对默认反射获取到的模式名称不感兴趣,你可以通过withColumnRenamed函数来指定列名:

val empFrameWithRenamedColumns = sqlContext.createDataFrame(listOfEmployees).withColumnRenamed("id", "empId")
empFrameWithRenamedColumns.printSchema

root
 |-- empId: integer (nullable = false)
 |-- name: string (nullable = true)

5、我们可以使用Spark支持的SQL功能来查询相关的数据。在使用这个功能之前,我们必须先对DataFrame注册成一张临时表,我们可以使用registerTempTable函数实现,如下:

empFrameWithRenamedColumns.registerTempTable("employeeTable")

6、现在我们就可以使用SQL语句来查询DataFrame里面的数据了:

val sortedByNameEmployees = sqlContext.sql("select * from employeeTable order by name desc")
sortedByNameEmployees.show()
+-----+-------+
|empId|   name|
+-----+-------+
|    1|iteblog|
|    2|  Jason|
|    3|   Abhi|
+-----+-------+

它如何工作的

  createDataFrame函数可以接收一切继承scala.Product类的集合对象:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame

而case class类就是继承了Product。我们所熟悉的TupleN类型也是继承了scala.Product类的,所以我们也可以通过TupleN来创建DataFrame:

val mobiles=sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone"))) mobiles.printSchema mobiles.show()

root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)

+---+-------+
| _1|     _2|
+---+-------+
|  1|Android|
|  2| iPhone|
+---+-------+

我们知道,Tuple2的默认两个参数名字分别是_1和_2,同样,我们如果对这个默认的名字不是特别喜欢,我们也是可以通过withColumnRenamed函数对默认反射的列名进行重命名。

三、操作DataFrame

  在前面的文章中,我们介绍了如何创建DataFrame。本文将介绍如何操作DataFrame里面的数据和打印出DataFrame里面数据的模式

打印DataFrame里面的模式

  在创建完DataFrame之后,我们一般都会查看里面数据的模式,我们可以通过printSchema函数来查看。它会打印出列的名称和类型:

students.printSchema
root
 |-- id: string (nullable = true)
 |-- studentName: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)

如果采用的是load方式参见DataFrame的,students.printSchema的输出则如下:

root
 |-- id|studentName|phone|email: string (nullable = true)

对DataFrame里面的数据进行采样

  打印完模式之后,我们要做的第二件事就是看看加载进DataFrame里面的数据是否正确。从新创建的DataFrame里面采样数据的方法有很多种。我们来对其进行介绍。

  最简单的就是使用show方法,show方法有四个版本:

  (1)、第一个需要我们指定采样的行数def show(numRows: Int);

  (2)、第二种不需要我们指定任何参数,这种情况下,show函数默认会加载出20行的数据def show();

  (3)、第三种需要指定一个boolean值,这个值说明是否需要对超过20个字符的列进行截取def show(truncate: Boolean);

  (4)、最后一种需要指定采样的行和是否需要对列进行截断def show(numRows: Int, truncate: Boolean)。实际上,前三个函数都是调用这个函数实现的。

  Show函数和其他函数不同的地方在于其不仅会显示需要打印的行,而且还会打印出头信息,并且会直接在默认的输出流打出(console)。来看看怎么使用吧:

students.show()  //打印出20行
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
|  1|      Burke|1-300-746-8446|ullamcorper.velit...|
|  2|      Kamal|1-668-571-5046|[email protected]|
|  3|       Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|      Belle|1-246-894-6340|vitae.aliquet.nec...|
|  5|     Trevor|1-300-527-4967|[email protected]|
|  6|     Laurel|1-691-379-9921|[email protected]|
|  7|       Sara|1-608-140-1995|[email protected]|
|  8|     Kaseem|1-881-586-2689|[email protected]|
|  9|        Lev|1-916-367-5608|[email protected]|
| 10|       Maya|1-271-683-2698|accumsan.convalli...|
| 11|        Emi|1-467-270-1337|        [email protected]|
| 12|      Caleb|1-683-212-0896|[email protected]|
| 13|   Florence|1-603-575-2444|[email protected]|
| 14|      Anika|1-856-828-7883|[email protected]|
| 15|      Tarik|1-398-171-2268|[email protected]|
| 16|      Amena|1-878-250-3129|[email protected]|
| 17|    Blossom|1-154-406-9596|Nunc.commodo.auct...|
| 18|        Guy|1-869-521-3230|senectus.et.netus...|
| 19|    Malachi|1-608-637-2772|[email protected]|
| 20|     Edward|1-711-710-6552|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 20 rows
students.show(15)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
|  1|      Burke|1-300-746-8446|ullamcorper.velit...|
|  2|      Kamal|1-668-571-5046|[email protected]|
|  3|       Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|      Belle|1-246-894-6340|vitae.aliquet.nec...|
|  5|     Trevor|1-300-527-4967|[email protected]|
|  6|     Laurel|1-691-379-9921|[email protected]|
|  7|       Sara|1-608-140-1995|[email protected]|
|  8|     Kaseem|1-881-586-2689|[email protected]|
|  9|        Lev|1-916-367-5608|[email protected]|
| 10|       Maya|1-271-683-2698|accumsan.convalli...|
| 11|        Emi|1-467-270-1337|        [email protected]|
| 12|      Caleb|1-683-212-0896|[email protected]|
| 13|   Florence|1-603-575-2444|[email protected]|
| 14|      Anika|1-856-828-7883|[email protected]|
| 15|      Tarik|1-398-171-2268|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 15 rows

students.show(true)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
|  1|      Burke|1-300-746-8446|ullamcorper.velit...|
|  2|      Kamal|1-668-571-5046|[email protected]|
|  3|       Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|      Belle|1-246-894-6340|vitae.aliquet.nec...|
|  5|     Trevor|1-300-527-4967|[email protected]|
|  6|     Laurel|1-691-379-9921|[email protected]|
|  7|       Sara|1-608-140-1995|[email protected]|
|  8|     Kaseem|1-881-586-2689|[email protected]|
|  9|        Lev|1-916-367-5608|[email protected]|
| 10|       Maya|1-271-683-2698|accumsan.convalli...|
| 11|        Emi|1-467-270-1337|        [email protected]|
| 12|      Caleb|1-683-212-0896|[email protected]|
| 13|   Florence|1-603-575-2444|[email protected]|
| 14|      Anika|1-856-828-7883|[email protected]|
| 15|      Tarik|1-398-171-2268|[email protected]|
| 16|      Amena|1-878-250-3129|[email protected]|
| 17|    Blossom|1-154-406-9596|Nunc.commodo.auct...|
| 18|        Guy|1-869-521-3230|senectus.et.netus...|
| 19|    Malachi|1-608-637-2772|[email protected]|
| 20|     Edward|1-711-710-6552|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 20 rows

students.show(false)
+---+-----------+--------------+-----------------------------------------+
|id |studentName|phone         |email                                    |
+---+-----------+--------------+-----------------------------------------+
|1  |Burke      |1-300-746-8446|[email protected]|
|2  |Kamal      |1-668-571-5046|[email protected]        |
|3  |Olga       |1-956-311-1686|[email protected]   |
|4  |Belle      |1-246-894-6340|[email protected]            |
|5  |Trevor     |1-300-527-4967|[email protected]           |
|6  |Laurel     |1-691-379-9921|[email protected]         |
|7  |Sara       |1-608-140-1995|[email protected]        |
|8  |Kaseem     |1-881-586-2689|[email protected]              |
|9  |Lev        |1-916-367-5608|[email protected]              |
|10 |Maya       |1-271-683-2698|[email protected] |
|11 |Emi        |1-467-270-1337|[email protected]                             |
|12 |Caleb      |1-683-212-0896|[email protected]                  |
|13 |Florence   |1-603-575-2444|[email protected]   |
|14 |Anika      |1-856-828-7883|[email protected]                 |
|15 |Tarik      |1-398-171-2268|[email protected]                     |
|16 |Amena      |1-878-250-3129|[email protected]          |
|17 |Blossom    |1-154-406-9596|[email protected]        |
|18 |Guy        |1-869-521-3230|[email protected]       |
|19 |Malachi    |1-608-637-2772|[email protected]             |
|20 |Edward     |1-711-710-6552|[email protected]               |
+---+-----------+--------------+-----------------------------------------+
only showing top 20 rows

students.show(10,false)

+---+-----------+--------------+-----------------------------------------+
|id |studentName|phone         |email                                    |
+---+-----------+--------------+-----------------------------------------+
|1  |Burke      |1-300-746-8446|[email protected]|
|2  |Kamal      |1-668-571-5046|[email protected]        |
|3  |Olga       |1-956-311-1686|[email protected]   |
|4  |Belle      |1-246-894-6340|[email protected]            |
|5  |Trevor     |1-300-527-4967|[email protected]           |
|6  |Laurel     |1-691-379-9921|[email protected]         |
|7  |Sara       |1-608-140-1995|[email protected]        |
|8  |Kaseem     |1-881-586-2689|[email protected]              |
|9  |Lev        |1-916-367-5608|[email protected]              |
|10 |Maya       |1-271-683-2698|[email protected] |
+---+-----------+--------------+-----------------------------------------+
only showing top 10 rows

我们还可以使用head(n: Int)方法来采样数据,这个函数也需要输入一个参数标明需要采样的行数,而且这个函数返回的是Row数组,我们需要遍历打印。当然,我们也可以使用head()函数直接打印,这个函数只是返回数据的一行,类型也是Row。

students.head(5).foreach(println)
[1,Burke,1-300-746-8446,[email protected]]
[2,Kamal,1-668-571-5046,[email protected]]
[3,Olga,1-956-311-1686,[email protected]]
[4,Belle,1-246-894-6340,[email protected]]
[5,Trevor,1-300-527-4967,[email protected]]
println(students.head())
[1,Burke,1-300-746-8446,[email protected]]

除了show、head函数。我们还可以使用first和take函数,他们分别调用head()和head(n)

println(students.first())
[1,Burke,1-300-746-8446,[email protected]]
students.take(5).foreach(println)
[1,Burke,1-300-746-8446,[email protected]]
[2,Kamal,1-668-571-5046,[email protected]]
[3,Olga,1-956-311-1686,[email protected]]
[4,Belle,1-246-894-6340,[email protected]]
[5,Trevor,1-300-527-4967,[email protected]]

查询DataFrame里面的列

  正如你所看到的,所有的DataFrame里面的列都是有名称的。Select函数可以帮助我们从DataFrame中选择需要的列,并且返回一个全新的DataFrame,下面我将此进行介绍。

  1、只选择一列。假如我们只想从DataFrame中选择email这列,因为DataFrame是不可变的(immutable),所以这个操作会返回一个新的DataFrame:

val emailDataFrame: DataFrame = students.select("email")

现在我们有一个名叫emailDataFrame全新的DataFrame,而且其中只包含了email这列,让我们使用show来看看是否是这样的:

emailDataFrame.show(3)
+--------------------+
|               email|
+--------------------+
|ullamcorper.velit...|
|[email protected]|
|Aenean.eget.metus...|
+--------------------+
only showing top 3 rows

2、选择多列。其实select函数支持选择多列。

val studentEmailDF = students.select("studentName", "email")
studentEmailDF.show(5)
+-----------+--------------------+
|studentName|               email|
+-----------+--------------------+
|      Burke|ullamcorper.velit...|
|      Kamal|[email protected]|
|       Olga|Aenean.eget.metus...|
|      Belle|vitae.aliquet.nec...|
|     Trevor|[email protected]|
+-----------+--------------------+
only showing top 5 rows

需要主要的是,我们select列的时候,需要保证select的列是有效的,换句话说,就是必须保证select的列是printSchema打印出来的。如果列的名称是无效的,将会出现org.apache.spark.sql.AnalysisException异常,如下:

val studentEmailDF = students.select("studentName", "iteblog")
studentEmailDF.show(5)

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'iteblog' given input columns id, studentName, phone, email;

根据条件过滤数据

  现在我们已经知道如何在DataFrame中选择需要的列,让我们来看看如何根据条件来过滤DataFrame里面的数据。对应基于Row的数据,我们可以将DataFrame看作是普通的Scala集合,然后我们根据需要的条件进行相关的过滤,为了展示清楚,我在语句没后面都用show函数展示过滤的结果。

students.filter("id > 5").show(7)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
|  6|     Laurel|1-691-379-9921|[email protected]|
|  7|       Sara|1-608-140-1995|[email protected]|
|  8|     Kaseem|1-881-586-2689|[email protected]|
|  9|        Lev|1-916-367-5608|[email protected]|
| 10|       Maya|1-271-683-2698|accumsan.convalli...|
| 11|        Emi|1-467-270-1337|        [email protected]|
| 12|      Caleb|1-683-212-0896|[email protected]|
| 13|   Florence|1-603-575-2444|[email protected]|
| 14|      Anika|1-856-828-7883|[email protected]|
| 15|      Tarik|1-398-171-2268|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 10 rows

students.filter("studentName =''").show(7)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
| 21|           |1-598-439-7549|consectetuer.adip...|
| 32|           |1-184-895-9602|[email protected]|
| 45|           |1-245-752-0481|Suspendisse.eleif...|
| 83|           |1-858-810-2204|[email protected]|
| 94|           |1-443-410-7878|Praesent.eu.nulla...|
+---+-----------+--------------+--------------------+

注意看第一个过滤语句,虽然id被解析成String了,但是程序依然正确地做出了比较。我们也可以对多个条件进行过滤:

students.filter("studentName ='' OR studentName = 'NULL'").show(7)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
| 21|           |1-598-439-7549|consectetuer.adip...|
| 32|           |1-184-895-9602|[email protected]|
| 33|       NULL|1-105-503-0141|[email protected]|
| 45|           |1-245-752-0481|Suspendisse.eleif...|
| 83|           |1-858-810-2204|[email protected]|
| 94|           |1-443-410-7878|Praesent.eu.nulla...|
+---+-----------+--------------+--------------------+

我们还可以采用类SQL的语法对数据进行过滤:

students.filter("SUBSTR(studentName,0,1) ='M'").show(7)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
| 10|       Maya|1-271-683-2698|accumsan.convalli...|
| 19|    Malachi|1-608-637-2772|[email protected]|
| 24|    Marsden|1-477-629-7528|Donec.dignissim.m...|
| 37|      Maggy|1-910-887-6777|facilisi.Sed.nequ...|
| 61|     Maxine|1-422-863-3041|aliquet.molestie....|
| 77|      Maggy|1-613-147-4380| [email protected]|
| 97|    Maxwell|1-607-205-1273|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 7 rows

对DataFrame里面的数据进行排序

使用sort函数我们可以对DataFrame中指定的列进行排序:

students.sort(students("studentName").desc).show(7)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
| 50|      Yasir|1-282-511-4445|eget.odio.Aliquam...|
| 52|       Xena|1-527-990-8606|[email protected]|
| 86|     Xandra|1-677-708-5691|[email protected]|
| 43|     Wynter|1-440-544-1851|[email protected]|
| 31|    Wallace|1-144-220-8159| [email protected]|
| 66|      Vance|1-268-680-0857|[email protected]|
| 41|     Tyrone|1-907-383-5293|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 7 rows

也可以对多列进行排序:

students.sort("studentName", "id").show(10)
+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
| 21|           |1-598-439-7549|consectetuer.adip...|
| 32|           |1-184-895-9602|[email protected]|
| 45|           |1-245-752-0481|Suspendisse.eleif...|
| 83|           |1-858-810-2204|[email protected]|
| 94|           |1-443-410-7878|Praesent.eu.nulla...|
| 91|       Abel|1-530-527-7467|    [email protected]|
| 69|       Aiko|1-682-230-7013|turpis.vitae.puru...|
| 47|       Alma|1-747-382-6775|    [email protected]|
| 26|      Amela|1-526-909-2605| [email protected]|
| 16|      Amena|1-878-250-3129|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 10 rows

从上面的结果我们可以看出,默认是按照升序进行排序的。我们也可以将上面的语句写成下面的:

students.sort(students("studentName").asc, students("id").asc).show(10) 

这两个语句运行的效果是一致的。

对列进行重命名

  如果我们对DataFrame中默认的列名不感兴趣,我们可以在select的时候利用as对其进行重命名,下面的列子将studentName重命名为name,而email这列名字不变:

students.select(students("studentName").as("name"), students("email")).show(10)
+--------+--------------------+
|    name|               email|
+--------+--------------------+
|   Burke|ullamcorper.velit...|
|   Kamal|[email protected]|
|    Olga|Aenean.eget.metus...|
|   Belle|vitae.aliquet.nec...|
|  Trevor|[email protected]|
|  Laurel|[email protected]|
|    Sara|[email protected]|
|  Kaseem|[email protected]|
|     Lev|[email protected]|
|    Maya|accumsan.convalli...|
+--------+--------------------+
only showing top 10 rows

将DataFrame看作是关系型数据表

  DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后在其上运行SQL查询语句,只要我们进行下面两步即可实现:

  (1)、将DataFrame注册成一张名为students的表:

students.registerTempTable("students") 

(2)、然后我们在其上用标准的SQL进行查询:

sqlContext.sql("select * from students where studentName!='' order by email desc").show(7)

+---+-----------+--------------+--------------------+
| id|studentName|         phone|               email|
+---+-----------+--------------+--------------------+
| 87|      Selma|1-601-330-4409|[email protected]|
| 96|   Channing|1-984-118-7533|viverra.Donec.tem...|
|  4|      Belle|1-246-894-6340|vitae.aliquet.nec...|
| 78|       Finn|1-213-781-6969|[email protected]|
| 53|     Kasper|1-155-575-9346|[email protected]|
| 63|      Dylan|1-417-943-8961|[email protected]|
| 35|     Cadman|1-443-642-5919|[email protected]|
+---+-----------+--------------+--------------------+
only showing top 7 rows

对两个DataFrame进行Join操作

  前面我们已经知道如何将DataFrame注册成一张表,现在我们来看看如何使用普通的SQL对两个DataFrame进行Join操作。

  1、内联:内联是默认的Join操作,它仅仅返回两个DataFrame都匹配到的结果,来看看下面的例子:

val students1 = sqlContext.csvFile(filePath = "E:\\StudentPrep1.csv", useHeader = true, delimiter = '|')
val students2 = sqlContext.csvFile(filePath = "E:\\StudentPrep2.csv", useHeader = true, delimiter = '|')
val studentsJoin = students1.join(students2, students1("id") === students2("id"))
studentsJoin.show(studentsJoin.count.toInt)

+---+-----------+--------------+--------------------+---+------------------+--------------+--------------------+
| id|studentName|         phone|               email| id|       studentName|         phone|               email|
+---+-----------+--------------+--------------------+---+------------------+--------------+--------------------+
|  1|      Burke|1-300-746-8446|ullamcorper.velit...|  1|BurkeDifferentName|1-300-746-8446|ullamcorper.velit...|
|  2|      Kamal|1-668-571-5046|[email protected]|  2|KamalDifferentName|1-668-571-5046|[email protected]|
|  3|       Olga|1-956-311-1686|Aenean.eget.metus...|  3|              Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|      Belle|1-246-894-6340|vitae.aliquet.nec...|  4|BelleDifferentName|1-246-894-6340|vitae.aliquet.nec...|
|  5|     Trevor|1-300-527-4967|[email protected]|  5|            Trevor|1-300-527-4967|dapibusDifferentE...|
|  6|     Laurel|1-691-379-9921|[email protected]|  6|LaurelInvalidPhone|     000000000|[email protected]|
|  7|       Sara|1-608-140-1995|[email protected]|  7|              Sara|1-608-140-1995|[email protected]|
|  8|     Kaseem|1-881-586-2689|[email protected]|  8|            Kaseem|1-881-586-2689|[email protected]|
|  9|        Lev|1-916-367-5608|[email protected]|  9|               Lev|1-916-367-5608|[email protected]|
| 10|       Maya|1-271-683-2698|accumsan.convalli...| 10|              Maya|1-271-683-2698|accumsan.convalli...|
+---+-----------+--------------+--------------------+---+------------------+--------------+--------------------+

2、右外联:在内连接的基础上,还包含右表中所有不符合条件的数据行,并在其中的左表列填写NULL ,来看看下面的实例:

val studentsRightOuterJoin = students1.join(students2, students1("id") === students2("id"), "right_outer")
studentsRightOuterJoin.show(studentsRightOuterJoin.count.toInt)
+----+-----------+--------------+--------------------+---+--------------------+--------------+--------------------+
|  id|studentName|         phone|               email| id|         studentName|         phone|               email|
+----+-----------+--------------+--------------------+---+--------------------+--------------+--------------------+
|   1|      Burke|1-300-746-8446|ullamcorper.velit...|  1|  BurkeDifferentName|1-300-746-8446|ullamcorper.velit...|
|   2|      Kamal|1-668-571-5046|[email protected]|  2|  KamalDifferentName|1-668-571-5046|[email protected]|
|   3|       Olga|1-956-311-1686|Aenean.eget.metus...|  3|                Olga|1-956-311-1686|Aenean.eget.metus...|
|   4|      Belle|1-246-894-6340|vitae.aliquet.nec...|  4|  BelleDifferentName|1-246-894-6340|vitae.aliquet.nec...|
|   5|     Trevor|1-300-527-4967|[email protected]|  5|              Trevor|1-300-527-4967|dapibusDifferentE...|
|   6|     Laurel|1-691-379-9921|[email protected]|  6|  LaurelInvalidPhone|     000000000|[email protected]|
|   7|       Sara|1-608-140-1995|[email protected]|  7|                Sara|1-608-140-1995|[email protected]|
|   8|     Kaseem|1-881-586-2689|[email protected]|  8|              Kaseem|1-881-586-2689|[email protected]|
|   9|        Lev|1-916-367-5608|[email protected]|  9|                 Lev|1-916-367-5608|[email protected]|
|  10|       Maya|1-271-683-2698|accumsan.convalli...| 10|                Maya|1-271-683-2698|accumsan.convalli...|
|null|       null|          null|                null|999|LevUniqueToSecondRDD|1-916-367-5608|[email protected]|
+----+-----------+--------------+--------------------+---+--------------------+--------------+--------------------+

3、左外联:在内连接的基础上,还包含左表中所有不符合条件的数据行,并在其中的右表列填写NULL ,同样我们来看看下面的实例:

val studentsLeftOuterJoin = students1.join(students2, students1("id") === students2("id"), "left_outer")
studentsLeftOuterJoin.show(studentsLeftOuterJoin.count.toInt)
+---+-----------+--------------+--------------------+----+------------------+--------------+--------------------+
| id|studentName|         phone|               email|  id|       studentName|         phone|               email|
+---+-----------+--------------+--------------------+----+------------------+--------------+--------------------+
|  1|      Burke|1-300-746-8446|ullamcorper.velit...|   1|BurkeDifferentName|1-300-746-8446|ullamcorper.velit...|
|  2|      Kamal|1-668-571-5046|[email protected]|   2|KamalDifferentName|1-668-571-5046|[email protected]|
|  3|       Olga|1-956-311-1686|Aenean.eget.metus...|   3|              Olga|1-956-311-1686|Aenean.eget.metus...|
|  4|      Belle|1-246-894-6340|vitae.aliquet.nec...|   4|BelleDifferentName|1-246-894-6340|vitae.aliquet.nec...|
|  5|     Trevor|1-300-527-4967|[email protected]|   5|            Trevor|1-300-527-4967|dapibusDifferentE...|
|  6|     Laurel|1-691-379-9921|[email protected]|   6|LaurelInvalidPhone|     000000000|[email protected]|
|  7|       Sara|1-608-140-1995|[email protected]|   7|              Sara|1-608-140-1995|[email protected]|
|  8|     Kaseem|1-881-586-2689|[email protected]|   8|            Kaseem|1-881-586-2689|[email protected]|
|  9|        Lev|1-916-367-5608|[email protected]|   9|               Lev|1-916-367-5608|[email protected]|
| 10|       Maya|1-271-683-2698|accumsan.convalli...|  10|              Maya|1-271-683-2698|accumsan.convalli...|
| 11|    iteblog|        999999| [email protected]|null|              null|          null|                null|
+---+-----------+--------------+--------------------+----+------------------+--------------+--------------------+

将DataFrame保存成文件

  下面我来介绍如何将DataFrame保存到一个文件里面。前面我们加载csv文件用到了load函数,与之对于的用于保存文件可以使用save函数。具体操作包括以下两步:

  1、首先创建一个map对象,用于存储一些save函数需要用到的一些属性。这里我将制定保存文件的存放路径和csv的头信息。

val saveOptions = Map("header" -> "true", "path" -> "iteblog.csv")

为了基于学习的态度,我们从DataFrame里面选择出studentName和email两列,并且将studentName的列名重定义为name。

val copyOfStudents = students.select(students("studentName").as("name"), students("email"))

2、下面我们调用save函数保存上面的DataFrame数据到iteblog.csv文件夹中

copyOfStudents.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOptions).save()

mode函数可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。从名字就可以很好的理解,Overwrite代表覆盖目录下之前存在的数据;Append代表给指定目录下追加数据;Ignore代表如果目录下已经有文件,那就什么都不执行;ErrorIfExists代表如果保存目录下存在文件,那么抛出相应的异常。

  需要注意的是,上述path参数指定的是保存文件夹,并不是最后的保存文件名。

时间: 2024-11-05 22:57:49

Spark DataFrames入门指南:创建和操作DataFrame的相关文章

Spark SQL初始化和创建DataFrame的几种方式

一.前述       1.SparkSQL介绍 Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制. SparkSQL支持查询原生的RDD. RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础. 能够在Scala中写SQL语句.支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用.     2.Spark on Hive和Hive on Spa

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Spark性能优化指南——高级篇

Spark性能优化指南--高级篇 [TOC] 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数据倾斜发生时的现象 绝大多数tas

Spark从入门到上手实战

Spark从入门到上手实战 课程学习地址:http://www.xuetuwuyou.com/course/186 课程出自学途无忧网:http://www.xuetuwuyou.com 讲师:轩宇老师 课程简介: Spark属于新起的基于内存处理海量数据的框架,由于其快速被众公司所青睐.Spark 生态栈框架,非常的强大,可以对数据进行批处理.流式处理.SQL 交互式处理及机器学习和Graphx 图像计算.目前绝大数公司都使用,主要在于 Spark SQL 结构化数据的处理,非常的快速,高性能

【转载】Spark性能优化指南——高级篇

前言 数据倾斜调优 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 查看导致数据倾斜的key的数据分布情况 数据倾斜的解决方案 解决方案一:使用Hive ETL预处理数据 解决方案二:过滤少数导致倾斜的key 解决方案三:提高shuffle操作的并行度 解决方案四:两阶段聚合(局部聚合+全局聚合) 解决方案五:将reduce join转为map join 解决方案六:采样倾斜key并分拆join操作 解决方案七:使用随机前缀和扩容RDD进行join 解决方案八:多

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

Spark Graphx编程指南

问题导读 1.GraphX提供了几种方式从RDD或者磁盘上的顶点和边集合构造图?2.PageRank算法在图中发挥什么作用?3.三角形计数算法的作用是什么? Spark中文手册-编程指南Spark之一个快速的例子Spark之基本概念Spark之基本概念Spark之基本概念(2)Spark之基本概念(3)Spark-sql由入门到精通Spark-sql由入门到精通续spark GraphX编程指南(1) Pregel API 图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依

Microsoft Orleans 之 入门指南

Microsoft Orleans 在.net用简单方法构建高并发.分布式的大型应用程序框架. 原文:http://dotnet.github.io/orleans/ 在线文档:http://dotnet.github.io/orleans/What's-new-in-Orleans 源码地址:https://github.com/dotnet/orleans 简介:Orleans 框架可以构建大规模.高并发.分布式应用程序,而不需要学习专业分布式以及并发知识框架.它是由微软研究和设计应用于云计

Spark1.0.x入门指南

Spark1.0.x入门指南 1 节点说明 IP Role 192.168.1.111 ActiveNameNode 192.168.1.112 StandbyNameNode,Master,Worker 192.168.1.113 DataNode,Master,Worker 192.168.1.114 DataNode,Worker HDFS集群和Spark集群之间节点共用. 2 安装HDFS 见HDFS2.X和Hive的安装部署文档:http://www.cnblogs.com/Scott