彻底解密WordCount运行原理(DT大数据梦工厂)

主要内容:

数据流动视角解密WordCount

RDD依赖关系视角解密WordCount

DAG与Lineage的思考

==========数据流动视角============

新建文件,里面输入

Hello Spark Hello Scala

Hello Hadoop

Hello Flink

Spark is awesome

修改代码:

package com.dt.spark.SparkApps.cores;

import java.util.Arrays;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount_Cluster {

public static void main(String[] args) {

// TODO Auto-generated method stub

/**

* 1、创建Spark配置对象SparkConf,设置Spark程序的运行时的程序配置信息

* 例如:通过setMaster来设置程序要连接的Spark集群的 url,

* 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如只有1G的内存)的初学者

*/

SparkConf conf = new SparkConf().setAppName("Spark Word Count In JAVA");

/**

* 2、创建SparkContext对象

* SparkContext是Spark程序所有功能的唯一入口,无论采用 Scala、Java、Python、R等都必须要(

* 不同的语言具体的类名称不同,如果是JAVA,则为JavaSparkContext)

* SparkContext核心作用:初始化Spark应用程序所运行所需要的核心组件,包括DAGScheduler、TaskScheduler

* 、SchedulerBackend 同时还会负责Spark程序往Master注册程序等

* SparkContext是整个Spark应用程序中最为至关重要的一个对象

*/

JavaSparkContext sc = new JavaSparkContext(conf );// 其地产曾实际上就是 Scala的SparkContext

/**

* 3、根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD

* RDD创建基本有三种方式:根据外部的数据来源(例如HDFS),根据 Scala集合、由其它的RDD操作

* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴

*/

JavaRDD<String> lines = sc.textFile("/historyserverforSpark/README.md" );

/**

* 4、对初始的RDD进行transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算

* 4.1、将每一行的字符串拆分成单个的单词 FlatMapFunction<String,

* String>是匿名内部类第二个参数泛型,因为知道是String,所以用String

*/

JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() {

@Override

public Iterable<String> call(String line ) throws Exception {

// TODO Auto-generated method stub

return Arrays.asList( line.split( " "));

}

});

/**

* 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word=>(word,1)

*/

JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() {

@Override

public Tuple2<String, Integer> call(String word) throws Exception {

// TODO Auto-generated method stub

return new Tuple2<String, Integer>(word, 1);

}

});

/**

* 4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数

*/

JavaPairRDD<String, Integer> wordCounts = pairs .reduceByKey(new Function2<Integer, Integer, Integer>() {

@Override

public Integer call(Integer v1 , Integer v2) throws Exception {

// TODO Auto-generated method stub

return v1 + v2 ;

}

});

List<Tuple2<String, Integer>> wordCountsCollect = wordCounts.collect();

for(Tuple2<String, Integer> t2 :wordCountsCollect ){

System. out.println(t2 ._1 + ":" + t2 ._2 );

}

/**

* 5、关闭Spark上下文释放相关资源

*/

sc.close();

}

}

运行结果:

Flink:1

Spark:2

is:1

Hello:3

awesome:1

Hadoop:1

解释:

1、数据在HDFS中分布式存储数据,HelloSpark.txt这个文件

读取数据之后,会创建一个HadoopRDD,从HDFS上读取分布式数据,并且以数据分片的存在于集群之中

基于 HadoopRDD基础上框架会通过map生成MapPartitionRDD ,然后会生成去掉基于行的key的PartitionRDD

val lines = sc.textFile("F:/安装文件/操作系统/spark-1.6.0-bin-hadoop2.6/README.md", 1)

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}

2、通过FlatMap操作, 对每个Partion中的每一行进行单词切分,并合并成一个大的单词实例的集合,生成新的MapPartitionRDD

val words = lines.flatMap { line => line.split(" ") }//对每一行的字符串进行单次拆分并把所有行的拆分结果通过flat合并成一个大的单次集合

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

3、对每个单词分别计数为1,单词实例变为形如(word,1)

val pairs = words.map { word => (word,1) }//其实编程了Tuple  (word,1)

4、reduceByKey的时候会先本地统计一下,就是Shuffle之前的localreduce操作,主要负责本地统计,并且把统计后的结果按照分区策略放到不同的file中

从第1到现在,全部在一个stage里面做,一个stage是完全基于内存迭代的

然后开始进行shuffle,开始新的stage,因为要传网络,所以会开始新的stage

会产生shuffledRDD,新的shuffledRDD可能会有Hello和Hadoop混在一起的,这个是Spark的分区策略

分区之后,每个区内部分别进行处理,在这个例子中是,每个区内部分别reduce

reduce之后就是一个MapPartitionRDD,从保存到HDFS角度将,MapPartitionRDD(计算的时候我们把key丢弃了,我们最后往HDFS写的时候,要求要有key,所以弄了个NullWritable ,这是符合对称法则和能量守恒形式之美的)

