How many RDDs does Spark's wordcount program create?

val rdd = sc.textFile("hdfs://Master.hdp:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false)
rdd.collect
rdd.saveAsTextFile("hdfs://Master.hdp:9000/out01")

Question: how many RDDs are created in total during this Spark wordcount job?

The RDD lineage (dependency chain) can be inspected with this command: scala> rdd.toDebugString

(Six: besides the five shown by toDebugString, rdd.saveAsTextFile creates one more RDD.)
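A minimal spark-shell sketch (the output shown is illustrative; exact RDD ids and partition counts depend on the file and Spark version) of the five RDDs toDebugString reports for the chain up through reduceByKey:

scala> val wc = sc.textFile("hdfs://Master.hdp:9000/wc").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

scala> wc.toDebugString
(2) ShuffledRDD[4] at reduceByKey at <console>:24 []
 +-(2) MapPartitionsRDD[3] at map at <console>:24 []
    |  MapPartitionsRDD[2] at flatMap at <console>:24 []
    |  hdfs://Master.hdp:9000/wc MapPartitionsRDD[1] at textFile at <console>:24 []
    |  HadoopRDD[0] at textFile at <console>:24 []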

Let's walk through it step by step by reading the Spark source code.

(1) sc.textFile("hdfs://Master.hdp:9000/wc")

Produces two RDDs: HadoopRDD -> MapPartitionsRDD

The source of SparkContext.textFile:

/**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

Parameters: the path and the number of partitions. (Default partitioning: when reading from HDFS, the number of partitions is determined by the number of input splits; in Hadoop 2.x a split defaults to the 128 MB block size.)

textFile calls hadoopFile, passing in the path; the data is read with TextInputFormat (a custom input format can also be supplied).

The classOf[LongWritable] and classOf[Text] parameters are the record types: Hadoop reads data as key-value pairs.

LongWritable (Hadoop's serializable wrapper for Long) holds the byte offset of each line; Text holds the line's content.
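As a side note, a minimal sketch (assuming a spark-shell session and the same input path) of reading the file through sc.hadoopFile directly, which exposes the raw (LongWritable, Text) pairs that textFile's .map(pair => pair._2.toString) later strips down to plain strings; the sample output is illustrative:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val raw = sc.hadoopFile("hdfs://Master.hdp:9000/wc",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

raw.map { case (offset, line) => (offset.get, line.toString) }.take(2)
// e.g. Array((0, "hello spark"), (12, "hello hadoop"))   -- illustrative, depends on the file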

Inside hadoopFile, a new HadoopRDD is constructed: this is where the first RDD is created.

 def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

Back in textFile: the call to hadoopFile produces a HadoopRDD, and then .map is called on it:

hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)

What exactly is HadoopRDD? From the source:

class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

  if (initLocalJobConfFuncOpt.isDefined) {
    sparkContext.clean(initLocalJobConfFuncOpt.get)
  }

  def this(
      sc: SparkContext,
      conf: JobConf,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int) = {
    this(
      sc,
      sc.broadcast(new SerializableConfiguration(conf))
        .asInstanceOf[Broadcast[SerializableConfiguration]],
      initLocalJobConfFuncOpt = None,
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions)
  }

HadoopRDD holds key-value pairs, where the key is the offset and the value is the line content we actually want, so .map(pair => pair._2.toString) is called next to extract the value. This map call creates the second RDD.

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

As you can see, map news up a MapPartitionsRDD, with this (the HadoopRDD) passed in as the parent; pid is the partition ID and iter is an iterator over that partition's data.
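A minimal sketch (made-up data) that makes the (pid, iter) pair visible: mapPartitionsWithIndex hands each task its partition id and an iterator over that partition's records, which is exactly the shape of the function passed to MapPartitionsRDD above:

val demo = sc.parallelize(Seq("a", "b", "c", "d"), 2)

demo.mapPartitionsWithIndex { (pid, iter) =>
  iter.map(x => s"partition $pid -> $x")     // one iterator per partition, tagged with its id
}.collect().foreach(println)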

(2) .flatMap(_.split(" ")) // creates one RDD: MapPartitionsRDD


/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
// iter is the iterator over one partition's data
// the clean method validates the closure (for example, that it can be serialized) before it is shipped to tasks
/**
 * Clean a closure to make it ready to serialized and send to tasks
 * (removes unreferenced variables in $outer's, updates REPL variables)
 * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
 * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
 * if not.
 *
 * @param f the closure to clean
 * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
 * @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
 *   serializable
 */
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}
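A minimal sketch (hypothetical Helper class, made-up data) of what this check guards against: if the closure captures a non-serializable object, clean rejects it with a SparkException ("Task not serializable") at the point of the transformation, rather than when tasks are launched:

class Helper { def tag(s: String): String = s + "!" }   // plain class, not Serializable
val h = new Helper
val words = sc.parallelize(Seq("hello", "spark"))

// words.map(w => h.tag(w))     // would fail here: the closure captures the non-serializable h
val ok = words.map(_ + "!")     // keep non-serializable objects out of the closure instead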

(3) .map((_, 1)) // creates one RDD: MapPartitionsRDD

The source is the same map method shown above.
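A minimal sketch with made-up input showing what steps (2) and (3) do to each line:

val lines = sc.parallelize(Seq("hello spark", "hello hadoop"))   // stand-in for the lines read from HDFS
val words = lines.flatMap(_.split(" "))                          // "hello", "spark", "hello", "hadoop"
val pairs = words.map((_, 1))                                    // ("hello",1), ("spark",1), ("hello",1), ("hadoop",1)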

(4) .reduceByKey(_ + _) // creates one RDD: ShuffledRDD

 /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

// reduceByKey delegates to combineByKeyWithClassTag
/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 * Note that V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
 *
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 */
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
// Key point: new ShuffledRDD
The ShuffledRDD performs the aggregation: a local (map-side) combine within each partition first, then a global merge of the combined results after the shuffle.
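A minimal sketch with made-up data of that two-phase aggregation for reduceByKey(_ + _) (how records fall into partitions is illustrative):

// partition 0: ("hello",1), ("hello",1)            map-side combine => ("hello",2)
// partition 1: ("hello",1), ("spark",1)            map-side combine => ("hello",1), ("spark",1)
// after the shuffle, values are merged per key     => ("hello",3), ("spark",1)
val pairs = sc.parallelize(Seq(("hello", 1), ("hello", 1), ("hello", 1), ("spark", 1)), 2)
pairs.reduceByKey(_ + _).collect()                  // Array((hello,3), (spark,1)), order may vary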

(5) .saveAsTextFile("hdfs://Master.hdp:9000/out01") // creates one RDD: MapPartitionsRDD
Key point: Spark writes to HDFS through an output stream. Opening a stream for every single record would be far too expensive, so each partition's data is written through one stream, which is closed when the partition is done. That is why saveAsTextFile calls .mapPartitions internally, creating one more MapPartitionsRDD.
/**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    //
    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
    //
    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
    // same bytecodes for `saveAsTextFile`.
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],          // the Iterator carries one partition's data
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
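To illustrate the same per-partition idea outside of saveAsTextFile, a minimal sketch (made-up data, with an in-memory buffer standing in for an HDFS output stream) of doing setup once per partition instead of once per record:

val data = sc.parallelize(1 to 10, 2)

val summary = data.mapPartitions { iter =>
  val buffer = scala.collection.mutable.ArrayBuffer[String]()    // stands in for one open output stream
  iter.foreach(x => buffer += s"record $x")                      // every record in the partition goes through it
  Iterator(s"wrote ${buffer.size} records in this partition")    // "close" once and report
}

summary.collect().foreach(println)    // e.g. "wrote 5 records in this partition", printed once per partition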

 
