Spark中的键值对操作-scala

1.PairRDD介绍

    Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。

2.创建Pair RDD

程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairRDD

val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));//获取第一个单词作为键val words =rdd.map(x=>(x.split(" ")(0),x));words.collect().foreach(println);

输出结果:

(this,this is a test)

(how,how are you)

(do,do you love me)

(can,can you tell me)

3.PairRDD的转化操作

PairRDD可以使用所有标准RDD上可用的转化操作。传递函数的规则也适用于PairRDD。由于PairRDD中包含二元组,所以需要传递的函数应当操作而元素而不是独立的元素。

PairRDD的相关转化操作如下表所示

针对两个PairRDD的转化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}

函数名 目的 示例 结果
substractByKey 删掉RDD中键与other RDD
中的键相同的元素
rdd.subtractByKey(other) {(1,2)}
join 对两个RDD进行内连接 rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 对两个RDD进行连接操作,右外连接 rdd.rightOuterJoin(other) {(3,(4,9)),(3,(6,9))}
leftOuterJoin 对两个RDD进行连接操作,左外连接 rdd.rightOuterJoin(other) {(1,(2,None)),(3,(4,9)),(3,(6,9))}
cogroup 将两个RDD中拥有相同键的数据分组 rdd.cogroup(other) {1,([2],[]),(3,[4,6],[9])}

程序实例:

针对2 中程序生成的PairRDD,删选掉长度超过20个字符的行。

val results=words.filter(value => value._2.length()<20);results.foreach(println)

RDD上有fold(),combine(),reduce()等行动操作,pair RDD上则有相应的针对键的转化操作。

(1)reduceByKey()与reduce()操作类似,它们都接收一个函数,并使用该函数对值进行合并。reduceByKey()会为数据集中的每个键进行并行的规约操作,每个规约操作会将键相同的值合并起来。reduceBykey()最终返回一个由各键规约出来的结果值组成的新的RDD。

程序示例:用reduceByKey实现单词计数

val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));val words =rdd.flatMap(line => line.split(" "));val results=words.map(word => (word,1)).reduceByKey( {case(x,y) => x+y});results.foreach(println)

输出:

(are,1)

(this,1)

(is,1)

(you,3)

(can,1)

(a,1)

(love,1)

(do,1)

(how,1)

(tell,1)

(me,2)

(test,1)

(2)foldByKey()与fold()操作类似,他们都使用一个与RDD和合并函数中的数据类型相同的零值作为初始值。与fold()一样,foldByKey()操作所使用的合并函数对零值与另一个元素进行合并,结果仍为该元素。

程序示例:求对应key的value之和

val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));val results=nums.foldByKey(0)({case(x,y)=>x+y})results.collect().foreach(println)

结果:

(1,4)

(2,10)

(3)

combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据类型不同的返回值。combineByKey()会遍历分区中的所有元素,因此,每个元素的键要么还么有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现每个键时发生,而不是在整个RDD中第一次出现一个键时发生。

如果这是一个处理当前分区之前就已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。

由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

以下程序示例使用combineBykey()求每个键对应的平均值。

val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));val results=nums.combineByKey(  (v)=>(v,1),  (acc:(Int,Int),v) =>(acc._1+v,acc._2+1),  (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map{case(key,value)=>(key,value._1/value._2.toFloat)}results.collectAsMap().map(println)

结果:

(2,5.0)

(1,2.0)

成功求出每个key对应value对应的平均值

*(4)并行度调优

每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作时的并行度。

在执行聚合或者分组操作时,可以要求Spark使用给定的分区数。Spark始终尝试根据集群的大小推断出一个有意义的默认值,但是你可以通过对并行度进行调优来获得更好的性能表现。

在Scala中,combineByKey()函数和reduceByKey()函数的最后一个可选的参数用于指定分区的数目,即numPartitions,使用如下:

val results=nums.reduceByKey({(x,y) =>x+y},2);

5.数据分组

(1)groupByKey()

groupByKey()会使用RDD中的键来对数据进行分组。对于一个由类型K的键和类型V的值组成的RDD,得到的RDD类型会是[K,Iterable[v]]。

以下是程序示例,对PairRDD调用groupByKey()函数之后,返回的RDD类型是RDD[K,Iterable[v]]

val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));val group=nums.groupByKey();val results=group.collect();for(value <- results){  print(value._1+": ")  for(elem <- value._2)    print(elem+" ")  println()

}

输出结果:

1: 1 3

2: 2 8

(2)cogroup()

