val rdd = sc.textFile("hdfs://Master.hdp:9000/wc").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collectrdd.saveAsTextFile("hdfs://Master.hdp:9000/out01")
通过该命令(scala> rdd.toDebugString)可以查看RDD的依赖关系
(1) sc.textFile("hdfs://Master.hdp:9000/wc")
产生两个RDD:HadoopRDD -> MapPartitinsRDD
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
调用hadoopFile方法: 传入path,读入方式是以InputFormat方式读取(也可以自定义读取方式),
classOf[LongWritable], classOf[Text]参数:(读取数据的类型)hadoop是以K—V形式读取数据
在hadoopFile方法中,new了一个 HadoopRDD,这里产生了第一个RDD。
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path)
class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { if (initLocalJobConfFuncOpt.isDefined) { sparkContext.clean(initLocalJobConfFuncOpt.get) } def this( sc: SparkContext, conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) = { this( sc, sc.broadcast(new SerializableConfiguration(conf)) .asInstanceOf[Broadcast[SerializableConfiguration]], initLocalJobConfFuncOpt = None, inputFormatClass, keyClass, valueClass, minPartitions) }
hadoopRDD中存放的是key和value,key是偏移量,value才是我们需要的值所以接着调用.map(pair => pair._2.toString)取出我们需要的value值,map过程产生第二个RDD
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => }
可以看到map方法是会new 一个新的MapPartitionsRDD,传入的参数是一个hadoopRDD,pid是分区ID,iter是一个迭代器
(2).flatMap(_.split(" ")) //产生一个RDD :MapPartitinsRDD
/** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}//iter是每个分区数据的迭代器//clean方法是对数据进行检测,确定数据没有问题(比如说是否序列化等等)
/** * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer‘s, updates REPL variables) * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt> * if not. * * @param f the closure to clean * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability * @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not * serializable */private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { ClosureCleaner.clean(f, checkSerializable) f}
(3).map((_, 1))//产生一个RDD MapPartitionsRDD
(4).reduceByKey(_+_)//产生一个RDD ShuffledRDD
/** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) } // reduceByKey调用了combineByKeyWithClassTag
/** * :: Experimental :: * Generic function to combine the elements for each key using a custom set of aggregation * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C * Note that V and C can be different -- for example, one might group an RDD of type * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: * * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) * - `mergeCombiners`, to combine two C‘s into a single one. * * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */@Experimentaldef combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) }}
// 重点::new ShuffledRDD
ShuffledRDD进行聚合,先局部聚合,再全局聚合 (5).saveAsTextFile("hdfs://Master.hdp:9000/out01")//产生一个RDD: mapPartitions
重点:spark向HDFS中写入数据,通过流的方式写入,如果一次一个数据就打开流写一次太不科学了, 通过每次将一个分区的数据以流的方式传入到HDFS中再关闭流,所以该方法中调用.mapPartitions,又产生了一个mapPartitionsRDD
/** * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String): Unit = withScope { // // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit // Ordering for it and will use the default `null`. However, it‘s a `Comparable[NullWritable]` // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an // Ordering for `NullWritable`. That‘s why the compiler will generate different anonymous // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. // // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate // same bytecodes for `saveAsTextFile`. val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.mapPartitions { iter => val text = new Text() { x => text.set(x.toString) (NullWritable.get(), text) } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) }
/** * Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn‘t modify the keys. */def mapPartitions[U: ClassTag]( //Iterator是一个分区的数据 f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning)}