这个步骤是第二个stage!!!

val wordCounts = pairs.reduceByKey(_+_)//对相同的key,进行value的累加(包括local和Reducer级别同时reduce)

shuffle源码:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

/**
 * Save this RDD as a text file, using string representations of elements.
 */
def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it‘s a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That‘s why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

第一个Stage的RDD:HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD

第二个Stage的RDD:ShuffledRDD、MapPartitionsRDD

作业:

自己动手画出WordCount数据流程图,并结合源码说明

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]

时间: 2024-10-26 13:01:34

彻底解密WordCount运行原理(DT大数据梦工厂)的相关文章

HA下Spark集群工作原理(DT大数据梦工厂)

Spark高可用HA实战 Spark集群工作原理详解 资源主要指内存.CPU 如果是单点的话,Master如果出现故障,则集群不能对外工作 Spark通过Zookeeper做HA,一般做HA是一个active级别,standby active就是当前工作 standby是随时准备active的挂了之后会切换成为active级别 以前一般是2台机器,一个active,一个standby 现在一般是3台机器,一个active,两个standby,甚至3台以上 Zookeeper包含了哪些内容:所有的

DT大数据梦工厂第三十五课 Spark系统运行循环流程

本节课内容: 1.     TaskScheduler工作原理 2.     TaskScheduler源码 一.TaskScheduler工作原理 总体调度图: 通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理. 回顾: DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程:运行时从前往后运行的.每个Stage中有很多任务Task,Task是可以并行执行的.它们的执行逻辑完

底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1.一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交:2.Scala学习角度讲,比Java难.找Scala的高手比Java难,项目的维护和二次开发比较困难:3.很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combat

Eclipse下开发Scala(DT大数据梦工厂)

本讲主要内容:环境安装.配置.本地模式.集群模式.自动化脚本.web状态监控 ==========单机============ 开发工具开发 下载最新版Scala For Eclipse 1.建立工程,修改scala编译版本 2.加入Spark1.6.0的jar文件依赖 下载 http://apache.opencas.org/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz spark-assembly-1.6.0-hadoop2.6.0.jar

IDEA下Spark的开发(DT大数据梦工厂)

IDEA越使用效果越好,快捷键方便,阅读源码方便 一般阅读Spark或者Scala的源码都采用IDEA使用 下载IDEA最新版本的社区版本即可, 安装的时候必须安装Scala,这个过程是IDEA自动化的插件管理,所以点击后会自动下载(跳过在setting plugins里面也可以安装) 本地JAVA8和Scala2.10.4软件套件的安装和Eclipse不同 打开 打开之后点击File->Project Structure来设置工程的Libraries 核心是添加Spark的jar依赖 代码拷贝

DT大数据梦工厂免费实战大数据视频全集 分享

接触大数据有几年,以前一直都是对hadoop的使用.相比于日新月异的前端技术,我还是比较喜欢大数据--这个已经被热炒多年的课题,也相信从事大数据方面的技术研究是IT从业者的一条光明坦途. 2010年hadoop开始进入我的视野, 不断自学,使用,感谢hadoop国内还是有很多书籍.去年开始各大社区开始讨论spark,也开始学习scala语言,断断续续没有坚持,没多久也会忘掉.今年初看到了王老师的<Spark亚太研究院Spark公益大讲堂>视频,一些实时性需求在工作中出现让我不得不对比hadoo

DT大数据梦工厂 第5讲

DT大数据梦工厂 第5讲 http://yun.baidu.com/s/1jGjFpWy 本节王老师讲了数组.最主要的是使用了scala worksheet这个功能.这个功能可以打印出每一行代码的运行情况. package com.dt.scala.hello import scala.collection.mutable.ArrayBuffer object ArrayOps { def main(args: Array[String]): Unit = {     val nums = ne

DT大数据梦工厂 第76讲

王家林亲授<DT大数据梦工厂>大数据实战视频“Scala深入浅出实战经典”视频.音频和PPT下载!第76讲:模式匹配下的赋值语句百度云:http://pan.baidu.com/s/1qWkPspm腾讯微云:http://url.cn/c2XO4B360云盘:http://yunpan.cn/cdKX92weEFGNd 访问密码 2990模式匹配用做赋值语句.用tuple中的元素来接收值val [email protected]=1000 a为b的别名,背后是用模式匹配完成的同时返回一个tup

DT大数据梦工厂 温故而知新 之1~4讲

王家林亲授 大数据开发语言<Scala深入浅出实战经典>(1-71讲全部视频.代码.PPT)下载 ,总共有140讲Scala视频!百度云:http://pan.baidu.com/s/1jGjFpWy腾讯微云:http://url.cn/TnGbdC360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2温故而知新 之 DT大数据梦工厂 第1-4讲首先安装java,配置环境变量不多说了.然后到scala-lang.org/download 下载scala,找