除了对单个RDD的数据进行分组,还可以使用cogroup()函数对对个共享同一个键的RDD进行分组。对两个键的类型均为K而值得类型分别为V和W的RDD进行cogroup()时,得到结果的RDD类型为[(K,(Iterable[V],Iterable[W]))]。如果其中一个RDD对于另一个RDD中存在的某个键没有对应的记录,那么对应的迭代器则为空。

举例:

val nums1 = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));val nums2 = sc.parallelize(List(Tuple2(1,1),Tuple2(1,3),Tuple2(2,3)))val results=nums1.cogroup(nums2)for(tuple2 <- results.collect()){  print(tuple2._1+" [ ")  for(it <- tuple2._2._1)    print(it+" ")  print("] [ ")  for(it<-tuple2._2._2)    print(it+" ")  println("]")}

输出:

1 [ 1 3 ] [ 1 3 ]

3 [ 4 ] [ ]

2 [ 2 4 ] [ 3 ]

6.数据排序

在Scala中以字符串顺序对正数进行自定义排序

(1)对RDD进行排序:

val nums =sc.parallelize(List(12,4,6,8,0,8));//隐式转换声明排序的依据implicit  val sortIntegersByString = new Ordering[Int] {  override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())}val results=nums.sortBy(value=>value);results.collect().foreach(println)

(2)对PairRDD,按key的值进行排序

val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));//隐式转换声明排序的依据implicit  val sortIntegersByString = new Ordering[Int] {  override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())}val results=nums.sortByKey();results.collect().foreach(println)

7.数据分区

(1)创建数据分区

在分布式程序中,通信的代价很大,控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序可以通过控制RDD分区的方式来减少通信消耗。只有当数据集多次在诸如连接这种基于键的操作中,分区才会有作用

Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark可以确保同一组的键出现在一个节点上。

举个简单的例子,应用如下:内存中保存着很大的用户信息表,由(UserID,UserInfo[])组成的RDD,UserInfo是用户所订阅的所有主题列表。该应用会周期性地将这张表和一个小文件进行组合,这个小文件中存这过去5分钟发生的时间,其实就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用户访问的链接的主题。我们需要对用户访问其未订阅主题的页面情况进行统计。我们可以使用Spark的join()操作进行组合操作。将两者根据UserId连接之后,过滤出不在UserInfo[]中的LinkInfo,就是用户访问其未订阅主题的情况。

val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))val userData =sc.parallelize(list1)val events = sc.parallelize(list2)val joined=userData.join(events)val results=joined.filter({  case (id, (info, link)) =>    !info.contains(link)  }).count()println(results)

输出:1

这段代码可以正确运行,但是效率不高。因为每5分钟就要进行一次join()操作,而我们对数据集如何分区却一无所知。默认情况下,连接操作会将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。因为userData表比每5分钟出现的访问日志表events要大很多,所以要浪费时间进行额外的工作:在每次调用时都对userDAta表进行哈希值计算和跨节点数据混洗,虽然这些数据从来不会变化。

要解决此问题:在程序开始的时候,对userData表进行partitionBy()转化操作,将这张表转化为哈希分区。可以通过向patitionBy传递一个spark.HashPartitioner对象来实现该操作。

scala自定义分区方式:

val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)

这样以后在调用join()时,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join()时,Spark就会利用这一点,只会对events进行数据混洗操作,将events中特定userId的记录发送到userData的对应分区所在的那台机器上。这样,需要网络传输的数据就大大减小了,程序运行的速度也显著提高。

请注意,我们还对userData 这个RDD进行了持久化操作,默认情况下,每一个由转化操作得到的RDD都会在每次执行启动操作时重新计算生成,将userData持久化之后,就能保证userData能够在访问时被快速获取。

*进一步解释数据分区带来的好处:

如果没有将partitionBy()转化操作的结果进行持久化,那么后面每次用到这个RDD时都会重复对数据进行分区操作。不进行持久化会导致整个RDD谱系图重新求值。那样的话,partitionBy()带来的好处就会抵消,导致重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况是十分相似的。

(2)获取数据分区的方式

接(1)中程序:

val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)println(userData.partitioner)

RDD的属性partitioner就是存储了对应的分区方式

(3)从分区中获益的操作

Spark中的很多操作都引入了根据键跨结点进行混洗的过程。所有这些操作都会从数据分区中获益。能够从数据分区中获益的操作有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。

对于像reduceByKey()这样只作用于单个RDD的操作,运行在未分区的RDD的时候或导致每个键所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结果值从各工作节点传回主节点,所以原本的网络开销就不太大。而对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会导致其中至少一个RDD(使用已知分区器的那个RDD)不发生数据混洗。如果两个RDD使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个RDD是通过mapValues()从另一个RDD中创建出来的,这两个RDD就会拥有相同的键和分区方式),或者其中一个RDD还没有计算出来,那么跨节点数据混洗就不会发生了。

