定制班第1课:通过案例对SparkStreaming 透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析

从今天起,我们踏上了新的Spark学习旅途。我们的目标是要像Spark官方机构那样有能力去定制Spark版本。

我们最开始将从Spark Streaming着手。

为何从Spark Streaming切入Spark版本定制?Spark的子框架已有若干,为何选择Spark Streaming?让我们细细道来。

Spark最开始只有Spark Core,没有目前的这些子框架。我们通过对一个框架的彻底研究,肯定可以精通Spark力量的源泉和所有问题的解决之道。

我们再看看目前的这些子框架。Spark SQL有太多语法,研究这些会太浪费精力。SparkR还没完善。Spark GraphX已无太多可改进之处,可能发展到尽头,图计算相关的数学知识也不是目前重点。Spark MLlib中的机器学习也有太多算法是与数学相关,也不是做改进的好的选择 。所以我们选择了Spark
Streaming。

2015年是流式处理的一年。大家考虑用Spark,主要也是因为Spark Streaming。这是一个流处理的时代,一切数据如果与流式处理不相关的话,都是无效的数据。Spark之所以强悍的一个重要原因在于,它的流式处理可以在线使用图计算、机器学习或者SparkR的成果,这得益于Spark一体化、多元化的基础架构设计。也就是在Spark
Streaming中可以调用其它子框架,无需任何设置。这是Spark无可匹敌之处,也是Spark Streaming必将一统天下的根源。但Spark的应用中,Spark Streaming也是最容易出问题的。

Spark Streaming与其它子框架不同之处在于,它更像是Spark Core之上的一个应用程序。所以如果要做Spark的定制开发,Spark Streaming则提供了最好的参考。你想掌握Spark
Streaming,但你不去精通Spark Core的话,那是不可能的。所以我们选择Spark Streaming来提升自己,是找到了关键点。如果对照风水学的说法,我们已经幸运地找到了龙脉。如果要寻龙点穴,那么Spark Streaming就是龙穴之所在。找到了穴位,我们就能一日千里。

本期内容

1 Spark Streaming另类在线实验

2 瞬间理解Spark Streaming本质

1 Spark Streaming另类在线实验

我们在研究Spark Streaming的过程中,会有困惑的事情:如何清晰的看到数据的流入、被处理的过程?

我们的技巧是,通过调大Batch Interval的方式,来降低批处理次数,以方便看清楚各个环节。

我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手。

object OnlineBlackListFilter {

def main(args: Array[String]){

/**

* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,

* 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置

* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如

* 只有1G的内存)的初学者       *

*/

val conf = new SparkConf() //创建SparkConf对象

conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到名称

conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

val ssc = new StreamingContext(conf, Seconds(30))

/**

* 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务

* 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能工访问完整的信息

*/

val blackList = Array(("hadoop", true),("mahout", true))

val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

val adsClickStream = ssc.socketTextStream("Master", 9999)

/**

* 此处模拟的广告点击的每条数据的格式为:time、name

* 此处map操作的结果是name、(time,name)的格式

*/

val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }

adsClickStreamFormatted.transform(userClickRDD => {

//通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中

val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

/**

* 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))

* 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在在值

* 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容;

*/

val validClicked = joinedBlackListRDD.filter(joinedItem => {

if(joinedItem._2._2.getOrElse(false))

{

false

} else {

true

}

})

validClicked.map(validClick => {validClick._2._1})

}).print

/**

* 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费

*/

ssc.start()

ssc.awaitTermination()

}

}

我们把程序的Batch Interval设置成300秒:

val ssc = new StreamingContext(conf, Seconds(300))

我们重新生成一下jar包 。

先确认启动了Spark的History Server。

打开数据发送的端口:

nc -lk 9999

用spark-submit运行前面生成的jar包。

在数据发送端口输入若干数据,比如:

2255554 Spark

455554444 Hadoop

55555 Flink

66666 Kafka

6666855 RockySpark

