- 解密Spark Streaming Job架构和运行机制
- 解密Spark Streaming容错架构和运行机制
- 作业的生成肯定是一个动态的生成
private[streaming]valgraph:
DStreamGraph = {
if(isCheckpointPresent)
{
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
cp_.graph
}else{
require(batchDur_
!= null,"Batch
duration for StreamingContext cannot be null")
valnewGraph
=newDStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}
2.
private[streaming]valcheckpointDuration:
Duration = {
if(isCheckpointPresent)
cp_.checkpointDurationelsegraph.batchDuration
}
private[streaming]valscheduler=newJobScheduler(this)
private[streaming]valwaiter=newContextWaiter
private[streaming]valprogressListener=newStreamingJobProgressListener(this)
private[streaming]valuiTab:
Option[StreamingTab] =
if(conf.getBoolean("spark.ui.enabled",true))
{
Some(newStreamingTab(this))
}else{
None
}
3.
defstart():Unit=
synchronized {
if(eventLoop!=null)return//
scheduler has already been started
logDebug("Starting
JobScheduler")
eventLoop=newEventLoop[JobSchedulerEvent]("JobScheduler")
{
override protected defonReceive(event:
JobSchedulerEvent):Unit= processEvent(event)
override protected defonError(e:Throwable):Unit=
reportError("Error in job scheduler",e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion
updates
for{
inputDStream <- ssc.graph.getInputStreams
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start(ssc.sparkContext)
receiverTracker=newReceiverTracker(ssc)
inputInfoTracker=newInputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
1.在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler 的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:
(1). JobGenerator启动后会不断的根据batchDuration生成一个个的Job
(2). ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到
数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息
2.
每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个
元数据:分布式的集群,为了高效的处理数据,一方面是数据本身,另一方面是数据存储在哪里,相当于数据的本身,高效的数据处理方法,是将数据的索引和数据进行区分,元数据的管理是通过BlockTracker来管理的,存数据通过BlockManager来存储管理
- Spark Streaming容错机制:
从第二课中我们发现,Dstream相对于RDD是逻辑层面的,通过Dstream
Graph随着时间的流失不断的产生RDD,所以对Dstream的操作就是在固定的时间上操作RDD。所以基于RDD的容错机制同样也实用与Spark
Streamign的容错机制
Spark Streaming的容错要考虑两个方面:
- 接收数据的安全性,1.默认情况下都会在两台机器上2.WAL机制,先把数据通过WAL做一个日志记录,kafka支持数据的回放
- 执行数据的安全性,Job执行完全靠RDD的引导,Excutor的安全性
- Driver级别的安全性,DAG生成的模板,Dstream Graph,Reciever
Tracker, JobGenerator,只需要做一个checkpoint就行
感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制。详细信息请查看
联系邮箱[email protected]
电话:18610086859
QQ:1740415547
微信号:18610086859