Learning Spark中文版--第四章--使用键值对(2)

Actions Available on Pair RDDs (键值对RDD可用的action)

??和transformation(转换)一样,键值对RDD也可以使用基础RDD上的action(开工),并且键值对RDD有一些利用键值对数据特性的的action,如下表:

表4-3 键值对RDD上的action

函数名 描述 例子 结果
countByKey() 计算每个键元素的总数 rdd.countByKey() {(1,1),(3,2)}
collectAsMap() 结果收集成一个map便于查询 rdd.collectAsMap() Map{(1,2),(3,4),(3,6)}
lookup(key) 根据键返回值 rdd.lookup(3) [4,6]

??键值对RDD还有很多其他保存RDD的action,我们将在第五章进行讨论。

Data Partitioning(Advanced)(数据分区)

??我们这一章讨论的最后一个Spark特性就是如何控制节点间的数据分区。在分布式程序中,主机间的通信代价高昂,所以把数据安排妥当来最小化网络间的通信可以极大地提高性能。很像单台机器的程序需要为数据选择正确的数据结构,Spark能够控制RDD的分区来减少网络间通信。分区不会对所有的应用都有用,举个例子,如果给定的RDD值只被扫描一次,那么预先对其分区没有什么意义。只有多次使用如join这样的的键操作的RDD,分区才有意义。稍后会有一些例子。

??Spark中的所有键值对RDD都可以使用分区,因为系统的分组函数是根据每个元素的键。尽管Spark没有明确地控制每个键所对应的工作节点(也因为系统在某些工作节点失败的情况下也能正常运行),它允许程序能够确保一组键会出现在同一个节点上。例如,你可以选择哈希分区(hashpartition)将一个RDD划分成100个分区,这样模除100后有相同哈希值的键会出现在一个节点上。或者你可以使用区间分区(range-partition)按区间对键进行分区,这样键在相同范围内的元素会在相同的节点上。

??举个简单例子,想想一个内存中保存大量用户信息的应用,(UserId,UserInfo)组成的键值对RDD,UserInfo包含用户话题订阅列表。这个应用定期把这个表和一个记录了过去五分钟发生的点击事件的小文件结合,就是一个(UserId,LinkInfo)键值对,记录了用户五分钟内点击网站链接的信息的日志。举例来讲,我们想统计用户访问和他们订阅主题无关的链接数量。我们可以执行Spark的join操作,把UserInfo和LinkInfo键值对根据UserId键分组。Example4-22展示了这个例子:

Example 4-22. Scala simple application

// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn‘t provide Spark with any way of knowing in which partition a
// particular UserID is located.
//初始化代码;我们从HDFS的Hadoop SequenceFile加载用户信息,它通过他们找到的HDFS block来分发userData元素,Spark并不知道每个UserId在分区中的位置
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
//函数定期被调用处理过去五分钟的事件日志,(假定这个SequenceFIle包含(UserId,Link//Info)键值对

