spark源代码action系列-foreach与foreachPartition

RDD.foreachPartition/foreach的操作

在这个action的操作中:

这两个action主要用于对每一个partition中的iterator时行迭代的处理.通过用户传入的function对iterator进行内容的处理.

首先我们先看看foreach的操作:

在fureach中,传入一个function,这个函数的传入參数就是每一个partition中,每次的foreach得到的一个rdd的kv实例,也就是详细的内容,这样的处理你并不知道这个iterator的foreach什么时候结果,仅仅能是foreach的过程中,你得到一条数据,就处理一条数据.

由以下的红色部分能够看出,foreach操作是直接调用了partition中数据的foreach操作.

def foreach(f: T => Unit): Unit = withScope {

val cleanF = sc.clean(f)

sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))

}

演示样例说明:

val list = new ArrayBuffer()

Rdd.foreach(record => {

list += record

If (list.size >= 10000) {

list.flush....

}

})

上面这段演示样例代码中,假设这么使用就会存在一个问题,

迭代的最后,list的结果可能还没有达到10000条,这个时候,你在内部的处理的flush部分就不会运行,也就是迭代的最后假设没有达到10000的数据就会丢失.

所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})

然后接下来看看foreachPartition:

这个函数也是依据传入的function进行处理,但不同处在于,这里function的传入參数是一个partition相应数据的iterator.而不是直接使用iterator的foreach,

这样的情况下,假设是上面foreach的演示样例代码中list这个片段在这个action中就行正常的去处理.

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {

val cleanF = sc.clean(f)

sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))

}

演示样例代码:

Val list = new ArrayBuffer

rdd.foreachPartition(it => {

It.foreach(r => {

List += r

If (list.size > 10000) flush

})

If (list.size > 0) flush

})

最后说下这两个action的差别:

Foreach与foreachPartition都是在每一个partition中对iterator进行操作,

不同的是,foreach是直接在每一个partition中直接对iterator运行foreach操作,而传入的function仅仅是在foreach内部使用,

而foreachPartition是在每一个partition中把iterator给传入的function,让function自己对iterator进行处理.

时间: 2024-10-29 12:51:33

spark源代码action系列-foreach与foreachPartition的相关文章

Spark入门实战系列--8.Spark MLlib(下)--机器学习库SparkMLlib实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.MLlib实例 1.1 聚类实例 1.1.1 算法说明 聚类(Cluster analysis)有时也被翻译为簇类,其核心任务是:将一组目标object划分为若干个簇,每个簇之间的object尽可能相似,簇与簇之间的object尽可能相异.聚类算法是机器学习(或者说是数据挖掘更合适)中重要的一部分,除了最为简单的K-Means聚类算法外,比较常见的还有层次法(CURE.CHAMELEON等).网

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 运行环境说明 1.1 硬软件环境 主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存 虚拟软件:VMware? Workstation 9.0.0 build-812388 虚拟机操作系统:CentOS 64位,单核 虚拟机运行环境: JDK:1.7.0_55 64位 Hadoop:2.2.0(需要编译为64位) Scala:2.10.4 Spark:1.1.0(需要编译)

Spark入门实战系列--3.Spark编程模型(上)--概念及SparkShell实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark编程模型 1.1 术语定义 应用程序(Application): 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor: 驱动程序(Driver Program):运行Application的main()函数并且创建SparkContext,通常用SparkContext代表Driver Program: 执行单元(Executor): 是为某

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

Spark源码系列(四)图解作业生命周期

这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是SparkContext,回想一下,从api的使用角度,RDD都必须通过它来获得. 下面讲一讲它所不为认知的一面,它和其它组件是如何交互的. Driver向Master注册Application过程 SparkContext实例化之后,在内部实例化两个很重要的类,DAGScheduler和TaskSched

Spark入门实战系列--2.Spark编译与部署(下)--Spark编译安装

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.编译Spark Spark可以通过SBT和Maven两种方式进行编译,再通过make-distribution.sh脚本生成部署包.SBT编译需要安装git工具,而Maven安装则需要maven工具,两种方式均需要在联网下进行,通过比较发现SBT编译速度较慢(原因有可能是1.时间不一样,SBT是白天编译,Maven是深夜进行的,获取依赖包速度不同 2.maven下载大文件是多线程进行,而SBT是

Spark入门实战系列--9.Spark图计算GraphX介绍及实例

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.GraphX介绍 1.1 GraphX应用背景 Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求. 众所周知·,社交网络中人与人之间有很多关系链,例如Twitter.Facebook.微博和微信等,这些都是大数据产生的地方都需要图计算,现在的图处理基本都是分布式的图处理,而并非单机处理.Spark

Spark源代码分析之六:Task调度(二)

话说在<Spark源代码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这种方法针对接收到的ReviveOffers事件进行处理.代码例如以下: // Make fake resource offers on all executors     // 在全部的executors上提供假的资源(抽象的资源.也就是资源的对象信息,我是这么理解的)     private def makeOffers() {       /