Spark第一周

Why Scala

在数据集不是很大的时候,开发人员可以使用python、R、MATLAB等语言在单机上处理数据集。但是在大数据时代,数据集少说都是TB、PB级别,此时便需要分布式地处理。相较于上述语言,Scala有着现成的框架即Spark能分布式地处理问题,Scala中有着丰富的Spark API,开发时只需要进行函数的编写就能轻松解决各种需求。虽然其他语言也有Spark的API,比如python的pySpark,但是逊色于Spark对Scala的支持,毕竟Spark是用Scala开发出来的。

Spark和Hadoop

Spark和Hadoop是Apache下的两个不同开源项目,但是有着很强的关联性。简单理解,Spark是一个能在Hadoop生态圈中使用的计算框架,Hadoop本身也有计算框架MapReduce。Spark对比Hadoop中的MapReduce有以下优势:

  1. Spark is more expressive(实现方式). Spark‘s APIs are modeled after Scala‘s collections, which mean distributed computations in Spark are like immutable lists in Scala. You can use higher-order functions like map, flatMap, filter, and reduce, to build up rich pipelines of computation that are distributed in a very concise way. Whereas Hadoop on the other hand is much more rigid(不灵活). It forces map then reduce computations without all of these cool combinators, like flatMap and filter, and it requires a lot more boilerplate to build up interesting computation pipelines.
  2. The second reason is performance(性能). By now, I‘m sure you‘ve heard of Spark as being super fast. After all, Spark‘s tagline is Lightning-Fast Cluster Computing. Performance brings something very important to the table that we haven‘t had until Spark came along which is interactivity. Now it‘s possible to query your very large distributed data set interactively(交互式). So that‘s a really big deal. And also Spark is so much faster that Hadoop in some cases that jobs that would take tens of minutes to run, now only take a few seconds. This allows data scientists to interactively explore and experiment with their data, which in turn allows for data scientists to discover richer insights from their data faster. MapReduce job results need to be stored in HDFS before they can be used by another job.
  3. And finally, Spark is good for data science. It‘s much better for data science than Hadoop and it‘s not just due to performance reasons. Iteration is required by most algorithms in the data scientist‘s toolbox. That is, most analysis tasks require multiple passes over the same set of data. And while iteration is indeed possible in Hadoop with really quite a lot of effort, you have a bunch of boilerplate that you have to do and a required external libraries and frameworks that just degenerate a bunch of extra map reduce phases in order to simulate iteration. It‘s really, on the other hand, the downright simple to do in Spark. There‘s no boilerplate required whatsoever, you just write what feels like a normal program, which includes a few passes over the same dataset. You just have basically, something that looks like a for loop and say hey, until this condition is met iterate. Which is night and day when compared with Hadoop, because it‘s almost not possible in Hadoop.

Why Spark is causing such a shift in handling data

Spark对于数据的实时处理被人津津乐道,那么这个分布式计算框架为什么能如此优秀?

对于一个分布式计算框架,有两个问题始终难以回避:

  1. Partial failure(某些节点崩溃,难以工作): crash failures of a subset of the machines involved in distributed computation
  2. Latency(延迟): certain opeartions have a much higher latency than other operations due to the network communication

在实际开发中,Latency是难以规避的。虽然Spark是很优秀的框架,但是如何编写出更好的代码尽量降低Latency也是开发人员的工作。

一些延迟时间:
?

  • 蓝色:与内存读写有关
  • 橙色:与磁盘读写有关
  • 紫色:与网络传输有关

在继承Hadoop对于节点崩溃的容错性的同时,Spark在处理时效上有了巨大突破。原因在于
如果计算不涉及与其他节点进行数据交换,Spark可以在内存中一次性完成这些操作,也就是中间结果无须落盘(MR则需要存到HDFS先),减少了磁盘IO的操作。

RDD(resilient distributed dataset)

在scala-REPL中敲出下列代码可以查看Spark根目录中的LICENSE有几行,并且可以打印其中有几行包含“BSD”。