def processNewLogs(logFileName: String) {
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
    val offTopicVisits = joined.filter {
        case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
            !userInfo.topics.contains(linkInfo.topic)
    }.count()
    println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

??这段代码能够达到我们的目的,但是效率会很差。这是因为每次调用processNewLogs()时调用的join()操作都不知道键在数据的分区方式。默认情况下,这些操作会用hash值混洗两个数据集的所有键,把具有相同哈希值的键发送到相同的机器中,然后在这台机器上join相同键的元素(如图4-4)。因为我们知道userData表比每五分钟的点击事件日志大很多,这浪费了大量工作:userData表每次被调用都要在通过网络把数据打乱再用哈希值对键分组,有时候用户表甚至没有变化也要这样做。

??改正其实很简单:在程序开始时对userData使用partitionBy()转换(transformation)来把数据进行哈希分区。还需传递一个spark.HashPartitioner对象给partitionBy,如例所示:

Example 4-23. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                .persist()

??processNewLogs()可以保持不变:事件RDD是从本地进入的processNewLogs()方法,并且在本方法中只被使用了一次,所以时间RDD被指定分区就没什么好处。因为我们构建userData时调用了partitionBy(),Spark会立即知道它被哈希分区了,调用join()时会利用这些信息。特别是当我们调用userData.join(events)时,Spark会只混洗eventsRDD,向包含用户数据的相应哈希分区的机器发送带有每个特定UserId的事件(如图4-5)。结果就是只有少量数据需要在网络中通信,程序极大提高速度。

??注意一点partitionBy()是transformation(转换),所以他总是返回一个新RDD,原始RDD不会被改变。RDD一旦创建可以永不改变。因此,持久化并且保存partitionBy()后的userData的结果是很重要的,而不是保存原始的sequenceFile()。并且,把100传递给partitionBy()表示分区的数量,这会控制相同数量的并行task(任务)在RDD执行后续的操作(例如:join);通常,这个数量至少和集群上的核心数一样大。

在partitionBy()之后未能持久化会导致后续使用RDD重复对数据分区。没有持久化,已分区RDD的使用将会导致对RDD完整继承关系的重新求导。这是partitionBy()的弊端,这会导致跨网络的重复分区和数据洗牌,类似于没有指定分区的情况。

??实际上,Spark的许多操作会自动生成附加分区信息的RDD,并且很多操作会利用这些分区信息,除了join()。举个例子,sortBykey()groupByKey()会分别生成区间分区和哈希分区。另一方面,类似map()操作产生的新RDD会忘记父RDD的分区信息,因为这种操作理论上有可能修改每条记录的键信息。后面部分会介绍如何决定RDD分区,和Spark不同的操作如何影响分区。

Java,Python和Scala三者的API受益于分区的方式并无二致。但是,在Python中,你不能把一个Hash Partitioner对象传递给partitionBy;你可以直接传递分区要求的数量(例如:rdd.partitionBy(100))。

Determining an RDD‘s Patitioner(决定RDD的分区器)

??在Scala和Java中,你可以通过partitioner的属性决定RDD如何分区(或者Java中的partitioner()方法)。这回返回一个scala.Option对象,一个Scala中包含可能存在可能不存在对象的容器类。你还可以调用Option对象的isDefined()来检查是否有值,get()方法返回这个值。如果有值,会是一个spark.Partioner对象。这本质上是一个表示RDD每个键的分区的函数;稍后会详细介绍。

??利用partitioner属性是个在Spark shell中测试Spark不同操作对分区影响的好手段,还能够检查你想在程序中执行的操作是否符合正确的结果(见Example4-24)。

Example 4-24. Determining partitioner of an RDD

scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some([email protected])

??在这个简单的shell会话中,我们创建了(Int,Int)键值对RDD,其初始化是没有任何分区信息(Option对象的值是None)。然后我们通过对第一个RDD哈希分区创建了第二个RDD。如果我们实际上想在后面的操作使用已定义的partitioned,我们应该在例子第三行输入的末尾加上persist()。这和在之前例子中需要对userData使用persist()的原因是相同的:如果不适用persist(),后续RDD的action计算分区的整个继承关系,这会导致键值对被一遍又一遍地哈希分区。

Operations That Benefit from Partitioning(从分区获益的操作)

??很多Spark的操作会导致在网络间根据键对数据洗牌。这些操作都可以通过分区进行优化。像Spark1.0,通过分区可以优化的操作有:cogroup(),groupWith(),join(),lefOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey()和lookup()

??对于运行单独RDD上的操作,如reduceByKey(),运行一个分区RDD会导致每个键的所有值在单个机器上计算,只需要最后把本地归约的值从工作节点发送回主节点。对于二元运算,如cogroup()join(),预分区(pre-partitioning)会导致至少一个RDD(已知分区器的RDD)不被混洗。如果RDD都是相同的分区器并且缓存在相同的机器上或者其中之一仍未被计算,那么不会发生网络间的数据洗牌。

Operations That Affect Partitioning(影响分区的操作)

??Spark内部知道操作是如何影响分区的,自动对会为数据分区的操作创建的RDD设置分区器。举例来讲,假设你调用了join()来连接两个RDD;因为有相同键的元素已经被哈希分区到相同的机器上了,Spark知道结果就是哈希分区,在join产生的结果上的操作如reduceByKey()会明显变快。

??另一方面,有些无法确保会生成已知分区的转换,输出的RDD不会有分区器集。举例来说,如果你对一个哈希分区的键值对RDD调用map()map()中的函数参数理论上可以改变每个元素的键,所以结果不会包含分区器。Spark不会分析你的函数来检查是否改变了键,而是提供了两个操作,mapvalues()flatMapValues()来保证每个元组的键未被改变。

??总结一下,所有会输出分区器的操作:cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(),mapValues() (如果父RDD有分区器), flatMapValues() (如果父RDD有分区器), and filter() (如果父RDD有分区器)。剩下的操作不会产生分区器。

??对于二元操作,输出分区器的设置取决于父RDD的分区器。默认情况下,使用哈希分区器,分区的数量由操作的并行度确定。但是,如果父RDD其中之一有分区器集,那该分区器会设置为分区器,如果所有的父RDD都有分区器集,那么设置分区器为第一个父分区器。

Example:PageRank(例:PageRank)

??我们认为PageRank(网页排名)算法是一个典型的会因分区提升效率的例子。PageRank算法是以谷歌的Larry Page命名的,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。这可以用来为网页排名,也可以是论文或用户影响力。

??PageRank是一个会执行很多join的迭代算法,所以是使用RDD分区的好样本。这个算法包含两个数据集:一个是(pageId,linkList),包含每个网页的邻居列表(该页面包含其他页面的链接,这个链接页面称为邻居页面);另一个是(pageID,rank),包含每个网页的当前排名。它的计算流程大致如下:

1.把每个页面的初始级别设置为1.0.

2.每次迭代,页面p发送rank(p)/numNeighbors(p)的贡献给它的邻居网站(它有链接的页面)。

3.设置页面p级别为0.5+0.85*收到的贡献。

??最后两步重复迭代多次,在这个过程中,算法会渐渐收敛到每个网页正确的PageRank值。实际上,通常进行十次迭代。

Example 4-25 gives the code to implement PageRank in Spark.

Example 4-25. Scala PageRank

// Assume that our neighbor list was saved as a Spark objectFile
//假设邻页列表存在了Spark的objectFile中。
val links = sc.objectFile[(String, Seq[String])]("links")
                .partitionBy(new HashPartitioner(100))
                .persist()

// Initialize each page‘s rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
//把每个页面的初始值设为1,因为使用mapValue,所以RDD会有和链接相同的分区器
var ranks = links.mapValues(v => 1.0)

// Run 10 iterations of PageRank
//运行十遍PageRank的迭代
for (i <- 0 until 10) {
    val contributions = links.join(ranks).flatMap {
        case (pageId, (links, rank)) =>
            links.map(dest => (dest, rank / links.size))
    }
    ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// Write out the final ranks
ranks.saveAsTextFile("ranks")

??就是这样!算法开始时,rankRDD对每个元素设置1.0的初始值,每次迭代都会更新rank变量。PageRank算法在Spark中表达非常简单:首先将当前等级RDD和静态链接RDD通过join()结合,这是为了得到(页面ID,链接列表,页面等级)元组,然后使用flatMap生成“贡献”值发送给每个邻居页面。我们把贡献值按页面ID求和并且设置页面的等级为0.15+0.85*收到的贡献

??虽然代码本身很简单,但是这个例子为了确保使用高效的方式分区RDD和最小化网络通信做了很多事情:

1.注意每次迭代都是links.join(ranks)。因为links是一个静态数据集,程序开始就用partitionBy对它分区了,所以这个RDD并不需要在网络间对数据洗牌。实际上,linksRDD可能会比ranksRDD大很多,因为它保存了每个页面的邻居页面列表,所以这种优化比简单地实现PageRank(如,使用简单的MapReduce)减少了大量的网络任务。

2.同理,我们把linksRDDpersist()避免对其迭代。

3.当我们第一次创建ranksRDD时,我们使用mapValues()而不是map()来保存父RDD(links)的分区信息,所以我们第一次join开销很小。

4.再循环体中,我们在reduceByKey()之后执行mapValues();因为reduceByKey()的结果已经被哈希分区了,这将使得将map的结果与下一次迭代中的链接结合起来更有效率。

为了最大限度地发挥分区优化的潜力,当你不改变键的值时应该使用mapValues()flatMapValues()

Custom Partitioners(定制分区器)

??尽管哈希分区器和区间分区器可以在很多场景使用,Spark仍然允许你通过提供一个定制的Partitioner对象来自定义RDD分区方式。这可以帮助你利用特定领域的知识来减少网络通信的消耗。

??举例来说,假如我们想使用PageRank算法计算一组web页面,以页面URL做RDD的键,即PageID为URL,使用哈希分区的话,域名相同后缀不相同的URL不会在一个分区(如,http://www.cnn.com/WORLDhttp://www.cnn.com/US)。我们知道同一个域名中的链接往往彼此连接。由于PageRank需要在每次迭代时将每个页面的消息发送给每个邻居,因此定制分区器有助于将这些页面分组到相同的分区中。我们可以定制一个分区器把相同域名的URL分区到一个节点上。

??定制分区器需要是org.apache.spark.Partitioner的子类并且实现三个方法:

  • numPartitions:Int,返回你创建分区的数量。
  • getPartition(key:Any):Int,返回对应键的分区ID(0到numPartitions-1)。
  • equals(),标准Java相等方法。这个实现很重要,因为Spark需要测试你的分区器与其它实例是否相等来判断两个RDD的分区是否是一种方式。

??有一点需要注意的是如果你的算法中依赖了Java的hashCode()方法,这有可能返回一个负数。你需要确保getPartition()不会返回负数。

??Example4-26展示了一个我们之前描述的域名分区器,这个分区器只对每个URL的域名进行哈希分区。

Example 4-26. Scala custom partitioner
class DomainNamePartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
        val domain = new Java.net.URL(key.toString).getHost()
        val code = (domain.hashCode % numPartitions)
        if (code < 0) {
            code + numPartitions // Make it non-negative
        } else {
            code
    }
}
// Java equals method to let Spark compare our Partitioner objects
override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
        dnp.numPartitions == numPartitions
    case _ =>
        false
    }
}

