Spark: Best practice for retrieving big data from RDD to local machine

I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB. I'd like to iterate over the values of the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative way. There is the method iterator(), but it requires some additional information that I can't provide.

UPD: committed a toLocalIterator method.



edited Apr 10 '14 at 13:56

asked Feb 11 '14 at 9:55





toLocalIterator is not ideal if you want to iterate locally over a partition at a time – Landon Kuhn Oct 29 '14 at 2:25

@LandonKuhn why not? – Tom Yubing Dong Aug 4 '15 at 23:02
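The concern is that toLocalIterator yields individual records, so partition boundaries are no longer visible to the caller. One workaround, assuming the RDD API's glom() (available in both the Scala and Python APIs), is rdd.glom().toLocalIterator: glom() turns each partition into a single element holding all of its records, so each step of the local iterator then delivers one whole partition. The sketch below models this in plain Python with no Spark dependency; partitions are made-up lists, and `glom` and `to_local_iterator` here are illustrative stand-ins, not the Spark functions themselves.

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")

def glom(partitions: List[List[T]]) -> List[List[List[T]]]:
    """Model of RDD.glom(): each partition becomes one element (a list)."""
    return [[list(part)] for part in partitions]

def to_local_iterator(partitions: List[List[T]]) -> Iterator[T]:
    """Model of toLocalIterator: yield each partition's records in order."""
    for part in partitions:
        yield from part

partitions = [[0, 1, 2], [3, 4], [5, 6, 7, 8]]

# Plain toLocalIterator: a flat stream, partition boundaries are not visible.
print(list(to_local_iterator(partitions)))
# glom() first: every item of the stream is one whole partition.
for chunk in to_local_iterator(glom(partitions)):
    print(chunk)
```

Each glommed partition still has to fit in local memory as one list, which is the same constraint as fetching one partition at a time.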


5 Answers


26 votes, accepted

Update: The RDD.toLocalIterator method, which appeared after the original answer was written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.
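To illustrate the idea behind toLocalIterator (one job per partition, launched only when the consumer reaches that partition), here is a plain-Python model with no Spark dependency. `run_job` is a hypothetical stand-in for a runJob call restricted to a single partition, and the `jobs` list only records which partitions have been fetched so the laziness is visible.

```python
from typing import Callable, Iterator, List

def to_local_iterator(num_partitions: int,
                      run_job: Callable[[int], List[int]]) -> Iterator[int]:
    """Yield records partition by partition, running one job per partition."""
    for index in range(num_partitions):
        # Only this partition's data is materialized locally at this point.
        for record in run_job(index):
            yield record

# Hypothetical cluster data: 4 partitions of 3 records each.
cluster = [list(range(i * 3, i * 3 + 3)) for i in range(4)]
jobs: List[int] = []

def run_job(index: int) -> List[int]:
    jobs.append(index)           # record that partition `index` was fetched
    return list(cluster[index])  # copy, like data shipped to the driver

it = to_local_iterator(len(cluster), run_job)
print(next(it), jobs)   # 0 [0]  (only partition 0 has been fetched)
rest = list(it)
print(rest[-1], jobs)   # 11 [0, 1, 2, 3]
```

Local memory is bounded by the largest single partition rather than by the whole RDD, which is why partition size still matters.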

TL;DR: The original answer gives a rough idea of how it works:

First of all, get the array of partitions (each one carries its index):

val parts = rdd.partitions

Then create smaller RDDs that filter out everything but a single partition. Collect the data from each smaller RDD and iterate over the values of that single partition:

for (p <- parts) {
    val idx = p.index
    // mapPartitionsWithIndex passes (partitionIndex, iterator); keep only partition idx
    val partRdd = rdd.mapPartitionsWithIndex((i, it) => if (i == idx) it else Iterator(), preservesPartitioning = true)
    // preservesPartitioning = true keeps the parent's partitioner, which can spare a later shuffle
    val data = partRdd.collect() // data contains all values from a single partition
                                 // in the form of an array
    // Now you can do with the data whatever you want: iterate, save to a file, etc.
}

I didn't try this code, but it should work. Please write a comment if it doesn't compile. Of course, it will work only if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).
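The loop above calls collect() once per partition, and every collect() is a job over all partitions of the filtered RDD, even though all but one of its tasks return nothing. The plain-Python model below counts tasks for that loop versus a one-partition-per-job approach such as toLocalIterator. It is a counting model under that assumption, with made-up data and illustrative function names, not a measurement of Spark itself.

```python
from typing import List, Tuple

Partitions = List[List[int]]

def collect_by_filter(parts: Partitions) -> Tuple[Partitions, int]:
    """Model of the loop above: one collect() per partition index. Each
    collect() runs a task on every partition; the mapPartitionsWithIndex
    filter lets only the partition with the matching index through."""
    tasks = 0
    result: Partitions = []
    for idx in range(len(parts)):          # one job per loop iteration
        collected: List[int] = []
        for i, part in enumerate(parts):   # that job's tasks: all partitions
            tasks += 1
            if i == idx:
                collected.extend(part)
        result.append(collected)
    return result, tasks

def collect_one_partition_per_job(parts: Partitions) -> Tuple[Partitions, int]:
    """Model of toLocalIterator: each job runs on a single partition."""
    tasks = 0
    result: Partitions = []
    for part in parts:                     # one job with a single task
        tasks += 1
        result.append(list(part))
    return result, tasks

parts = [list(range(i * 10, i * 10 + 10)) for i in range(10)]
by_filter, filter_tasks = collect_by_filter(parts)
one_by_one, local_tasks = collect_one_partition_per_job(parts)
print(by_filter == one_by_one, filter_tasks, local_tasks)  # True 100 10
```

Both approaches deliver the same data partition by partition, but with N partitions the filter loop launches N × N tasks in total versus N.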


edited Nov 18 '15 at 8:36

answered Feb 15 '14 at 18:33





Does this code cause each partition to be computed serially when it loops through and calls mapPartitionsWithIndex? What's the best way to remedy this? – foboi1122 Nov 18 '15 at 0:42

@foboi1122 Please see the updated answer. – Wildfire Nov 18 '15 at 8:36

@Wildfire Will this approach resolve this? If not, how can it be resolved, with this approach or another one? – ChikuMiku 2 days ago


10 votes

Wildfire's answer seems semantically correct, but I'm sure you should be able to be vastly more efficient by using Spark's API. If you want to process each partition in turn, I don't see why you can't use map/filter/reduce/reduceByKey/mapPartitions operations. The only time you'd want to have everything in one place in one array is when you're going to perform a non-monoidal operation, but that doesn't seem to be what you want. You should be able to do something like:

rdd.mapPartitions(recordsIterator => yourChunkFunction(recordsIterator)) // yourChunkFunction is hypothetical: processes one chunk and returns an Iterator

Or this

rdd.foreachPartition(partition => {
  // Your code (runs on the executors, once per partition)
})
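The "monoidal" point can be made concrete: if the per-chunk work is an associative combine with an identity value, it can run on the cluster and only one small result per partition travels back to the driver. RDD.aggregate (zeroValue, seqOp, combOp) has exactly that shape. The plain-Python model below mimics those semantics on partitions represented as lists; it is not Spark code, and the (count, sum) statistic is just an example.

```python
from functools import reduce
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def aggregate(partitions: List[List[T]], zero: U,
              seq_op: Callable[[U, T], U],
              comb_op: Callable[[U, U], U]) -> U:
    """Model of RDD.aggregate: fold each partition with seq_op (executor
    side), then merge the per-partition results with comb_op (driver side)."""
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)

partitions = [list(range(i * 10, i * 10 + 10)) for i in range(10)]  # 0..99

# (count, sum) is monoidal: zero = (0, 0), merge = component-wise addition.
count, total = aggregate(
    partitions, (0, 0),
    lambda acc, x: (acc[0] + 1, acc[1] + x),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(count, total, total / count)  # 100 4950 49.5
```

A mean is not itself monoidal, but (count, sum) is, so the division happens once on the driver after the combine.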


edited Apr 3 '14 at 15:55

answered Mar 30 '14 at 11:05





Don't these operators execute on the cluster? – epahomov Apr 3 '14 at 7:05

Yes, they will, but why are you avoiding that? If you can process each chunk in turn, you should be able to write the code so that it can be distributed, for example using aggregate. – samthebest Apr 3 '14 at 15:54

Isn't the iterator passed to foreachPartition the data iterator for a single partition, and not an iterator over all partitions? – javadba May 20 at 8:23


5 votes

Here is the same approach as suggested by @Wildfire, but written in PySpark.

The nice thing about this approach is that it lets the user access the records in the RDD in order. I'm using this code to feed data from an RDD into the STDIN of a machine learning tool's process.

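from pyspark import SparkContext

sc = SparkContext.getOrCreate()  # in the pyspark shell, sc already exists
rdd = sc.parallelize(range(100), 10)

def make_part_filter(index):
    # Returns a function for mapPartitionsWithIndex that keeps only partition `index`
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    # Second argument: preservesPartitioning=True
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print("partition id: %s elements: %s" % (part_id, data_from_part_rdd))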

Produces output:

partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
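Feeding records into an external tool's STDIN, one partition at a time, can be sketched as follows. It assumes the partitions already arrive locally one by one (in PySpark they could come from the per-partition collect() loop or from rdd.toLocalIterator()). The child command is a hypothetical stand-in, a Python one-liner that counts its input lines; substitute the real tool's command.

```python
import subprocess
import sys
from typing import Iterable, List

def feed_to_process(partitions: Iterable[List[int]], cmd: List[str]) -> str:
    """Write each record as one line to the child's STDIN, one partition at a
    time, then return whatever the child printed."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, text=True)
    assert proc.stdin is not None and proc.stdout is not None
    for part in partitions:          # only one partition held locally at once
        for record in part:
            proc.stdin.write(f"{record}\n")
    proc.stdin.close()               # signal EOF so the child can finish
    output = proc.stdout.read()
    proc.wait()
    return output

# Hypothetical stand-in for the machine learning tool: count lines on STDIN.
counter = [sys.executable, "-c",
           "import sys; print(sum(1 for _ in sys.stdin))"]
parts = ([i * 10 + j for j in range(10)] for i in range(10))  # lazy partitions
print(feed_to_process(parts, counter).strip())  # 100
```

This sketch reads the child's output only after closing its input. If the real tool writes a lot of output while it is still reading, read its stdout from a separate thread (or redirect it to a file) so the pipe buffers cannot deadlock.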


edited Jun 5 '15 at 20:14

answered Jun 5 '15 at 20:07




1 vote

Map/filter/reduce using Spark and download the results later? I think the usual Hadoop approach will work.

The API has map, filter, and saveAsTextFile operations.
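The Hadoop-style route can be sketched like this: saveAsTextFile writes one part-NNNNN file per partition into an output directory, which can be copied to the local machine and streamed file by file, so only one line needs to be in memory at a time. The code below fakes that directory layout in a temporary folder so it runs without Spark; the part-00000, part-00001, ... naming and the _SUCCESS marker follow the usual Hadoop output convention, and the data is made up for the example.

```python
import os
import tempfile
from typing import Iterator

def iter_text_output(directory: str) -> Iterator[str]:
    """Stream lines from a saveAsTextFile-style directory, one part file at a
    time, in partition order. Marker files such as _SUCCESS are skipped."""
    names = sorted(n for n in os.listdir(directory) if n.startswith("part-"))
    for name in names:
        with open(os.path.join(directory, name), encoding="utf-8") as f:
            for line in f:
                yield line.rstrip("\n")

# Fake output directory with 3 partitions, mimicking saveAsTextFile's layout.
out_dir = tempfile.mkdtemp()
for i in range(3):
    with open(os.path.join(out_dir, f"part-{i:05d}"), "w", encoding="utf-8") as f:
        f.writelines(f"{i * 10 + j}\n" for j in range(10))
open(os.path.join(out_dir, "_SUCCESS"), "w").close()

# list() only for this demo; in practice, iterate over the generator lazily.
lines = list(iter_text_output(out_dir))
print(len(lines), lines[0], lines[-1])  # 30 0 29
```

The trade-off raised in the comments applies: this writes the data to disk as text and parses it back, which is exactly the serialization step the question wanted to avoid.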


answered Feb 11 '14 at 10:09





Bad option. I don't want to do serialization/deserialization. I want to retrieve this data from Spark directly. – epahomov Feb 11 '14 at 10:37

How do you intend to get 1 GB without serde (i.e. storing it on disk) on a node with 512 MB? – scrapcodes Feb 12 '14 at 9:13

By iterating over the RDD. You should be able to get each partition in sequence to send each data item in sequence to the master, which can then pull them off the network and work on them. – interfect Feb 12 ‘14 at 18:07


1 vote

For Spark 1.3.1, the format is as follows:

val parts = rdd.partitions
for (p <- parts) {
    val idx = p.index
    // Assumes rdd is an RDD[(String, String, Float)]
    val partRdd = rdd.mapPartitionsWithIndex {
        case (index: Int, value: Iterator[(String, String, Float)]) =>
            if (index == idx) value else Iterator()
    }
    val dataPartitioned = partRdd.collect()
    // Apply further processing on data
}


Time: 2024-11-10 02:57:39