// sc是Spark Context类
scala> val licLines = sc.textFile("/Users/shayue/env/spark/spark-2.4.2-bin-hadoop2.7/LICENSE")
licLines: org.apache.spark.rdd.RDD[String] = /Users/shayue/env/spark/spark-2.4.2-bin-hadoop2.7/LICENSE MapPartitionsRDD[1] at textFile at <console>:24

scala> val lineCnt = licLines.count
[Stage 0:>
lineCnt: Long = 518

scala> val bsdLines = licLines.filter(line => line.contains("BSD"))
bsdLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25

scala> bsdLines.count
res1: Long = 3

scala> bsdLines.foreach(bsdLine => println(bsdLine))
BSD 2-Clause
BSD 3-Clause
is distributed under the 3-Clause BSD license.

// 简化写法
scala> bsdLines.foreach(println)
BSD 2-Clause
BSD 3-Clause
is distributed under the 3-Clause BSD license.

这里代码中用到的filtercontains在scala的一些collection中也有。不过,licLinesbsdLines并不是scala的collection结构。它们时分布式的collection结构,在Spark中被称为RDDs。

RDD的特性如下:

  • Immutable (read-only) ,只读类型
  • Resilient (fault-tolerant) ,高容错性
  • Distributed (dataset spread out to more than one node),分布式

RDDs能执行很多数据转化操作,但结束后总是产生一个新的RDD实例。一旦创建,RDD永远不会改变,这也是上面提到的Immutable特性。因为Mutable会增加框架的复杂性,除此之外,immutable collections使得Spark更具有容错性。

一个RDD实例是分布在各台机器上的总和,但是Spark仅仅暴露一个接口给用户,使得用户感觉就像在单机上操作。RDDs使得将一个任务部署到多个机器上执行变得简单了。

其他分布式计算框架通过将数据复制到多台机器来保持容错,一旦节点出现故障就可以从正常的节点复制恢复。RDD的机制不同的,它们会记录如何在某个工作节点上生成一些数据的方式,称为RDD运算图(RDD lineage),如果哪个工作节点发生故障,则只需根据记录重新生成数据即可。

下面以上述代码为例:

  1. 加载文本文件的过程生成了licLines RDD
  2. 对licLines使用filter方法,生成新的bsdLines RDD
  3. 步骤1和步骤2一起构成了一个RDD运算图(RDD lineage)。它记录了如何从头到尾创建bsdLines RDD。也就是说,即使某个节点当掉,可以根据RDD lineage重新生成必要的数据。

基本的RDD操作

分为两种方式:

  1. Transformations(转换): 比如filtermap,可以发现transformations操作一般会通过原RDD进行更改,进而产生新的RDD实例
  2. Actions(执行): 比如countforeach,进行action操作主要是希望得到一些关于RDD实例性质的结果。

值得注意的是:
Spark的Transformations操作是Lazily的,即仅在执行某些Actions操作以输出一些想要的结果时,Transformations操作才会进行。

在RDD上触发action操作后,Spark会检查RDD lineage,并使用该信息构建需要执行的graph of operations(操作图)以计算操作。它告诉Spark需要执行哪些transformations,以及将以何种顺序执行。

“操作图”可以理解为在一些RDDs上接了许多带箭头的线,每条线代表一个transformation,只有最后点击Actions时,这些线上才会有数据流动。

Transformations实践

map

// sc.parallelize是一个创建RDD的方式,用本地的scala collect创建。makeRDD也可;(10 to 50 by 10)是生成Scala Range类型写法
// scala> List(10 to 50 by 10)
// res6: List[scala.collection.immutable.Range] = List(Range 10 to 50 by 10)

scala> val numbers = sc.parallelize(10 to 50 by 10)
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> numbers.foreach(println)
20
30
40
50
10

// map操作,计算平方
scala> val numberSquared = numbers.map(num => num * num)
numberSquared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:25

scala> numberSquared.foreach(println)
100
1600
400
900
2500

// 对于平方数,转成string,再反转
scala> val reversed = numberSquared.map(num => num.toString.reverse)
reversed: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:25

scala> reversed.foreach(println)
001
004
009
0061
0052

// 上述方式的简写
scala> val alsoReversed = numberSquared.map(_.toString.reverse)
alsoReversed: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:25

distinct and flatMap

  1. .collect方法能获得RDDs实例中的scala collection类型
  2. .distinct获得不同的元素
  3. .flatMap对于def flatMap[U](f: (T) => TraversableOnce[U]): RDD[U]

代码:

scala> val lines = sc.textFile("/Users/shayue/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /Users/shayue/client-ids.log MapPartitionsRDD[1] at textFile at <console>:24

scala> val idsStr = lines.flatMap(_.split(","))
idsStr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:25

scala> idsStr.collect
res0: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

scala> idsStr.first
res2: String = 15

scala> val idsInt = idsStr.map(_.toInt)
idsInt: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25

// idsInt.collect是Array[Int]
scala> idsInt.collect
res4: Array[Int] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

scala> idsInt.distinct
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at distinct at <console>:26

// res5指代idsInt.distinct生成的RDD实例
scala> res5
res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at distinct at <console>:26

// 仅包含不同元素
scala> res5.collect
res8: Array[Int] = Array(16, 80, 98, 20, 94, 15, 77, 31)

// 由Array[String]到String,并且加上特定格式“;”
scala> idsStr.collect.mkString(";")
res9: String = 15;16;20;20;77;80;94;94;98;16;31;31;15;20

sample\takeSample\take

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
用于对一个RDD实例中的元素取样,Boolean是True的话就是有放回的取样;False就是无放回

统计量

mean\sum\histogram\variance\stdv

PPT 总结:赖得手打了

常用Transformations:
?

常用Actions:
?

其他:
?

Why Spark is good for Data Analysis

先来回顾一下Spark的特性:

  1. Transformtions is Lazy
  2. Actions is Eager

数据处理中做的最多的事情是迭代:

?

从上图看来,不像Hadoop需要重复地将数据写回File System中,Spark有能力直接在内存中读取数据进行迭代。

Spark allows us to control what is cached in memory. What we do is just simply call persist() or cache() on RDDs.

举例:
?
倘若不在第二行生成logsWithErrors时加上.persist()。由于filter()和contains()是Lazy的,在第4行执行Actions操作count时,需要重新执行.filter(_.contains("ERROR"))这个操作;但是加上.persist()就可以不用重复计算。

当然,既然是将数据保存在内存中,那显然很吃内存的容量。Spark有以下如下几种方式,配置保存在内存中的数据格式。
?

RDDs和collection的区别

  1. Lazy和Eager
  2. Spark更加聪明,比如:

?

取完10个元素之后,Spark便停止工作了;又或者:

?

collction中要先走完map再走filter再走count,RDDs中一次遍历时这些操作同时进行。

原文地址:https://www.cnblogs.com/shayue/p/week1.html

时间: 2024-11-05 19:29:57

Spark第一周的相关文章

20155336 2016-2017-2《JAVA程序设计》第一周学习总结

# 20155336  2016-2017-2<JAVA程序设计>第1周学习总结 ## 教材学习内容总结 开学的第一周,带着些许的欣喜和好奇,听完了老师的第一堂课.说心里话学习JAVA仿佛观看一部英文影视作品一样头疼, 因为总会有许许多多相似的名字让你记得晕头转向.JAVA也是一样,什么JVM啊JRE啊JDK啊 (/(ㄒoㄒ)/~~)  等等一系列 的英文缩写,让人心神意乱~~但总体上对JAVA有了一个初步的了解.通过课上老师的介绍以及课下对JAVA书第一章的浏览, 我简单的了解了JAVA艰辛

20145216 史婧瑶《信息安全系统设计基础》第一周学习总结

20145216 <信息安全系统设计基础>第一周学习总结 教材学习内容总结 Linux基础 1.ls命令 ls或ls .显示是当前目录的内容,这里“.”就是参数,表示当前目录,是缺省的可以省略.我们可以用ls -a .显示当前目录中的所有内容,包括隐藏文件和目录.其中“-a” 就是选项,改变了显示的内容.如图所示: 2.man命令 man命令可以查看帮助文档,如 man man : 若在shell中输入 man+数字+命令/函数 即可以查到相关的命令和函数:若不加数字,那man命令默认从数字较

第一周例行报告

PSP 内容 类别 预计时长 开始时间 结束时间 中断时间 实际花费时间 第一周作业一 写作 1h 2017-9-7  19:25 2017-9-7  20:18  回复微信消息5min 48min 看<构建之法> 阅读 一周,每天1h30min 2017-9-7  21:35 2017-9-10   每天抽空看一些 没有具体时间 采访记录 写作 1h 2017-9-9   20:46 2017-9-9 22:04  一边写一边吃火龙果  1h左右 构建之法读后感 写作 1h 2017-9-1

《嵌入式设计》第一周学习总结

<嵌入式设计>第一周学习总结 学习时遇到的主要问题 1.运行环境未及时安装 2.命令不熟练 3.上课状态不好 解决方法 1.及时安装环境并熟悉运行 2.参考Linux操作系统实用教程并百度一些问题的解决办法 3.及时调整自己状态 主要学习内容 Linux一些简单基本的操作, 用.c文件输出hello word 用vi test.c 创建.c文件 #include <stdio.h> int main(){ printf("hello word\n"); } 并用

20145311 《信息安全系统设计基础》第一周学习总结

20145311 <信息安全系统设计基础>第一周学习总结 教材学习内容总结 常用的部分命令 CTRL+SHIFT+T:新建标签页,编程时有重要应用: ALT+数字N:终端中切换到第N个标签页,编程时有重要应用: Tab:终端中命令补全,当输入某个命令的开头的一部分后,按下Tab键就可以得到提示或者帮助完成: CTRL+C:中断程序运行 Ctrl+D:键盘输入结束或退出终端 Ctrl+S: 暂定当前程序,暂停后按下任意键恢复运行 Ctrl+A: 将光标移至输入行头,相当于Home键 Ctrl+E

2016-7第一周工作总结

姓名 王奈 时间 第一周工作总结 学习内容 经过了一周的开发与改进,我们的easy工大已经初见成果.这一周,我们完成了:界面的重新设计,数据获取正确性修改,以及吐槽墙功能的实现等等.我的工作就是文档记录,在这过程中,我能够记录我们项目进步的点点滴滴,记录它是如何一步一步走到现在的样子,就像看到了软件的成长过程一样. 除此之外,我还完成了一个嵌入式开发项目的初步实现,学习了嵌入式编程的原则以及优化方法.如同软工所要求的一样,我在嵌入式开发的过程中也使用了文档记录的方法来管理我们的项目进度,这样一来

JS第一周总结1

JS第一周总结 这周我学习了JS的基础部分,大致分为了: a.基础变量及数据类型 b.特殊数据类型 c.JS基础语句类型 d.BOM操作 e.DOM操作 ###基础变量###     1.首先给变量取名,取名规范有3点:         1.变量首字符必须是字母或者美元符号$.下划线_ 三者之一.         2.变量名字中不能包括特殊符号,比如空格.加减号等符号.         3.变量中不能包括JS中的关键字,比如var之类的. 2.取好名字之后,我们需要将这个变量定义出来,并且赋值给

20145321曾子誉《Java程序设计》第一周学习总结

20145321 <Java程序设计>第1周学习总结 教材学习内容总结 第一章 1.三大平台:Java SE.Java EE .Java ME 2.Java SE:由JVM.JRE.JDK.Java语言四部分组成. JVM:操作系统,虚拟机. JRE:执行环境,包括JVM. JDK:包含JRE及开发过程中需要的一些工具程序. 3.JCP.JSR.RI.TCK的关系:任何想要提议加入Java的功能或特性,必须以JSR正式文件的方式提交,经过JCP这个国际组织投票通过,成为最终文件,由此做出的参考

20145201 《Java程序设计》第一周学习总结

# 20145201 <Java程序设计>第一周学习总结 ## 教材学习内容总结 万事开头难,终于开始学习了Java.寒假的时候看到老师的要求确实有点慌,但是这周翻开书,从书本知识第一行学起,发现并不是自己想想中那么难,只要一步一个脚印,每周有自己的计划,并按照计划按部就班的完成,最后一定会拥有自己的小成果的. 1.1Java不只是语言 Java经过多年的版本更新后,最新的版本是Java SE8. 在java发展的过程中,它的应用领域越来越广,根据不同级别的应用开发区分了不同的应用版本,最终j