Parallelism , Partitioner

转:spark通过合理设置spark.default.parallelism参数提高执行效率

spark中有partition的概念(和slice是同一个概念,在spark1.2中官网已经做出了说明),一般每个partition对应一个task。在我的测试过程中,如果没有设置spark.default.parallelism参数,spark计算出来的partition非常巨大,与我的cores非常不搭。我在两台机器上(8cores *2 +6g * 2)上,spark计算出来的partition达到2.8万个,也就是2.9万个tasks,每个task完成时间都是几毫秒或者零点几毫秒,执行起来非常缓慢。在我尝试设置了 spark.default.parallelism 后,任务数减少到10,执行一次计算过程从minute降到20second。

参数可以通过spark_home/conf/spark-default.conf配置文件设置。

eg.

spark.master  spark://master:7077

spark.default.parallelism  10

spark.driver.memory  2g

spark.serializer  org.apache.spark.serializer.KryoSerializer

spark.sql.shuffle.partitions  50

下面是官网的相关描述:

from:http://spark.apache.org/docs/latest/configuration.html

Property Name Default Meaning
spark.default.parallelism For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations likeparallelize with no parent RDDs, it depends on the cluster manager:

  • Local mode: number of cores on the local machine
  • Mesos fine grained mode: 8
  • Others: total number of cores on all executor nodes or 2, whichever is larger
Default number of partitions in RDDs returned by transformations like joinreduceByKey, and parallelize when not set by user.

from:http://spark.apache.org/docs/latest/tuning.html

Level of Parallelism

Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config propertyspark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.

原文地址:http://www.cnblogs.com/wrencai/p/4231966.html

时间: 2024-10-02 13:59:44

Parallelism , Partitioner的相关文章

Partitioner

使用自定义partitioner来处理手机上网日志信息 为什么要使用分区? 1.根据业务需要,产生多个输出文件 2.多个reduce任务在运行,提高整体job的运行效率 1 package partitioner; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.conf.Configuration; 8 im

MapReduce框架Partitioner分区方法

前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的. 1.Partitioner分区类的作用是什么? 2.getPartition()三个参数分别是什么? 3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少? 扩展: 如果不同类型的数据被分配到了同一个分区,输出的数

Hadoop自定义分区Partitioner

一:背景 为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看.Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件. 二:技术实现 自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitione

Partitioner和Combiner两个阶段

Partitioner编程 将有一些共同特性的数据,写入到同一个文件里. 排序和分组 在map和reduce阶段进行排序时,比较的是k2.v2是不参与排序比较的. 如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2, 才能参与比较.如果想自定义排序规则,被排序的对象要实现 WritableComparable接口,在compareTo方法中实现排序规则, 然后将这个对象当做k2,即可完成排序分组时也是按照k2进行比较的. Combiners编程 1.每一个map会产生大量的输出,c

Partitioner分区过程分析

转自:http://blog.csdn.net/androidlushangderen/article/details/41172865 Partition的中文意思就是分区,分片的意思,这个阶段也是整个MapReduce过程的第三个阶段,就在Map任务的后面,他的作用就是使key分到通过一定的分区算法,分到固定的区域中,给不同的Reduce做处理,达到负载均衡的目的.他的执行过程其实就是发生在上篇文章提到的collect的过程阶段,当输入的key调用了用户的map函数时,中间结果就会被分区了.

Concurrency vs. Parallelism

http://getakka.net/docs/concepts/terminology Terminology and Concepts In this chapter we attempt to establish a common terminology to define a solid ground for communicating about concurrent, distributed systems which Akka.NET targets. Please note th

Hadoop学习笔记—9.Partitioner与自定义Partitioner

一.初步探索Partitioner 1.1 再次回顾Map阶段五大步凑 在第四篇博文<初始MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步凑,如下图所示: 其中,step1.3就是一个分区操作.通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并.哪个key到哪个Reducer的分配过程,是由Partition

数据并行(Data Parallelism)

数据并行,不同的数据输入以并行方式运行同一个函数,它把一个任务分解成不连续的单元,因此,可以在单独的线程上并行处理,保证这个任务可以在可用的处理之间进行分配. 通常是处理数据集合,这种方法利用了集合中的项目自然提供了任务分配.最简单的情况,是一个并行的映射函数,对集合的中每一项应用一个转换,结果形成一个新的集合.这种简单的情况通常是可以工作的,是因为集合中的每一项通常可按任意顺序.进行单独处理:用这种处理更复杂的情况,比如汇总列表中所有项,也是可能的:然而,对于一些更复杂的情况,以及必须按顺序处

mapreduce中的combiner、partitioner、Shuffle

一.combiner combiner不是mapreduce的一个必备过程,是由开发者选择是否使用的,是mapreduce的一种优化手段. combiner的作用:combiner是为了解决mapreduce过程中的两个性能瓶颈,1.网络宽带严重被占降低程序效率,2.单一节点承载过重降低程序效率.所以性能有以下两个作用: 1.combiner实现本地key的聚合,对map输出的key排序value进行迭代 2.combiner还有本地reduce功能(其本质上就是一个reduce). 什么时候运