浅谈Spark(2) - RDD

  Spark依赖于一个很特别的数据抽象,叫做弹性分布式数据集(resilient distributed datasets),也就是RDD,它是一个被集群分区(partitioned)的in-memory read-only对象。每一个RDD都是根据range(partitioning of consecutive records),或者是每条记录的key的hash值来进行分区。当然这两种不同的分区方法在特定的use case上有它自己的优点。例如利用hash值来分区,当不同的dataset共享一个key的时候,它能够通过这些record的局部性(locality),来提高join的效率。而采用range的方法,能够加快访问一小段经过过滤的数据。

  尽管RDD并通过物理空间来存储数据,但是RDD依旧能够做到fault-tolerant。在这个过程中,它们并不需要复制或者保留备份数据,它们拥有一个”血统“(lineage)的概念,能够记住一些列建造这些RDD的指令,从而当数据丢失的时候,可以通过这些指令重新建造一份。

  对于Spark来说,所有的工作只分为三种:建造新的RDD,转换(transformation)已存在的RDD和在RDD上运行各种指令。有一点需要声明的是,RDD采用的是lazily computed and ephemeral。所谓lazily computed,就是所有的transformation操作要到第一个action被用到的时候,它们才会开始执行。所谓Emphermal,就是当RDDs被某个应用程序用到的时候,可能会加载到内存里,并且在内存里进行计算,但随后它们就会从内存里被抹掉。

  

图 1 RDD和DSM的比较

  RDD抽象是一种分布式共享收集系统(distributed shared collection system),它和传统的分布式共享内存系统 (Distributed Shared Memory systems, DSM)很像,所以我觉得可以把两者进行比较一番,可以看出个中的区别。

  下面我来讲一下RDD的生命周期。

  • Creating RDDs

  在Spark的程序里,一个RDD默认为一个Scala的对象,虽然它也可以是Java或者是Python的对象。它通过以下几种方式被建造:

  1. 从分布式系统(如HDFS)的文件里
  2. 通过并行化(parallelizing)一个集合(collection或者array)并分配到许多节点上
  3. 通过转换(transforming)一个已经存在的RDD(后面有详细说明)
  4. 通过改变一个已经存在的RDD的persistence level,它有以下两种action:
    • cache:它能够让RDD在第一次运行后依旧保存在内存里,以备将来再次调用
    • save:把数据写到分布式系统上,如HDFS。

  下面是以下create新的RDD的例子(本文采用Scala,它能够直接在Spark Scala Shell上运行):

    • 通过textFile()方法把加载一个文本文档”server.logs“到RDD上:
>>> log_lines_RDD = sc.textFile("server.logs")
    • 并行化一个已经存在的RDD:
>>> greeting_lines_RDD = sc.parallelize(["hello", "world"])
  • RDD上的指令

  RDD一旦被建立起来,它支持两种不同的指令:

  1. Transformations:从已存在的RDD里重新create一个新的RDD的指令
  2. Actions:在RDD上计算并且返回一个对象给driver。

  正如之前所说,Transformations是lazy的,他们并不会马上被计算,他们成批的存储起来,直到一个Action被执行的时候他们才会被执行(如图2)。一个action的执行使得所有lineage上的RDD被“物化”,存到内存里。当然,一旦计算结束,一个RDD只有在显式指令下才能够继续保持,不然就会从内存中被抹掉。

  下面是一些常见的transformation和actions:

  下面再举一些简单的例子,希望能让读者对RDD的指令有更为深刻的理解:

  Transformations:

>>> log_lines_RDD = sc.textFile("server.logs")
>>> xss_RDD =  log_lines_RDD.filter(lambda x: "%3C%73%63%72%69%70%74%3E" in x)
>>> sqli_RDD =  log_lines_RDD.filter(lambda x: "bobby_tables" in x)
>>> owasp_attacks_RDD = xss_RDD.union(sqli_RDD)

  上述代码是用来过滤log_lines_RDD,通过对某个关键字段的逐行搜索,找到web server被攻击的记录。

  filter()并不会修改原来的RDD,相反,上例中它建立了两个新的RDD,而原来的log_lines_RDD仍然可以在以后被使用上。union()也是一个transformation,把两个RDD合并起来并且生成一个新的owasp_attacks_RDD。

  Actions

>>> log_lines_RDD = sc.textFile("server.logs")
>>> xss_RDD =  log_lines_RDD.filter(lambda x: "%3C%73%63%72%69%70%74%3E" in x)
>>> sqli_RDD =  log_lines_RDD.filter(lambda x: "bobby_tables" in x)
>>> owasp_attacks_RDD = xss_RDD.union(sqli_RDD)

  上述代码是用来计算owasp_attacks_RDD中web server被攻击的总数。一个action会导致所有batched的RDD被materialization(存储到内存中)。

  由此可以看出,RDD非常适合批量指令(batch operations),尤其是当这些指令可以同时被应用到数据集的所有元素上。因为他们是免疫的(immutable),所以,为每一个额外的输入计算一个新的RDD,它的开销十分巨大。所以,当处理real-time数据输入的时候,Spark经常在短时间内批量(batch)所有的变化,而并不马上执行它。

