阅读笔记
提出问题:
- 数据的重用在很多迭代的机器学习算法和图算法中都很常见,如网页排名、K-means聚类、逻辑回归等。
- 另一个数据重用的典型场景是交互式数据挖掘,用户在同一数据集上运行多重即席查询。
- 尽管现有框架为处理机群的计算资源提供了大量抽象,对于复杂计算中包含大量数据重用的这类应用,执行效率仍然很低。
- 现有框架唯一重用数据的方式就是将它写入到外部稳定的分布式文件系统中去。
- 这导致了大量的开销,如数据备份、磁盘I/O、序列化等,占用了大量执行时间。
- 认识到这一问题后,有研究者针对数据重用的应用定制了一些框架,如Pregel、HaLoop。
- 然而这些定制的框架只能支持特定的计算模式,通用性不强。
本文工作:
- 这篇论文提出了RDD,弹性分布式数据集的概念。这一模型具有良好的通用性、容错性与并行处理数据的能力。
- RDD使用户能够显式将计算结果保存在内存中,控制数据的划分,并使用更丰富的操作集合来处理。
- 作者认为主要的设计挑战在于保证容错性。
- 现有的框架对容错性通常依靠细粒度的对状态变化的更新来保证,即采用在不同机器上备份数据或记录数据更新的方式,对数据密集型任务来说代价很高。
- 相比之下,RDD采用一种基于粗粒度变换(map,filt,join等)的方法保证容错性。
- 它记录对数据应用的变换而不是记录数据本身来保证容错。如果部分数据丢失,RDD拥有足够的信息得知这部分数据是如何计算得到的,可以通过重新计算来得到丢失的数据。
- 这种恢复数据的方法非常快,无需大量的数据复制操作。
- 作者实现了一个基于RDD模型的系统,称为Spark。
- 本文对Spark性能通过benchmark与实际应用进行了评估,迭代型应用性能相比Hadoop有20倍提升。
关于RDD:
- RDD是只读的记录的分区集合,只能通过对稳定存储介质中的数据或其他RDD的确定性操作创建。
- 这些操作我们称为变换(transformations)。例如:map,filter,join等。
- RDD并不需要总被实体化,因为它包含了如何从其他数据集计算得到的信息(称为lineage)。
- 用户能控制RDD两方面的特性:持续性和数据划分。
- 持续性:用户可以声明将要重新使用的RDD并为他们选择一种存储策略。
- 数据划分:用户可以指定RDD按照记录中的某个键值在机群上被划分。
- Spark的实现中,每个RDD对应一个对象而转换操作对应操作这些对象的方法。
- Spark为用户提供了一些API用于操作RDD。典型的包括:
- count:返回RDD中的数据元素个数
- collect:返回RDD中的元素本身
- save:将RDD中的数据输出到存储系统
- persist:声明当前RDD将被重用,默认存于内存中。
示例:控制台日志信息采集
背景:一项Web服务运行出错,需要从HDFS中记录的TB级海量日志数据中找出错误原因。
基于Spark处理该问题,首先用Scala语言编写如下code:
lines = spark.textFile("hdfs://…")
errors = lines.filter(_.startsWith("ERROR”))
errors.persist()
首先定义一个RDD,数据源于HDFS,然后过滤该RDD得到包含所有错误信息的RDD(即error)。
将errors通过保存在内存中,可对加速后续对它的处理。执行:
errors.count()
统计错误信息记录数目,也可进一步对error进行过滤处理:
// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS”)).map(_.split(’\t’)(3)).collect()
RDD模型的优点:
- RDD只允许通过变换操作粗粒度的形式创建,DSM能够以细粒度的形式读写内存的任意位置。
- 对RDD的限制使得它容错处理更容易,通过记录lineage信息重新计算的方式恢复丢失数据代价较低。
- RDD的另一个好处是系统可以通过运行执行较慢的作业的副本减轻运行较慢机器的负担。对于DSM则很难实现,因为同一份作业的两个副本位于内存的同一位置,更新数据时会互相干扰。
- 对于RDD中的批量操作,运行时将根据数据存放的位置来调度任务,从而提高性能。
- 对于基于扫描的操作,内存不足以缓存整个RDD时可进行部分缓存,把放不下的分区存储到磁盘上,此时性能与现有的数据流系统差不多。
注意:RDD只能粗粒度地写,但是可以细粒度地读
RDD的适用场合:
- RDD适用于对所有数据集中所有元素都进行相同操作的批处理作业。
- 因为对于这类场景记录RDD的转换过程需要维护的信息量较少。
- RDD不适合那些通过异步细粒度地更新来共享状态的应用,例如Web应用中的存储系统,或者增量抓取和索引Web数据的系统。
关于Spark编程接口:
- 使用Spark的开发者需要编写驱动程序(Driver),驱动程序会连接到集群来运行多个称为Worker的进程。
- Worker是长时间运行的进程,将RDD分区存储在内存中。
- 运行时。用户的Driver程序启动多个Worker,Worker从分布式文件系统中读取数据块,并将计算后的RDD分区缓存在内存中。
典型应用 1:逻辑回归
背景:给定一组点集,寻找一个最佳分割两组点(即垃圾邮件和非垃圾邮件)的超平面w。此算法可用于分类,例如区分邮件是否为垃圾邮件。
使用Spark实现:
val points = spark.textFile(...)
.map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p =>
p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
}.reduce((a,b) => a+b)
w -= gradient
}
说明:
- 使用梯度下降法,设置了一个初始随机变量w,然后开始进行迭代。
- 每步将点集中的每个元素按"p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y”进行映射。
- 通过reduce操作将所有数据集的映射结果进行累加,累加得到的结果实际是逻辑回归代价函数的偏导。
- 每步reduce结束后得到的gradient作为梯度,更新w的值,重复迭代若干步后,最终得到的w就可以用于判别了。
典型应用 2:网页排名
背景:该算法递归地根据引用关系更新网页的贡献度,按贡献度对网页进行排名。
使用Spark实现:
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs
// with the contributions sent by each page
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}
说明:
- 网页的排名以(URL,排名)的形式表示
- 算法迭代若干步,每个网页每步向它引用的页面贡献r/n, r为该页面排名,n为该页面的链出的页面个数。
- 每个url累加所有来自链入页面的贡献度得到sum,然后计算"a/N + (1-a) * sum”得到新的排名。
<— 进一步深入理解每一步操作完成的动作需要详细了解Spark提供的每一种转换操作的语义
RDD的表示形式:
- Spark中RDD的表示记录了如下信息:分区集合、依赖集合、如何由父分区计算得到、分区调度和数据存放的元数据信息。
- 其中RDD的依赖集合描述了数据的依赖关系,可分为窄依赖和宽依赖两类。
- 窄依赖(narrow dependencies)子RDD的每个分区依赖于常数个父分区(即与数据规模无关)
- 宽依赖(wide dependencies)子RDD的每个分区依赖于所有父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被以相同的哈希方式分区)
- 窄依赖允许在一个集群节点上以流水线的方式(pipeline)对父分区数据进行计算。例如,可以逐个元素地执行map、然后filter操作。宽依赖则需要首先准备好所有父分区数据,然后在节点之间进行Shuffle来计算。
- 窄依赖能够更有效地对失效节点进行数据恢复,重新计算仅涉及丢失的RDD分区的父分区,且不同节点之间可以并行计算。而宽依赖关系的Lineage图比较复杂,单个节点失效可能导致这个RDD的所有父分区都参与重新计算。
关于Spark实现:
- 本文提出的Spark版本仅使用14000行scala代码实现。
- 能够使用Hadoop提供的API从HDFS或HBase中读取数据
- 本文在这一部分简述了Spark平台一些模块的实现,如任务调度、解释器、内存管理、检查点机制等。
任务调度:
- 调度器考虑了哪些RDD分区是缓存在内存中的。
- 调度器根据目标RDD的Lineage图创建一个由stage构成的无回路有向图(DAG)。
- 每个stage内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化(pipeline)。
- stage的边界有两种情况:一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短父RDD的计算过程。
- 调度器计算每个划分好的stage,直到获得目标RDD。
- 调度器根据数据存放的位置采用延时调度算法分配任务,以最小化通信开销。
- 如果某个任务需要处理一个已缓存分区,则直接将任务分配给拥有这个分区的节点。
- 否则,如果需要处理的分区位于多个可能的位置(例如,由HDFS的数据存放位置决定),则将任务分配给这一组节点。
- 对于宽依赖(例如需要Shuffle的依赖),目前的实现方式是,在拥有父分区的节点上将中间结果物化,简化容错处理,这跟MapReduce中物化map输出很像。
- 如果某个任务失效,只要stage中的父RDD分区可用,则只需在另一个节点上重新运行这个任务即可。
- 如果某些stage不可用(例如,Shuffle时某个map输出丢失),则需要重新提交这个stage中的所有任务来计算丢失的分区。
集成Scala解释器:
- 类似Ruby和Python一样,Scala也有一个交互式shell。
- 基于内存的数据可以实现低延时,Spark设计者希望允许用户从解释器交互式地运行Spark,从而在大数据集上实现大规模并行数据挖掘。
- Spark集成了Scala解释器并做了两方面针对性的改动:
- 类传输:解释器能够支持基于HTTP传输类字节码
- 代码生成:通常每行上创建的单例对象通过对应类上的静态方法进行访问,将这种逻辑改为直接引用各个行对象的实例。
缓存管理:
- Worker节点将RDD分区以Java对象的形式缓存在内存中。
- 采取RDD级的LRU(最近最少使用)替换策略。
- 目前这种简单的策略适合大多数用户应用。
检查点支持:
- 尽管RDD中的Lineage信息可以用来故障恢复,但对于那些Lineage链较长的RDD来说,这种恢复可能很耗时。
- 如果将Lineage链存到物理存储中,再定期对RDD执行检查点操作就很有效。
- 一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。
- 这种情况下,集群的节点故障可能导致每个父RDD的数据块丢失,因此需要全部重新计算。
- 当前Spark版本提供检查点API,但由用户决定是否需要执行检查点操作。计划在未来实现自动检查点,根据成本效益分析确定RDD Lineage图中的最佳检查点位置。
性能评估:
- 对于迭代式机器学习应用,Spark比Hadoop快20多倍。数据存储在内存中,同时Java对象缓存避免了反序列化操作是获得上述加速比的重要因素。
- 用户编写的应用程序执行结果很好。以一个分析报表的典型应用为例,Spark比Hadoop快40多倍。
- 作者对容错性和内存不足的情况页进行了针对性测试。
- 如果节点发生失效,通过重建那些丢失的RDD分区,Spark能够实现快速恢复。
- Spark能够在5-7s延时范围内,交互式地查询1TB大小的数据集。
- 测试集群:在100个m2.4xlarge EC2实例(8核68G内存)
- 主要任务:使用Spark分析1TB从2008-10到2009-4这段时间的Wikipedia页面浏览日志数据
- 在整个输入数据集上简单地查询如下内容以获取页面浏览总数:(1)全部页面;(2)页面的标题能精确匹配给定的关键词;(3)页面的标题能部分匹配给定的关键词。
- 对比直接操作磁盘数据快几个数量级。例如,从磁盘上查询1TB数据耗时170秒,体现了RDD缓存的作用。
Spark vs. Hadoop 20倍加速比的分析:
- Hadoop软件栈的最小开销
- Hadoop读数据时HDFS的开销
- Hadoop将二进制记录逆序列化转换成内存中的Java对象的开销
术语 && 知识补充
术语 :
- 序列化:将对象状态转换为可保持或传输的格式的过程。
- Scala:一种能够运行于JVM之上的静态类型、函数式编程语言。
知识补充:
- 关于逻辑回归,参考 梯度下降法求解逻辑回归
- HBase即Hadoop database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。
- Dryad: 微软发布的分布式数据计算平台,针对Windows HPC Server设计,使用.NET的LINQ查询语言模型。