Spark RDD Action 简单用例(二)

foreach(f: T => Unit)

对RDD的所有元素应用f函数进行处理,f无返回值。/** * Applies a function f to all elements of this RDD. */def foreach(f: T => Unit): Unit
scala> val rdd = sc.parallelize(1 to 9, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.foreach(x=>{println(x)})
[Stage 0:>                                                          (0 + 0) / 2]1
2
3
4
5
6
7
8
9

foreachPartition(f: Iterator[T] => Unit)

遍历所有的分区进行f函数操作/** * Applies a function f to each partition of this RDD. */def foreachPartition(f: Iterator[T] => Unit): Unit


scala> val rdd = sc.parallelize(1 to 9, 2)
scala> rdd.foreachPartition(x=>{
     | while(x.hasNext){
     | println(x.next)
     | }
     | println("===========")
     | }
     | )
1
2
3
4
===========
5
6
7
8
9
===========

getCheckpointFile

获取RDD checkpoint的目录./** * Gets the name of the directory to which this RDD was checkpointed. * This is not defined if the RDD is checkpointed locally. */def getCheckpointFile: Option[String]


scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.checkpoint

/*
checkpoint操作后直接查询得到None,说明checkpoint是lazy的
*/
scala> rdd.getCheckpointFile
res6: Option[String] = None

scala> rdd.count
res7: Long = 9                                                                  

scala> rdd.getCheckpointFile
res8: Option[String] = Some(file:/home/check/ca771099-b1bf-46c8-9404-68b4ace7feeb/rdd-1)

getNumPartitions

获取分区数量/** * Returns the number of partitions of this RDD. */@Since("1.6.0")final def getNumPartitions: Int = partitions.length
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getNumPartitions
res9: Int = 2

getStorageLevel

获取当前RDD的存储级别/** Get the RDD‘s current storage level, or StorageLevel.NONE if none is set. */def getStorageLevel: StorageLevel = storageLevel


scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getStorageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

scala> rdd.cache
res11: rdd.type = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.getStorageLevel
res12: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

isCheckpointed

获取该RDD是否已checkpoint处理/** * Return whether this RDD is checkpointed and materialized, either reliably or locally. */def isCheckpointed: Boolean
scala> val rdd = sc.parallelize(1 to 9,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.isCheckpointed
res13: Boolean = false

scala> rdd.checkpoint

scala> rdd.isCheckpointed
res15: Boolean = false

scala> rdd.count
res16: Long = 9

scala> rdd.isCheckpointed
res17: Boolean = true

isEmpty()

获取RDD是否为空,如果RDD为Nothing或Null,则抛出异常/** * @note due to complications in the internal implementation, this method will raise an * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`. * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.) * @return true if and only if the RDD contains no elements at all. Note that an RDD *         may be empty even when it has at least 1 partition. */def isEmpty(): Boolean


scala> val rdd = sc.parallelize(Seq())
rdd: org.apache.spark.rdd.RDD[Nothing] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd.isEmpty
org.apache.spark.SparkDriverExecutionException: Execution error
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1187)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1656)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1305)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1279)
  at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1413)
  at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1413)
  at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1413)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1412)
  ... 48 elided
Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object;
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1884)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1884)
  at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:59)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1656)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

scala> val rdd = sc.parallelize(Seq(1 to 9))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> rdd.isEmpty
res19: Boolean = false

max()

/** * Returns the max of this RDD as defined by the implicit Ordering[T]. * @return the maximum element of the RDD * */def max()(implicit ord: Ordering[T]): T

min()

/** * Returns the min of this RDD as defined by the implicit Ordering[T]. * @return the minimum element of the RDD * */def min()(implicit ord: Ordering[T]): T


scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd.max
res21: Int = 9

scala> rdd.min
res22: Int = 1

reduce(f: (T, T) => T)

对RDD所有元素进行聚合运算/** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */def reduce(f: (T, T) => T): T


scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> def func(x:Int, y:Int):Int={
     | if(x >= y){
     | x
     | }else{
     | y}
     | }
func: (x: Int, y: Int)Int

scala> rdd.reduce(func(_,_))
res23: Int = 9

scala> rdd.reduce((x,y)=>{
     | if(x>=y){
     | x
     | }else{
     | y
     | }
     | }
     | )
res24: Int = 9

saveAsObjectFile(path: String)