时间: 2024-10-10 14:37:37

浅谈Spark(2) - RDD的相关文章

浅谈Spark内部运行机制

Spark中最重要的机制有那些? 1.RDD,2.Spark调度机制,3Shuffle过程 什么是RDD? 可以这么说,你懂了RDD,基本上就可以对Hadoop和Spark的一半给吃透了,那么到底是RDD RDD(弹性分布式数据集)首先体现数据集,RDD是对原始数据的封装,该种数据结构内部可以对数据进行逻辑分区,其次分布式体现是并行计算以及需要解决容错问题,也就是根据依赖,找到第一层RDD,最后根据RDD编号与分区编号,可以唯一确定该分区对应的块编号,就能从存储介质中提取出分区对应的数据.在就是

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

关于分布式程序 java的内存管理浅谈

关于分布式程序 java的内存管理浅谈 标签(空格分隔): 分布式 内存管理 java Preface 当前全球正处于互联网时代,是个信息大爆炸时代.对于商家来说,每一天信息都是宝贵的,都可以转换成money的.所以对数据的处理要求也变的越来越严格,从以前的hadoop/MapReduce 的离线处理,到现在的准实时和实时处理,都是由数据需求而引起的技术革命.数据的处理快慢取决于很多因素.现在主流的解决方法,像Spark,Flink,Pular,包括腾讯,阿里,百度的诸多为开源的框架都是基于分布

浅谈产业界与学术界的合作研究(转)

[编者注:原文可参阅: http://blog.sciencenet.cn/blog-414166-795432.html ] 最近网络上有一个流传甚广的微故事:"某企业引进了一条香皂包装线,结果发现经常会有空盒流过.厂长聘请一个博士后花了200 万设计出一个全自动分检系统.一个乡镇企业遇到了同样的问题,民工花90 元买了一台大电扇放在生产线旁,一有空盒经过便会吹走."这个微故事不断出现在笔者的视线中,想必在网络上得到了公众的认可.引起了共鸣,所以大家争相转发.平心而论,大多数人的内心

java序列化与反序列化以及浅谈一下hadoop的序列化

1.什么是序列化和反序列化 神马是序列化呢,序列化就是把内存中的对象的状态信息,转换成字节序列以便于存储(持久化)和网络传输.(网络传输和硬盘持久化,你没有一定的手段来进行辨别这些字节序列是什么东西,有什么信息,这些字节序列就是垃圾). 反序列化就是将收到字节序列或者是硬盘的持久化数据,转换成内存中的对象. 2.JDK的序列化 JDK的序列化只有实现了serializable接口就能实现序列化与反序列化,但是记得一定要加上序列化版本ID serialVersionUID 这个是识别序列化的之前那

.net中对象序列化技术浅谈

.net中对象序列化技术浅谈 2009-03-11 阅读2756评论2 序列化是将对象状态转换为可保持或传输的格式的过程.与序列化相对的是反序列化,它将流转换为对象.这两个过程结合起来,可以轻松地存储和传输数 据.例如,可以序列化一个对象,然后使用 HTTP 通过 Internet 在客户端和服务器之间传输该对象.反之,反序列化根据流重新构造对象.此外还可以将对象序列化后保存到本地,再次运行的时候可以从本地文件 中“恢复”对象到序列化之前的状态.在.net中有提供了几种序列化的方式:二进制序列化

浅谈——页面静态化

现在互联网发展越来越迅速,对网站的性能要求越来越高,也就是如何应对高并发量.像12306需要应付上亿人同时来抢票,淘宝双十一--所以,如何提高网站的性能,是做网站都需要考虑的. 首先网站性能优化的方面有很多:1,使用缓存,最传统的一级二级缓存:2,将服务和数据库分开,使用不同的服务器,分工更加明确,效率更加高:3,分布式,提供多台服务器,利用反向代理服务器nginx进行反向代理,将请求分散开来:4,数据库的读写分离,不同的数据库,将读操作和写操作分开,并实时同步即可:5,分布式缓存,使用memc

单页应用SEO浅谈

单页应用SEO浅谈 前言 单页应用(Single Page Application)越来越受web开发者欢迎,单页应用的体验可以模拟原生应用,一次开发,多端兼容.单页应用并不是一个全新发明的技术,而是随着互联网的发展,满足用户体验的一种综合技术. SEO 一直以来,搜索引擎优化(SEO)是开发者容易忽略的部分.SEO是针对搜索(Google.百度.雅虎搜索等)在技术细节上的优化,例如语义.搜索关键词与内容相关性.收录量.搜索排名等.SEO也是同行.市场竞争常用的的营销手段.Google.百度的搜

浅谈html标签

浅谈html各常用标签用法 标题标签:<h1>-<h6>来表示,使标题字体变粗. <br />换行标记 <hr />水平分隔符 &nbsp空格符 &copy版权符 <a href>a标签超链接 href可接链接地址 <p>段落标签<blockquote>引用标签及可用做缩进 <table>表格中的<ul>无序列表<ol>有序列表<dl>自定义列表<row