??注意在equals()方法中,我们使用Scala的模式匹配操作测试other是否是一个DomainNamePartitioner对象,如果是的就跳进里面的方法;这和使用Java的intanceof是一样的。

??使用定制分区器非常简单:把它传给partitionBy()方法就行了。Spark中很多基于数据洗牌的操作,如join()groupByKey(),也可以使用可选的Partitioner对象来控制分区的输出。

??在Java中创建一个定制分区器和Scala很相似:直接继承spark.Partitioner类并且实现需要的方法就行。

??在Python中,你不需要继承Partitioner类,但是需要给RDD.partitionBy()方法传递一个哈希函数作为额外的参数。示例如下:

Example 4-27. Python custom partitioner

import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

rdd.partitionBy(20, hash_domain) # Create 20 partitions

??注意一点你传递的哈希函数会作为和其他RDD比较的标识。如果你想使用相同的分区器对多个RDD分区,那么需要传递相同的函数对象(如,全局函数)而不是为每个创建一个lambda表达式。

Conclusion(总结)

??本章中,我们了解了Spark中处理键值对数据的特殊函数。第三章学到的技术对键值对仍然适用。下一章节,我们将了解如何加载保存数据。

原文地址:https://www.cnblogs.com/krcys/p/8497172.html

时间: 2024-08-24 03:25:59