666638 Scala

66666 DT_Spark

打开浏览器,看History Server的日志信息:

点击最新的应用,看我们目前运行的应用程序中有些什么Job:

总共竟然有5个Job。这完全不是我们此前做Spark SQL之类的应用程序时看到的样子。

我们接下来看一看这些Job的内容,主要揭示一些现象,不会做完全深入的剖析,只是为了先让大家进行一些思考。

Job 0:可以发现所有Executor上都有它的执行。而此Job不体现我们的业务逻辑代码。其实这是出于对后面计算的负载均衡的考虑。

Job 0包含有Stage 0、Stage 1。随便看一个Stage,比如Stage 1。看看其中的Aggregated Metrics by Executor部分:

Job 1:运行时间比较长,耗时1.5分钟。

点击Stage 2的链接,进去看看Aggregated Metrics By Executor部分:

可以知道,Stage 2只在Worker4上的一个Executor执行,而且执行了1.5分钟。

是否会觉得奇怪:从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢?

从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:

原来Receiver是通过一个Job来启动的。那肯定有一个Action来触发它。

看看Tasks部分:

只有一个Worker运行此Job。是用于接收数据。

Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。

看来,Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。

重要启示:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。这一认识为我们写复杂Spark程序奠定了良好的基础。

Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在有Stag 3、Stag4、Stag 5中。

我们看Stag3、Stage4的详情,可以知道这2个Stage都是用4个Executor执行的。所有数据处理是在4台机器上进行的。

Stag 5只在Worker4上。这是因为这个Stage有Shuffle操作。

Job3:

看看一个Stage 8的Aggregated Metrics by Executor部分。可以看到,数据处理是在4台机器上进行的:

Job4:也体现了我们应用程序中的业务逻辑 。

看看一个Stage 11的详情。可以看到,数据处理是在Worker4之外的其它3台机器上进行的:

可见,Spark Streaming的一个应用中,运行了这么多Job,远不是我们从网络博客或者书籍上看的那么简单。

我们有必要通过这些现象,反过来回溯去寻根问源。不过这次暂不做深入分析。

我们的神奇之旅才刚刚开始。

2 瞬间理解Spark Streaming本质

DStream是一个没有边界的集合,没有大小的限制。

DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。

时间已固定,我们就锁定到空间的操作。

从空间的维度来讲,就是处理层面。

对DStream的操作,构成了DStreamGraph。如以下图例所示:

上图中每个foreach都会触发一个作业,就会顺着依赖从后往前回溯,形成DAG,如下图所示:

空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。

现在再去读官方的Spark Streaming的文档,就好理解多了。

看来我们的学习,将从Spark Streaming的现象开始,深入到Spark Core和Spark Streaming的本质。

正巧还在着手编写Spark Streaming源码剖析的书。有王家林这样的老师指引方向,这本书可以写得相当有料。

时间: 2024-10-20 00:36:05

定制班第1课:通过案例对SparkStreaming 透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析的相关文章

第1课:通过案例对Spark Streaming透彻理解三板斧之一Spark Streaming另类实验及本质解析

一. 源码定制为什么从Spark Streaming切入? Spark 一开始并没我们今天看到的Spark SQL, Spark Streaming, MLlib(machine learning), GraphX(graph), Spark R等相关内容,只有原始的Spark Core. Spark Streaming本身是Spark Core上的一个框架,透过一个框架的彻底研究肯定可以精通Spark力量的源泉和所有Spark问题的解决之道. Spark现在使用较多的除了Spark Core之

Spark源码定制第一课:通过案例对SparkStreaming透彻理解三板斧之一

第一课:通过案例对SparkStreaming透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析 本期导读: 1 Spark源码定制选择从SparkStreaming入手: 2 Spark Streaming另类在线实验: 3 瞬间理解SparkStreaming本质. 1.    从Spark Streaming入手开始Spark源码版本定制之路 1.1           从Spark Streaming入手Spark源码版本定制之路的理由 从

