主要内容:
数据流动视角解密WordCount
RDD依赖关系视角解密WordCount
DAG与Lineage的思考
==========数据流动视角============
新建文件,里面输入
Hello Spark Hello Scala
Hello Hadoop
Hello Flink
Spark is awesome
修改代码:
package com.dt.spark.SparkApps.cores;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class WordCount_Cluster {
public static void main(String[] args) {
// TODO Auto-generated method stub
/**
* 1、创建Spark配置对象SparkConf,设置Spark程序的运行时的程序配置信息
* 例如:通过setMaster来设置程序要连接的Spark集群的 url,
* 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如只有1G的内存)的初学者
*/
SparkConf conf = new SparkConf().setAppName("Spark Word Count In JAVA");
/**
* 2、创建SparkContext对象
* SparkContext是Spark程序所有功能的唯一入口,无论采用 Scala、Java、Python、R等都必须要(
* 不同的语言具体的类名称不同,如果是JAVA,则为JavaSparkContext)
* SparkContext核心作用:初始化Spark应用程序所运行所需要的核心组件,包括DAGScheduler、TaskScheduler
* 、SchedulerBackend 同时还会负责Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
JavaSparkContext sc = new JavaSparkContext(conf );// 其地产曾实际上就是 Scala的SparkContext
/**
* 3、根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD
* RDD创建基本有三种方式:根据外部的数据来源(例如HDFS),根据 Scala集合、由其它的RDD操作
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
JavaRDD<String> lines = sc.textFile("/historyserverforSpark/README.md" );
/**
* 4、对初始的RDD进行transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算
* 4.1、将每一行的字符串拆分成单个的单词 FlatMapFunction<String,
* String>是匿名内部类第二个参数泛型,因为知道是String,所以用String
*/
JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line ) throws Exception {
// TODO Auto-generated method stub
return Arrays.asList( line.split( " "));
}
});
/**
* 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word=>(word,1)
*/
JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String, Integer>(word, 1);
}
});
/**
* 4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数
*/
JavaPairRDD<String, Integer> wordCounts = pairs .reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1 , Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2 ;
}
});
List<Tuple2<String, Integer>> wordCountsCollect = wordCounts.collect();
for(Tuple2<String, Integer> t2 :wordCountsCollect ){
System. out.println(t2 ._1 + ":" + t2 ._2 );
}
/**
* 5、关闭Spark上下文释放相关资源
*/
sc.close();
}
}
运行结果:
Flink:1
Spark:2
is:1
Hello:3
awesome:1
Hadoop:1
解释:
1、数据在HDFS中分布式存储数据,HelloSpark.txt这个文件
读取数据之后,会创建一个HadoopRDD,从HDFS上读取分布式数据,并且以数据分片的存在于集群之中
基于 HadoopRDD基础上框架会通过map生成MapPartitionRDD ,然后会生成去掉基于行的key的PartitionRDD
val lines = sc.textFile("F:/安装文件/操作系统/spark-1.6.0-bin-hadoop2.6/README.md", 1)
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
2、通过FlatMap操作, 对每个Partion中的每一行进行单词切分,并合并成一个大的单词实例的集合,生成新的MapPartitionRDD
val words = lines.flatMap { line => line.split(" ") }//对每一行的字符串进行单次拆分并把所有行的拆分结果通过flat合并成一个大的单次集合
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
3、对每个单词分别计数为1,单词实例变为形如(word,1)
val pairs = words.map { word => (word,1) }//其实编程了Tuple (word,1)
4、reduceByKey的时候会先本地统计一下,就是Shuffle之前的localreduce操作,主要负责本地统计,并且把统计后的结果按照分区策略放到不同的file中
从第1到现在,全部在一个stage里面做,一个stage是完全基于内存迭代的
然后开始进行shuffle,开始新的stage,因为要传网络,所以会开始新的stage
会产生shuffledRDD,新的shuffledRDD可能会有Hello和Hadoop混在一起的,这个是Spark的分区策略
分区之后,每个区内部分别进行处理,在这个例子中是,每个区内部分别reduce
reduce之后就是一个MapPartitionRDD,从保存到HDFS角度将,MapPartitionRDD(计算的时候我们把key丢弃了,我们最后往HDFS写的时候,要求要有key,所以弄了个NullWritable ,这是符合对称法则和能量守恒形式之美的)
这个步骤是第二个stage!!!
val wordCounts = pairs.reduceByKey(_+_)//对相同的key,进行value的累加(包括local和Reducer级别同时reduce)
shuffle源码:
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit = withScope {
// https://issues.apache.org/jira/browse/SPARK-2075
//
// NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
// Ordering for it and will use the default `null`. However, it‘s a `Comparable[NullWritable]`
// in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
// Ordering for `NullWritable`. That‘s why the compiler will generate different anonymous
// classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
//
// Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
// same bytecodes for `saveAsTextFile`.
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
第一个Stage的RDD:HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD
第二个Stage的RDD:ShuffledRDD、MapPartitionsRDD
作业:
自己动手画出WordCount数据流程图,并结合源码说明
王家林老师名片:
中国Spark第一人
新浪微博:http://weibo.com/ilovepains
微信公众号:DT_Spark
博客:http://blog.sina.com.cn/ilovepains
手机:18610086859
QQ:1740415547
邮箱:[email protected]