Spark Rdd coalesce()方法和repartition()方法

在Spark的Rdd中,Rdd是分区的。

有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。

有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()。

这两个方法有什么区别,看看源码就知道了:

  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }

coalesce()方法的作用是返回指定一个新的指定分区的Rdd。

如果是生成一个窄依赖的结果,那么不会发生shuffle。比如:1000个分区被重新设置成10个分区,这样不会发生shuffle。

关于Rdd的依赖,这里提一下。Rdd的依赖分为两种:窄依赖和宽依赖。

窄依赖是指父Rdd的分区最多只能被一个子Rdd的分区所引用,即一个父Rdd的分区对应一个子Rdd的分区,或者多个父Rdd的分区对应一个子Rdd的分区。

而宽依赖就是宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区,即存在一个父RDD的一个分区对应一个子RDD的多个分区。1个父RDD分区对应多个子RDD分区,这其中又分两种情况:1个父RDD对应所有子RDD分区(未经协同划分的Join)或者1个父RDD对应非全部的多个RDD分区(如groupByKey)。

如下图所示:map就是一种窄依赖,而join则会导致宽依赖

回到刚才的分区,如果分区的数量发生激烈的变化,如设置numPartitions = 1,这可能会造成运行计算的节点比你想象的要少,为了避免这个情况,可以设置shuffle=true,

那么这会增加shuffle操作。

关于这个分区的激烈的变化情况,比如分区数量从父Rdd的几千个分区设置成几个,有可能会遇到这么一个错误。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 77.0 failed 4 times, most recent failure: Lost task 1.3 in stage 77.0 (TID 6334, 192.168.8.61): java.io.IOException: Unable to acquire 16777216 bytes of memory
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:332)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:461)
        at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:139)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

这个错误只要把shuffle设置成true即可解决。

当把父Rdd的分区数量增大时,比如Rdd的分区是100,设置成1000,如果shuffle为false,并不会起作用。

这时候就需要设置shuffle为true了,那么Rdd将在shuffle之后返回一个1000个分区的Rdd,数据分区方式默认是采用 hash partitioner。

最后来看看repartition()方法的源码:

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

从源码可以看出,repartition()方法就是coalesce()方法shuffle为true的情况。那么如果说只是要减少父Rdd的分区数量,并且要设置的分区数量并不是很激烈,可以考虑直接使用coalesce方法来避免执行shuffle操作,提高效率。

如有错误遗漏的地方,请不吝赐教,我必将改正。

时间: 2024-08-01 15:00:12

Spark Rdd coalesce()方法和repartition()方法的相关文章

详解equals()方法和hashCode()方法

前言 Java的基类Object提供了一些方法,其中equals()方法用于判断两个对象是否相等,hashCode()方法用于计算对象的哈希码.equals()和hashCode()都不是final方法,都可以被重写(overwrite). 本文介绍了2种方法在使用和重写时,一些需要注意的问题. 一.equal()方法 Object类中equals()方法实现如下: public boolean equals(Object obj) { return (this == obj); } 通过该实现

【学习笔记】【OC语言】set方法和get方法

1.set方法和get方法的使用场合@public的成员可以被随意赋值,应该使用set方法和get方法来管理成员的访问(类似机场的安检.水龙头过滤,过滤掉不合理的东西),比如僵尸的生命值不能为负数2.set方法作用:用来设置成员变量,可以在方法里面过滤掉一些不合理的值命名规范:方法都是以set开头,而且后面跟上成员变量名,成员变量名的首字母必须大写形参名称不要跟成员变量同名3.get方法作用:返回对象内部的成员变量命名规范:get方法的名称一般就跟成员变量同名4.成员变量的命名规范成员变量都以下

java 数组的 toString 方法和 equals 方法以及 java.lang.Object 对象的 toString 方法和 equals 方法

1 public class Test { 2 public static void main(String[] args) { 3 int[] a = {1, 2, 4, 6}; 4 int[] b = a; 5 int[] c = {1, 2, 4, 6}; 6 7 //下面这个方法打印的是a数组的引用地址 8 System.out.println(a.toString()); 9 //下面这个方法比较的是两个数组的引用是否相等 10 System.out.println("a.equals

黑马程序员 02-set方法和get方法

———Java培训.Android培训.iOS培训..Net培训.期待与您交流! ——— 1.set方法与get方法的使用场合 @public的成员变量可以被外界随意赋值,往往会产生脏数据,应该使用set方法和get方法来管理成员的访问(类似安检.水龙头过滤,过滤掉不合理的对象),不如人的年龄不可能为负. 2.set方法 (1)作用:给外界提供一个公共的方法用来设置成员变量值,可以在方法里面过滤掉一些不合理的值: (2)命名规范: 1> 方法名必须以set开头 2> set后面跟上成员变量的名

virtual方法和abstract方法

在C#的学习中,容易混淆virtual方法和abstract方法的使用,现在来讨论一下二者的区别.二者都牵涉到在派生类中与override的配合使用. 一.Virtual方法(虚方法) virtual 关键字用于在基类中修饰方法.virtual的使用会有两种情况: 情况1:在基类中定义了virtual方法,但在派生类中没有重写该虚方法.那么在对派生类实例的调用中,该虚方法使用的是基类定义的方法. 情况2:在基类中定义了virtual方法,然后在派生类中使用override重写该方法.那么在对派生

Android中的Sqlite中的onCreate方法和onUpgrade方法的执行时机

今天在做数据库升级的时候,遇到一个问题,就是onCreate方法和onUpgrade方法的执行时机的问题,这个当时在操作的时候,没有弄清楚,很是迷糊,所以写代码的时候出现了很多的问题,所以没办法就去扒源代码看了.不过在此之前我讲解过一篇关于数据库升级的文章,但是那里没有详细的讲解一下这两个方法的执行时机,所以这里就在单独说一下 关于数据库升级的文章:http://blog.csdn.net/jiangwei0910410003/article/details/39670813 不多说,下面直接进

详解HTTP请求:get方法和post方法的区别

在讨论get方法和post方法的区别时,我们经常会提到两点: 1.get传送的数据量较小,不能大于2KB,而post传送的数据量较大,一般被默认为不受限制: 2.get安全性非常低,但是post安全性较高: 究其根本,为什么呢?就需要提到http报文以及http报文的格式. 首先我们先看一下HTTP请求报文的通用格式: 在<计算机网络--自顶向下方法>一书中提到很关键的两句话: 使用get方法时实体主体为空,而使用post方法时才使用. HTML表单经常使用GET方法,将输入数据(在表单字段)

彻底理解了call()方法,apply()方法和bind()方法

javascript中的每一个作用域中都有一个this对象,它代表的是调用函数的对象.在全局作用域中,this代表的是全局对象(在web浏览器中指的是window).如果包含this的函数是一个对象的方法,this指向的就是这个对象.因此在上面例子中就不用直接写对象的名字,而是使用this代替它,例如: var human = { name: '霍林林', sayName: function(){ console.log(this.name); } } human.sayName(); 下面这个

序列化和持久化 merge方法和saveOrUpdate方法

merge方法和saveOrUpdate方法的区别 merge方法是把我们提供的对象转变为托管状态的对象:而saveOrUpdate则是把我们提供的对象变成一个持久化对象:说的通俗一点就是:saveOrUpdate后的对象会纳入session的管理,对象的状态会跟数据库同步,再次查询该对象会直接从session中取,merge后的对 象不会纳入session的管理,再次查询该对象还是会从数据库中取. 使用merge方法修改时如果两次修改的值没有变,它只会执行select而不会进行update.