第1课:通过案例对SparkStreaming 透彻理解三板斧之一

感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制.详细信息请查看 联系邮箱[email protected] 电话:18610086859 QQ:1740415547 微信号:18610086859 一 为什么从Spark Streaming入手? Spark目前有四个Spark SQL, Spark Streaming, MLlib(machine learning), GraphX(graph), Spark Core是基础,四个框架是建立在基础之上的. 所以

Spark发行版笔记1:通过案例对SparkStreaming透彻理解三板斧之一

本节课通过二个部分阐述SparkStreaming的理解: 一.解密SparkStreaming另类在线实验 二.瞬间理解SparkStreaming本质 Spark源码定制班主要是自己做发行版.自己动手改进Spark源码,通常在电信.金融.教育.医疗.互联网等领域都有自己不同的业务,如果Sprak官方版本没有你需要的业务功能,你自己可以定制.扩展Spark的功能,满足公司的业务需要. 选择SparkStreaming框架源码研究.二次开发的原因 1.Spark起初只有Spark Core基础框

Spark定制版1:通过案例对SparkStreaming透彻理解三板斧之一

本节课通过二个部分阐述SparkStreaming的理解: 一.解密SparkStreaming另类在线实验 二.瞬间理解SparkStreaming本质 Spark源码定制,自己动手改进Spark源码,通常在电信.金融.教育.医疗.互联网等领域都有自己不同的业务,如果Sprak官方版本没有你需要的业务功能,你自己可以定制.扩展Spark的功能,满足公司的业务需要. 选择SparkStreaming框架源码研究.二次开发的原因 1.Spark起初只有Spark Core基础框架没有其他的子框架(

Spark发行版笔记2:通过案例对SparkStreaming透彻理解三板斧之一

本节课主要从以下二个方面来解密SparkStreaming: 一.解密SparkStreaming运行机制 二.解密SparkStreaming架构 SparkStreaming运行时更像SparkCore上的应用程序,SparkStreaming程序启动后会启动很多job,每个batchIntval.windowByKey的job.框架运行启动的job.例如,Receiver启动时也启动了job,此job为其他job服务,所以需要做复杂的spark程序,往往多个job之间互相配合.SparkS

Spark定制版2:通过案例对SparkStreaming透彻理解三板斧之二

本节课主要从以下二个方面来解密SparkStreaming: 一.解密SparkStreaming运行机制 二.解密SparkStreaming架构 SparkStreaming运行时更像SparkCore上的应用程序,SparkStreaming程序启动后会启动很多job,每个batchIntval.windowByKey的job.框架运行启动的job.例如,Receiver启动时也启动了job,此job为其他job服务,所以需要做复杂的spark程序,往往多个job之间互相配合.SparkS

Spark版本定制:通过案例对SparkStreaming透彻理解三板斧之一

本期内容: 1 Spark Streaming另类在线实验 2 瞬间理解Spark Streaming本质 问:为什么从Spark Streaming来切入spark源码版本订制? Spark最开始的时候并没有Spark Streaming.Spark Sql.Spark ML.Spark R.Spark Graphx等相关的内容,就是很原始的Spark Core,Spark Streaming本身是Spark Core上的一个框架,透过一个框架的彻底研究可以彻底精通spark的方方面面: Sp

Spark版本定制:通过案例对SparkStreaming透彻理解三板斧之二:解密SparkStreaming运行机制和架构

本期内容: 1.解密Spark Streaming运行机制 2.解密Spark Streaming架构 上期回顾: 1.技术界的寻龙点穴,每个领域都有自己的龙脉,Spark就是大数据界的龙脉,Spark Streaming就是Spark的龙血: 2.采用了降维(把时间Batch Interval放大)的方式,进行案例演示实战,得到的结论是:特定的时间内是RDD在执行具体的Job: 一.解密Spark Streaming运行机制和架构 运行机制概念:       Spark Streaming运行