Learning Spark中文版--第四章--使用键值对(2)的相关文章

Learning Spark中文版--第三章--RDD编程(1)

? ?本章介绍了Spark用于数据处理的核心抽象概念,具有弹性的分布式数据集(RDD).一个RDD仅仅是一个分布式的元素集合.在Spark中,所有工作都表示为创建新的RDDs.转换现有的RDDs,或者调用RDDs上的操作来计算结果.在底层,Spark自动将数据中包含的数据分发到你的集群中,并将你对它们执行的操作进行并行化.数据科学家和工程师都应该阅读这一章,因为RDDs是Spark的核心概念.我们强烈建议你在这些例子中尝试一些 交互式shell(参见"Spark的Python和Scala she

Java Persistence with MyBatis 3(中文版) 第四章 使用注解配置SQL映射器

在上一章,我们看到了我们是怎样在映射器Mapper XML配置文件中配置映射语句的.MyBatis也支持使用注解来配置映射语句.当我们使用基于注解的映射器接口时,我们不再需要在XML配置文件中配置了.如果你愿意,你也可以同时使用基于XML和基于注解的映射语句. 本章将涵盖以下话题: l 在映射器Mapper接口上使用注解 l 映射语句 @Insert,@Update,@Delete,@SeelctStatements l 结果映射 一对一映射 一对多映射 l 动态SQL @SelectProvi

Learning Scrapy 中文版翻译 第二章

为了从网页中提取信息,你有必要对网页的结构做一些了解.我们将快速学习HMTL,HTML数状结构以及用XPath在网页上提取信息 HTML, DOM树结构以及XPath 让我们花一点时间来了解当用户在浏览器中输入了一个URL到屏幕上显示出页面的处理过程.从本书的角度来说,这个过程分为4步: 浏览器中输入URL.URL的第一部分(域名,比如gumtree.com)用来在网络中查找合适的服务器, URL和像cookie样的数据形成了一个发送到服务器的请求 服务器给浏览器发送HTML页面.值得注意的是服

Netty In Action中文版 - 第四章:Transports(传输)

本章内容 Transports(传输) NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式) Use-case(用例) APIs(接口) 网络应用程序一个非常重要的工作是数据传输. 数据传输的过程不一样取决是使用哪种交通工具,可是传输的方式是一样的:都是以字节码传输.Java开发网络程序数据传输的过程和方式是被抽象了的.我们不须要关注底层接口.仅仅须要使用Java API或其它网络框架如Net

