1.1RDD解读(二)

(6)transformation 操作,通过外在的不同RDD表现形式来达到内部数据的处理过程。这类操作并不会触发作业的执行,也常被称为lazy操作。

        大部分操作会生成并返回一个新的RDD,例sortByKey就不会产生一个新的RDD。

1) map函数,一行数据经过map函数处理后还是一行数据

//将map函数作用在RDD的所有元素上,并返回一个新的RDD

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
//将函数作用在父RDD的每一个分区上

new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

2) flatMap函数,和map函数功能类似,但一行数据经过flatMap函数处理后是多行数据

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

3) filter函数,将不满足条件的数据过滤掉,并返回一个新的RDD

def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

4) distinct函数,将重复的元素去掉,返回不同的元素,并返回一个新的RDD

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

具体过程如下所示:

5) repartition函数,对RDD重新分区,并返回一个新的RDD

该方法用于增加或减少RDD的并行度,实际上是通过shuffle来分发数据的

如果想要减少RDD的分区,考虑使用‘coalesce’函数,避免shuffle

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

6) coalesce函数,将RDD重新分区并返回一个新的RDD

   这个操作是窄依赖,比如,如果你从1000个分区合并为100个分区,这个合并过程并没有shuffle,而是100个新的分区将每个分区将是原来的10个分区。

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
if (shuffle) {
//从一个随机的分区开始,将数据均匀地分布到新分区上

val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
position = position + 1
(position, t)
      }
    } : Iterator[(Int, T)]
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
new CoalescedRDD(this, numPartitions)
  }
}

7) sample函数,随机返回RDD的部分样例数据

def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}

8) sortBy将RDD根据所给的key函数排序,并返回本身,注意不是创建一个新的RDD,同时也说明并不是所有的transformation都是创建一个新的RDD

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

9) glom函数,将每个分区的元素合并成一个数组并返回一个新的RDD

def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}

10) groupByKey函数,返回key和相同key的value结合组成的RDD。

这个操作可能开销比较大,如果想要求总数sum或均值,用PairRDDFunctions.aggregateByKey或PairRDDFunctions.reduceByKey会有更好的效果。

def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}

(7)Action操作,触发作业的执行并将返回值反馈给用户程序

1) foreach函数,将此函数应用于RDD的所有元素上

def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

2) foreachPartition函数,将此函数作用于RDD的每一个分区上,比如连接数据库的连接可以一个分区共用一个连接

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

3) collect函数,将包含在RDD中所有的元素以数组形式返回

def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

4) count函数,返回RDD中元素的个数

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

5) take函数,取RDD的前num元素。先取一个分区的元素,如果不够再取其他分区的元素。

def take(num: Int): Array[T] = withScope {
if (num == 0) {
new Array[T](0)
  } else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
var numPartsToTry = 1
if (partsScanned > 0) {
if (buf.size == 0) {
          numPartsToTry = partsScanned * 4
} else {
numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }
    buf.toArray
  }
}

6) first函数,取RDD中的第一个元素,实际上是take(1)操作

def first(): T = withScope {
  take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
  }
}

7) top函数,返回RDD中的top k,隐式排序按照Ordering[T]排序,即降序,刚好和[takeOrdered]相反

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}

8) saveAsTextFile函数,将RDD保存为文本文件

def saveAsTextFile(path: String): Unit = withScope {
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

9) saveAsObjectFile函数,将RDD中的元素序列化并保存为文件