(4)影响分区方式的操作

所有会为生成的结果RDD设好分区方式的操作:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父RDD有分区方式的话),filter()(如果父RDD有分区方式的话)。其他所有操作生成的结果都不会存在特定的分区方式。

注意:

对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度是一样的。如果其中一个父RDD已经设置过分区方式,那么结果就会采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD会采用第一个RDD的分区方式。

8.示例程序-PageRank

PageRank算法是一种从RDD分区中获益的更复杂的算法,我们以它为例进行分析。PageRank算法用来根据外部文档指向一个文档的链接,对集合中每个文档的重要程度赋一个度量值。该算法可以用于对网页进行排序,当然,也可以用于排序科技文章或社交网络中有影响的用户。

算法会维护两个数据集,一个由(pageID,linklist[])组成,包含每个页面的链接到的页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按以下步骤进行计算:

① 将每个页面的排序值初始化为1.0

②在每次迭代中,向每个有直接链接的页面,发送一个值为rank(p)/numNeighbors(p)(出链数目)   的贡献量

③将每个页面的排序值设置为0.15+0.85*contributionsReceived

最后两步会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要进行十个迭代。

下面用Scala来实现PageRank算法:

/*       #以下是url的内容:       www.baidu.com www.hao123.com       www.baidu.com www.2345.com       www.baidu.com www.zhouyang.com       www.hao123.com www.baidu.com       www.hao123.com www.zhouyang.com       www.zhouyang.com www.baidu.com    */val inputs =sc.textFile("C:\\url.txt")//url,[urls]val links =inputs.map(x=>(x.split(" ")(0),x.split(" ")(1)))  .distinct()  .groupByKey()  .cache()//url,rankvar ranks =links.mapValues(value =>1.0)for(i<-0 until 10){

