(版本定制)第1课:Spark Streaming另类在线实验及Spark Streaming本质理解

本节课内容:

1、Spark Streaming另类在线实验解析

2、Spark Streaming本质理解

Spark Streaming是Spark Core上的一个子框架,如果我们能够完全精通这个子框架,我们就能够更好的驾驭Spark。Spark Streaming和Spark SQL是目前最流行的框架,从研究角度而言,Spark SQL有太多涉及到SQL优化的问题,不太适合用来深入研究。而Spark Streaming和其他的框架不同,它更像是Spark Core的一个应用程序。如果我们能深入的了解Spark Streaming,那我们就可以写出非常复杂的应用程序。

Spark Streaming的优势是可以结合SparkSQL、图计算、机器学习,功能更加强大。这个时代,单纯的流计算已经无法满足客户的需求啦。在Spark中Spark Streaming也是最容易出现问题的,因为数据在不断的流动,程序在不断的运行,内部比较复杂。

本次实验基于如下博客中的程序代码

IMF课程的第94课:SparkStreaming 实现广告计费系统中在线黑名单过滤实战

参考博客地址:http://lqding.blog.51cto.com/9123978/1769290

为了更好的查看Job的运行情况,我们启动history-server

[email protected]:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/sbin# ./start-history-server.sh

启动前先配置下history log日志目录

[email protected]:/tmp# hdfs dfs -mkdir /historyServerForSpark/

配置spark-env.sh,添加一个环境变量,让history server的logDirectory指向上面建立的目录

export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://spark-master:8020/historyServerForSpark"

配置spark-defaults.conf,添加如下配置项:

#是否记录作业产生的事件或者运行状态(job,stage等使用内存等信息)  

spark.eventLog.enabled           true

#如果记录作业产生的事件或者运行状态,则将事件写入什么位置  

spark.eventLog.dir             hdfs://spark-master:8020/historyServerForSpark

#http history的监听端口号,通过http://hadoop.master:18080访问  

spark.history.ui.port            18080

#Spark history日志位置

park.history.fs.logDirectory=hdfs://spark-master:8020/historyServerForSpark


启动History Server,Web UI界面如下

为了可以更清晰的看清楚Streaming运行的各个环节,我们可以通过将batchInterval的值设置的更大。例如5分钟。

将程序上传至spark集群

运行Spark程序

[email protected]:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.OnlineBlackListFilter --master spark://spark-master:7077 ./spark.jar

通过nc -lk 9999命令发送一些数据,内容如下:

[email protected]:~# nc -lk 9999

134343 Hadoop

343434 spark

3432777 Java

0983743 Hbase

893434 Mathou

程序输入结果为:

16/05/01 14:00:01 INFO scheduler.DAGScheduler: Job 3 finished: print at OnlineBlackListFilter.scala:63, took 0.098316 s

-------------------------------------------

Time: 1462082400000 ms

-------------------------------------------

3432777 Java

343434 spark

0983743 Hbase

接下来,我们查看web ui中的内容,来解析SparkStreaming的运行过程。

红色部分为我们刚刚运行的程序的日志(第一次运行时,在completed application这个地方看不到日志,在Show incomplete applications 这个地方显示了日志,可是此时程序已经退出了。)

我们点击进去,查看详细信息:

我们可以看到,这个程序在运行期间,启动了4个Job。

先看看job id 为0 的详细信息

这个job,很明显是我们定义的blackListRDD数据的生成。对应的代码为

val blackList = Array(("Hadoop"true), ("Mathou"true))

//把Array变成RDD

val blackListRDD = ssc.sparkContext.parallelize(blackList)

并且它做了reduceBykey的操作(代码中并没有此步操作,SparkStreaming框架自行生成的)。

这里有两个Stage,Stage 0和Stage 1

接下来我们看看Job 1的详细信息

此处也是一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到Batch Interval后,将所有数据变成一个RDD。并且它的耗时也是最长的,59s

特别说明:此处可以看出,receiver也是一个独立的job。由此我们可以得出一个结论:我们在应用程序中,可以启动多个job,并且不用的job之间可以相互配合,这就为我们编写复杂的应用程序打下了基础。