def saveAsObjectFile(path: String): Unit = withScope {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

(8)隐式转换

在RDD object中定义了好多隐式转换函数,这些函数额外提供了许多本身不具有的功能

比如将RDD隐式转化为PairRDDFunctions,那么该RDD就具有了reduceByKey等功能。

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

时间: 2024-07-29 23:13:33

1.1RDD解读(二)的相关文章

SD3.0协议解读二

在阅读本文章之前,请先思考一下什么是总线,总线的作用是什么?相信大家都学过I2C总线,它由SCL和SDA两条线组成,通过这两条线就能完成各种通信.同样地,SD卡通信也需要有自己的总线模式.SD卡还比较牛逼,支持SD总线和SPI总线,老衲接触的比较多的是SD总线,所以这篇文章仅介绍SD总线,对于SPI总线老衲以后有机会再介绍. SD总线: 大家都知道总线一般支持多种频率,在默认的频率下,SD总线支持一(主)对多(从)的模式,即支持一个HOST对多个SD卡的模式.但是,在高速和UHS-I,SD总线只

解读二调

“二调”是什么?      第二次全国土地调查,简称“二调”,作为一项重大的国情国力调查,目的是全面查清目前全国土地利用状况,掌握真实的土地基础数据,建立和完善土地调查.统计和登记制度,实现土地资源信息的社会化服务,满足经济社会发展及国土资源管理的需要. “二调”为什么?       一是国家的需要       土地是重要的资源和资产,在国民经济和社会发展中的地位和作用越来越突出.国家对土地问题十分重视,将土地作为参与宏观调控的重要手段.曾培炎副总理曾明 确指示,土地数据口径要统一,要尽快建立起

1.1RDD解读

1.RDD(Resilient Distributed DataSet)是Spark生态系统中最基本的抽象,代表不可变的.可并行操作的分区元素集合.RDD这个类有RDD系列所有基本的操作,比如map.filter.persist.另外,org.apache.spark.rdd.PairRDDFunctions含有key-value类型的RDD的基本操作,比如groupby.join:org.apache.spark.rdd.DoubleRDDFunctions含有Double类型的RDD的基本操

NSObject头文件解析 / 消息机制 / Runtime解读 (二)

本章接着NSObject头文件解析 / 消息机制 / Runtime解读(一)写 给类添加属性: BOOL class_addProperty(Class cls, const char *name, const objc_property_attribute_t *attributes, unsigned int attributeCount) 其中有一个参数我们再在上一篇中提到过 typedef struct { const char *name;           /**< The na

java多线程解读二(内存篇)

线程的内存结构图 一.主内存与工作内存 1.Java内存模型的主要目标是定义程序中各个变量的访问规则.此处的变量与Java编程时所说的变量不一样,指包括了实例字段.静态字段和构成数组对象的元素,但是不包括局部变量与方法参数,因为它们是线程私有的,不会被共享. 2.Java内存模型中规定了所有的变量都存储在主内存中,每条线程还有自己的虚拟内存.线程的虚拟内存中保存了该线程使用到的变量到主内存副本拷贝.线程对变量的所有操作(读取.赋值)都必须在自己的虚拟内存中进行,而不能直接读写主内存中的变量.不同

(转)go语言nsq源码解读二 nsqlookupd、nsqd与nsqadmin

转自:http://www.baiyuxiong.com/?p=886 ----------------------------------------------------------------------- 上一篇go语言nsq源码解读-基本介绍  介绍了最基本的nsq环境搭建及使用.在最后使用时,我们用到了几个命令:nsqlookupd.nsqd.nsqadmin.curl及 nsq_to_file,并看到用curl命令写入的几个”hello world”被nsq_to_file命令保

java8完全解读二

继续着上次的java完全解读一 继续着上次的java完全解读一1.强大的Stream API1.1什么是Stream1.2 Stream操作的三大步骤1.2.1 创建Stream1.2.2 Stream的中间操作筛选和切片映射排序1.2.3 Stream 的终止操作查找与匹配归约收集2 新时间日期API2.1 使用LocalDate.LocalTime.LocalDateTime2.2 Duration 和Period2.3 解析与格式化2.4 时区的处理3 接口中的默认方法与静态方法3.1 接

cjson源代码解读 (二)解析流程

先从test.c 开始说, 我们看到test.c 里面有两个函数用来测试, doit 和dofile,  一个是读char* 一个是读file, 肯定是读字符串的要简单, 所以先看doit. /* Parse text to JSON, then render back to text, and print! */ void doit(char *text) { char *out;cJSON *json; json=cJSON_Parse(text); if (!json) {printf("

tars framework 源码解读(二) libservant部分源码的简介

还是直接用官方原图解说 服务端:可以理解成对外公开的接口 被调用时候响应流程 的底层封装 (响应端) NetThread: 收发包,连接管理,多线程(可配置),采用epoll ET触发实现,支持tcp/udp: BindAdapter: 绑定端口类,用于管理Servant对应的绑定端口的信息操作: ServantHandle:业务线程类,根据对象名分派Servant的对象和接口调用: AdminServant: 管理端口的对象: ServantImp: 继承Servant的业务处理基类(Serv