Java Persistence with MyBatis 3(中文版) 第三章 使用XML配置SQL映射器

关系型数据库和SQL是经受时间考验和验证的数据存储机制.和其他的ORM 框架如Hibernate不同,MyBatis鼓励开发者可以直接使用数据库,而不是将其对开发者隐藏,因为这样可以充分发挥数据库服务器所提供的SQL语句的巨大威力.与此同时,MyBaits消除了书写大量冗余代码的痛苦,它使使用SQL更容易. 在代码里直接嵌套SQL语句是很差的编码实践,并且维护起来困难.MyBaits使用了映射器配置文件或注解来配置SQL语句.在本章中,我们会看到具体怎样使用映射器配置文件来配置映射SQL语句.

Java Persistence with MyBatis 3(中文版) 第五章 与Spring集成

MyBatis-Spring是MyBatis框架的子模块,用来提供与当前流行的依赖注入框架Spring的无缝集成. Spring框架是一个基于依赖注入(Dependency Injection)和面向切面编程(Aspect Oriented Programming,AOP)的Java框架,鼓励使用基于POJO的编程模型.另外,Spring提供了声明式和编程式的事务管理能力,可以很大程度上简化应用程序的数据访问层(data access layer)的实现.在本章中,我们将看到在基于Spring的

【原】Learning Spark (Python版) 学习笔记(四)----Spark Sreaming与MLlib机器学习

本来这篇是准备5.15更的,但是上周一直在忙签证和工作的事,没时间就推迟了,现在终于有时间来写写Learning Spark最后一部分内容了. 第10-11 章主要讲的是Spark Streaming 和MLlib方面的内容.我们知道Spark在离线处理数据上的性能很好,那么它在实时数据上的表现怎么样呢?在实际生产中,我们经常需要即使处理收到的数据,比如实时机器学习模型的应用,自动异常的检测,实时追踪页面访问统计的应用等.Spark Streaming可以很好的解决上述类似的问题. 了解Spar

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

Spark的Python和Scala shell介绍(翻译自Learning.Spark.Lightning-Fast.Big.Data.Analysis)

Spark提供了交互式shell,交互式shell让我们能够点对点(原文:ad hoc)数据分析.如果你已经使用过R,Python,或者Scala中的shell,或者操作系统shell(例如bash),又或者Windows的命令提示符界面,你将会对Spark的shell感到熟悉. 但实际上Spark shell与其它大部分shell都不一样,其它大部分shell让你通过单个机器上的磁盘或者内存操作数据,Spark shell让你可以操作分布在很多机器上的磁盘或者内存里的数据,而Spark负责在集