val contribs =links.join(ranks).flatMap({    case(pageid,(links,rank))=>      //url Double      links.map(dest=>(dest,rank/links.size))  })  //reduce and add the contribs  ranks=contribs.reduceByKey((x,y)=>x+y).mapValues(v => 0.15+0.85*v)}ranks.collect().foreach(println)

结果:

(www.hao123.com,0.3685546839262259)

(www.baidu.com,0.761571325242544)

(www.2345.com,0.3685546839262259)

(www.zhouyang.com,0.5269013026650011)

9.Scala设置自定义分区方式

Spark允许你通过自定义Partitioner对象来控制RDD的分区方式,这样可以让你利用领域知识进一步减少通信消耗。

举个例子,假设我们要在一个网页的集合上运行前一届中的PageRank算法。在这里,每个页面的ID是页面的URL。当我们使用简单的哈希函数进行分区时,拥有相似的URL的页面比如 http://www.baidu.com/news 与 http://www.baidu.com/map 可能被分在完全不同的节点上。但是我们知道,同一个域名下的网页更有可能相互连接。由于PageRank需要在每次迭代中从每个页面向它所有相邻的页面发送一条消息,因袭把这些页面分组在同一个分区中会更好。可以使用自定义的分区器来实现仅根据域名而不是整个URL进行分区。

要实现先自定义Partitioner,需要继承Partitioner类并实现其下述方法:

override def numPartitions: Int = ???

返回创建的分区数量

override def getPartition(key: Any): Int = ???

返回给定键的数量

override def equals(other:Any):Boolean = ???

Spark需要这个方法来检查分区器对象是否与其他分区器实例相同,这样Spark才能判断两个RDD的分区方式是否相同。

class DomainNamePartitioner (numParts:Int) extends Partitioner{  override def numPartitions: Int = numParts  //根据hashCode和numPartitions取余来得到Partition,因为返回的必须是非负数,所以对于hashCode为负的情况做了特殊处理  override def getPartition(key: Any): Int = {    val domain = new URL(key.toString).getHost();    val code=(domain.hashCode%numPartitions)    if(code<0){      code+numPartitions    }else{      code    }  }

override def equals(other:Any):Boolean = other match {    //这个实例是DomainNamePartitioner的实例,并且numPartitions相同,返回true    case dnp:DomainNamePartitioner =>      dnp.numPartitions==numPartitions    //否则,返回false    case _ => false  }}

来自为知笔记(Wiz)

时间: 2024-09-30 06:44:59

Spark中的键值对操作-scala的相关文章

Spark学习笔记3:键值对操作

键值对RDD通常用来进行聚合计算,Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为pair RDD.pair RDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口. Spark中创建pair RDD的方法:存储键值对的数据格式会在读取时直接返回由其键值对数据组成的pair RDD,还可以使用map()函数将一个普通的RDD转为pair RDD. Pair RDD的转化操作 reduceByKey()  与reduce类似 ,接收一个函数,并使用该函数对值进行合并,

NSMutableDictionary 类中增加键值对方法分析

在iOS中可变字典增加一个键值对的方法有setObject: forKey: 和setValue : forKey: .为了方便我们把这两个方法简称为方法A和方法B. B这个方法中其中的value值是不能为nil,否则程序会出项崩溃.而A方法中的这个value可以为nil,但是当这个value位nil时,系统会自动调用removeObjectforKey:这个方法.这样就把这个键值对删除掉了.B方法中的key值可以是任意类型的,但是这个类型必须要实现NSCopying协议.而A方法中它的key值

PHP如何根据数组中的键值进行排序

主要是使用PHP的排序函数,asort()和arsort(). 为了减少代码的耦合性,我们将根据数组中的键值进行排序封装成一个函数 1 <?php 2 $array = array( 3 array('name'=>'aa','price'=>1050), 4 array('name'=>'bb','price'=>4300), 5 array('name'=>'cc','price'=>3100), 6 array('name'=>'dd','price'

信号量,消息队列,共享内存中ket_t键值的生成函数ftok。

在System V中,我们经常用用key_t的值来创建或者打开信号量,共享内存和消息队列.这个在IPC的环境中十分的重要,比如说,服务器创建了一个消息队列,等待 客户机发送请求.那么如何创建或者打开已有的消息队列呢?一般而言,我们对于服务器使用的路径和项目id(proj_id)是已知的,所以客户机可以获取 相同的key来打开 消息队列并进行操作.下面就是ftok的使用原型: ftok函数   函数ftok把一个已存在的路径名和一个整数标识得转换成一个key_t值,称为IPC键: # includ

【Java必修课】通过Value获取Map中的键值Key的四种方法

1 简介 我们都知道Map是存放键值对<Key,Value>的容器,知道了Key值,使用方法Map.get(key)能快速获取Value值.然而,有的时候我们需要反过来获取,知道Value值,求Key值. 本文将用实例介绍四种方法,通过传入Value值,获取得到Key值. 2 四种方法 2.1 循环法 循环法就是通过遍历Map里的Entry,一个个比较,把符合条件的找出来.会有三种情况: (1)找到一个值 (2)找到多个值 (3)找不到 具体代码如下: @Test public void lo

对GET/POST请求返回cookie中的键值对进行重新组合

get/post请求返回的cookie中并不是所有的键值对我们都需要,我们只需要提取我们需要的进行重新组合就可以了. 如下图是一个GET请求返回的cookie 我需要提取其中的 uin,skey等相关键值对. 以下函数可以完成我们的需要: //using System.Text.RegularExpressions; public string GetCookieByName(List<string> keylist, string cookie) { string str = "&

Redis中的键值对设计

丰富的数据结构使得redis的设计非常的有趣.不像关系型数据库那样,DEV和DBA需要深度沟通,review每行sql语句,也不像memcached那样,不需要DBA的参与.redis的DBA需要熟悉数据结构,并能了解使用场景. 下面举一些常见适合kv数据库的例子来谈谈键值的设计,并与关系型数据库做一个对比,发现关系型的不足之处. 用户登录系统 记录用户登录信息的一个系统, 我们简化业务后只留下一张表. 关系型数据库的设计 mysql> select * from login; +-------

如何在嵌入式产品中应用键值存储数据库

[ 声明:版权所有,欢迎转载,请勿用于商业用途. 联系信箱:[email protected]] 1.背景 随着互联网快速发展及大数据时代的到来,NoSQL数据库以其强大的可伸缩性.高效性.实时性等特点,而获得十足的发展.键值(Key-Value)存储数据库就是NoSQL的一种,大名鼎鼎的Redis就是一款用C开发的开源键值对存储数据库. 与此同时又有越来越多的厂家加入了IoT产品.可穿戴设备.智能家居的嵌入式产品开发行列中来,数据的持久化存储需求也就变得越来越多,选型一款伸缩性好.占用资源小.

怎样在嵌入式产品中应用键值存储数据库

[ 声明:版权全部.欢迎转载.请勿用于商业用途. 联系信箱:[email protected]] 1.背景 随着互联网高速发展及大数据时代的到来.NoSQL数据库以其强大的可伸缩性.高效性.实时性等特点,而获得十足的发展.键值(Key-Value)存储数据库就是NoSQL的一种,大名鼎鼎的Redis就是一款用C开发的开源键值对存储数据库. 与此同一时候又有越来越多的厂家增加了IoT产品.可穿戴设备.智能家居的嵌入式产品开发行列中来,数据的持久化存储需求也就变得越来越多,选型一款伸缩性好.占用资源