我们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息

根据上图信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此可以知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到内存中的。

即便在创建receiver时,指定的存储默认策略为MEMORY_AND_DISK_SER_2

def socketTextStream(

    hostname: String,

    port: Int,

    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {

  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)

}

我们再看看job 2的详细信息:

Job 2 将前两个job生成的RDD进行leftOuterJoin操作。

从Stage Id的编号就可以看出,它是依赖于上两个Job的。

Receiver接收数据时是在spark-master节点上,但是Job 2在处理数据时,数据已经到了spark-worker1上了(因为我的环境只有两个worker,数据并没有分散到所有worker节点,worker节点如果多一点,情况可能不一样,每个节点都会处理数据)

点击上面的Stage Id 3查看详细信息:

在一个Executor上运行,并且有5个Task 。

我们看看Job 3的详细信息:

此处的DAG图和Job2的相同,但是Stage 6和7被跳过了。详细的原因,我们后面的课程会一一讲解。

总结:我们可以看出,一个Batch Interval并不是单单触发一个Job。

根据上面的描述,我们更细致的了解了DStream和RDD的关系了。DStream就是由一个个BatchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。

对DStream的操作会构建成DStream Graph

在每到Batch Interval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph

备注:

资料来源于:DT_大数据梦工厂

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

Life is short,you need to Spark!

时间: 2024-10-10 23:38:51

(版本定制)第1课:Spark Streaming另类在线实验及Spark Streaming本质理解的相关文章

(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考

本期内容: 1.ReceiverTracker的架构设计 2.消息循环系统 3.ReceiverTracker具体实现 上节课讲到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现. ReceiverTracker主要的功能: 在Executor上启动Receivers. 停止Receivers . 更新Receiver接收数据的速度(也就是限流) 不断的等待Receivers的运行

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容: 1.updateStateByKey解密 2.mapWithState解密 背景:整个Spark Streaming是按照Batch Duractions划分Job的.但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD, 所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1.Spark Streaming元数据清理详解 2.Spark Streaming元数据清理源码解析 一.如何研究Spark Streaming元数据清理 操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手.因为DStream是RDD的模板,DStream之间有依赖关系. DStream的操作产生了RDD,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的.由此,DStream负责RDD的整个

(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

本期内容: 1.Spark Streaming资源动态分配 2.Spark Streaming动态控制消费速率 为什么需要动态? a)Spark默认情况下粗粒度的,先分配好资源再计算.对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费. b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素. Spark Streaming资源动态调整的时候会面临挑战: Spark Streaming

(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考

hu本期内容: 1.Kafka解密 背景: 目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性.No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的. 所以Spark Streaming就产生了自定义RDD –> KafkaRDD. 源码分析: 1.KafkaRDD源码 private[kafka]class KafkaRDD[K: ClassTag,V: ClassTag,U <: Decode

(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性

本期内容: 1.Executor的WAL容错机制 2.消息重放 Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢? 原因是计算的时候Spark Streaming是借助于Spark Core上RDD的安全容错的,所以天然的安全可靠的. Executor的安全容错主要有: 1.数据副本: 有两种方式:a.借助底层的BlockManager,BlockManager做备份,通过传入的StorageLevel进行备份. b. WAL方式进行容错. 2.接受到数据之后,

(版本定制)第18课:Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容: 1. Spark Streaming中RDD为空处理 2. Streaming Context程序停止方式 Spark Streaming运用程序是根据我们设定的Batch Duration来产生RDD,产生的RDD存在partitons数据为空的情况,但是还是会执行foreachPartition,会获取计算资源,然后计算一下,这种情况就会浪费 集群计算资源,所以需要在程序运行的时候进行过滤,参考如下代码: package com.dt.spark.sparkstreamingim

(版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1.JobScheduler内幕实现 2.JobScheduler深度思考 JobScheduler是Spark Streaming的调度核心,地位相当于Spark Core上调度中心的DAG Scheduler,非常重要! JobGenerator每隔Batch Duration时间会动态的生成JobSet提交给JobScheduler,JobScheduler接收到JobSet后,如何处理呢? 产生Job /** Generate jobs and perform checkpo