将RDD保存指定目录下文件中/** * Save this RDD as a SequenceFile of serialized objects. */def saveAsObjectFile(path: String): Unit


scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.saveAsObjectFile("/home/check/object")

[[email protected] ~]# ls /home/check/object/
part-00000  _SUCCESS

saveAsTextFile(path: String)

将RDD保存至文本文件
/** * Save this RDD as a text file, using string representations of elements. */def saveAsTextFile(path: String): Unit


scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.saveAsTextFile("/home/check/text")
[[email protected] ~]# ls /home/check/text/part-00000
/home/check/text/part-00000
[[email protected] ~]# more /home/check/text/part-00000
1
2
3
4
5
6
7
8
9

take(num: Int)

返回前num个元素。/** * Take the first num elements of the RDD. It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy * the limit. * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver‘s memory. * * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */def take(num: Int): Array[T]


scala> val rdd = sc.parallelize(1 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> rdd.take(5)
res28: Array[Int] = Array(1, 2, 3, 4, 5)

takeOrdered(num: Int)

排序后返回前num个元素
scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd.takeOrdered(3)
res30: Array[Int] = Array(1, 2, 3)
def takeSample(    withReplacement: Boolean,    num: Int,    seed: Long = Utils.random.nextLong): Array[T]


scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd.takeSample(true,6,8)
res34: Array[Int] = Array(5, 2, 2, 5, 3, 2)

scala> rdd.takeSample(false,6,8)
res35: Array[Int] = Array(9, 3, 2, 6, 1, 5)

top(num: Int)

降序排列后返回top n/** @param num k, the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */def top(num: Int)(implicit ord: Ordering[T]): Array[T]


scala> val rdd = sc.parallelize(List(2,6,3,1,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> rdd.top(3)
res37: Array[Int] = Array(9, 6, 5)



时间: 2024-08-05 10:07:48

Spark RDD Action 简单用例(二)的相关文章

Spark RDD Action 简单用例(一)

collectAsMap(): Map[K, V] 返回key-value对,key是唯一的,如果rdd元素中同一个key对应多个value,则只会保留一个./** * Return the key-value pairs in this RDD to the master as a Map. * * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only * on

Spark RDD Transformation 简单用例(二)

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine

Spark RDD Transformation 简单用例(三)

cache和persist 将RDD数据进行存储,persist(newLevel: StorageLevel)设置了存储级别,cache()和persist()是相同的,存储级别为MEMORY_ONLY.因为RDD的transformation是lazy的,只有action算子才会触发transformain真正的执行,如果一个rdd需要进行多次的action算子操作,最好能够使用cache或persist将rdd缓存至内存中,这样除第一次action会触发transformation操作,后

Spark RDD Transformation 简单用例(一)

map(func) /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U]  map(func) Return a new distributed dataset formed by passing each element of the source through a function func.  将原RDD中的

Spark RDD Action操作

reduce def reduce(f: (T, T) => T): T通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 1 2 3 4 5 6 7 8 9 10 11 scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24 scala> rdd1.r

spark RDD transformation与action函数巩固 (未完)

1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.textFile("yangsy.txt") 3.过滤 filter 需要注意的是 filter并不会在原有RDD上过滤,而是根据filter的内容重新创建了一个RDD val spark = linesRDD.filter(line => lin

Java8函数式编程(二):类比Spark RDD算子的Stream流操作

1 Stream流 对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,之后便可以通过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环本身正是封装了其的语法糖),其示意图如下: 除此之外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操作,比如,要统计一个数字列表的偶数元素个数,当使用Stream对象的操作时,如下: List<Integer> list = new ArrayList<Integ

Spark RDD编程(二)

转载请注明出处:http://blog.csdn.net/gamer_gyt @高阳团 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer ============================================================ SparkRDD编程(一) Spark 的键值对(pair RDD)操作,Scala实现 RDD的分区函数 目前Spark中实现的分区函数包括两种 Ha

Apache Spark RDD(Resilient Distributed Datasets)论文

Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD 抽象 2.2 Spark 编程接口 2.2.1 例子 – 监控日志数据挖掘 2.3 RDD 模型的优势 2.4 不适合用 RDDs 的应用 3 Spark 编程接口 3.1 Spark 中 RDD 的操作 3.2 举例应用 3.2.1 线性回归 3.2.2 PageRank 4 表达 RDDs 5