版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容:

1、在线动态计算分类最热门商品案例回顾与演示

2、基于案例贯通Spark Streaming的运行源码

第一部分案例:

package com.dt.spark.sparkstreaming

import com.robinspark.utils.ConnectionPool

import org.apache.spark.SparkConf

import org.apache.spark.sql.Row

import org.apache.spark.sql.hive.HiveContext

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  * 使用Spark Streaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机、电视这个类别

  下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义;

  *

  * @author DT大数据梦工厂

  新浪微博:http://weibo.com/ilovepains/

  *

  *

  *   实现技术:Spark Streaming+Spark SQL,之所以Spark Streaming能够使用MLsqlgraphx等功能是因为有foreachRDDTransform

  * 等接口,这些接口中其实是基于RDD进行操作,所以以RDD为基石,就可以直接使用Spark其它所有的功能,就像直接调用API一样简单。

  *  假设说这里的数据的格式:user item category,例如Rocky Samsung Android

  */

object OnlineTheTop3ItemForEachCategory2DB {

def main(args: Array[String]){

/**

      * 1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,

      例如说通过setMaster来设置程序要链接的Spark集群的MasterURL,如果设置

      local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如

      只有1G的内存)的初学者 
     
*

      */

    val conf = new SparkConf() //创建SparkConf对象

conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //设置应用程序的名称,在程序运行的监控界面可以看到名称

conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

//conf.setMaster("local[2]")

//设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口

val ssc = new StreamingContext(conf, Seconds(5))

ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>

(clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

//    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,

//      (v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))

val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,

_-_, Seconds(60), Seconds(20))

categoryUserClickLogsDStream.foreachRDD { rdd => {

if (rdd.isEmpty()) {

println("No data inputted!!!")

else {

val categoryItemRow = rdd.map(reducedItem => {

val category = reducedItem._1.split("_")(0)

val item = reducedItem._1.split("_")(1)

val click_count = reducedItem._2

Row(category, item, click_count)

})

val structType = StructType(Array(

StructField("category", StringType, true),

StructField("item", StringType, true),

StructField("click_count", IntegerType, true)

))

val hiveContext = new HiveContext(rdd.context)

val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

categoryItemDF.registerTempTable("categoryItemTable")

val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +

" OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +

" WHERE rank <= 3")

reseltDataFram.show()

val resultRowRDD = reseltDataFram.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

if (partitionOfRecords.isEmpty){

println("This RDD is not null but partition is null")

else {

// ConnectionPool is a static, lazily initialized pool of connections

val connection = ConnectionPool.getConnection()

partitionOfRecords.foreach(record => {

val sql = "insert into categorytop3(category,item,client_count) values(‘" + record.getAs("category") + "‘,‘" +

record.getAs("item") + "‘," + record.getAs("click_count") + ")"

val stmt = connection.createStatement();

stmt.executeUpdate(sql);

})

ConnectionPool.returnConnection(connection) // return to the pool for future reuse

}

}

}

}

}

}

/**

      * StreamingContext调用start方法的内部其实是会启动JobSchedulerStart方法,进行消息循环,在JobScheduler

      * start内部会构造JobGeneratorReceiverTacker,并且调用JobGeneratorReceiverTackerstart方法:

      *   1JobGenerator启动后会不断的根据batchDuration生成一个个的Job

      *   2ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到

      *   数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker

      *   内部会通过ReceivedBlockTracker来管理接受到的元数据信息

      每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD

      * DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个

      单独的线程来提交Job到集群运行(其实是在线程中基于RDDAction触发真正的作业的运行),为什么使用线程池呢?

      *   1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;

      *   2,有可能设置了JobFAIR公平调度的方式,这个时候也需要多线程的支持;

      *

      */

    ssc.start()

ssc.awaitTermination()

}

}

第二部分源码解析:

1、构建StreamingContext时传递SparkConf参数(或者自己Configuration)在内部创建SparkContext

def this(conf: SparkConf, batchDuration: Duration) = {

this(StreamingContext.createNewSparkContext(conf), null, batchDuration)

}

2、事实说明SparkStreaming就是SparkCore上的一个应用程序

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {

new SparkContext(conf)

}

3、创建Socket输入流

def socketTextStream(

hostname: String,

port: Int,

storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {

socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)

}

4、创建SocketInputDStream

def socketStream[T: ClassTag](

hostname: String,

port: Int,

converter: (InputStream) => Iterator[T],

storageLevel: StorageLevel

): ReceiverInputDStream[T] = {

new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

}

5、SocketInputDstream继承ReceiverInputDStream,通过构建Receiver来接收数据

private[streaming]

class SocketInputDStream[T: ClassTag](

ssc_ : StreamingContext,

host: String,

port: Int,

bytesToObjects: InputStream => Iterator[T],

storageLevel: StorageLevel

extends ReceiverInputDStream[T](ssc_) {

def getReceiver(): Receiver[T] = {

new SocketReceiver(host, port, bytesToObjects, storageLevel)

}

}

5.1、ReceiverInputDStream

abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)

extends InputDStream[T](ssc_) {

abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)

extends DStream[T](ssc_) {

5.2、DStream

/*

* DStreams internally is characterized by a few basic properties:

*  - A list of other DStreams that the DStream depends on

*  - A time interval at which the DStream generates an RDD

*  - A function that is used to generate an RDD after each time interval

*/

1)、依赖于其他DStream

2)、什么时候依据DStream,依赖关系的模板,构成RDD之间的依赖

3)、基于DStream它有一个Function,Function 基于Batch Interval(time Interval)生成RDD,这个和定时器有关系

abstract class DStream[T: ClassTag] (

@transient private[streaming] var ssc: StreamingContext

extends Serializable with Logging {

6、SocketReceiver对象在onStart中创建Thread启动run方法调用执行receive接收数据。

def onStart() {

// Start the thread that receives data over a connection

new Thread("Socket Receiver") {

setDaemon(true)

override def run() { receive() }

}.start()

}

7、创建一个Socket connection连接接收数据

/** Create a socket connection and receive data until receiver is stopped */

  def receive() {

var socket: Socket = null

    try {

logInfo("Connecting to " + host + ":" + port)

socket = new Socket(host, port)

logInfo("Connected to " + host + ":" + port)

val iterator = bytesToObjects(socket.getInputStream())

while(!isStopped && iterator.hasNext) {

store(iterator.next)

}

if (!isStopped()) {

restart("Socket data stream had no more data")

else {

logInfo("Stopped receiving")

}

8、总体流程:在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:

1)、JobGenerator启动后会不断的根据batchDuration生成一个个的Job

2)、ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),

在Receiver收到 数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,

在ReceiverTracker 内部会通过ReceivedBlockTracker来管理接受到的元数据信息 每个BatchInterval会产生一个具体的Job(这里的Job主要是封装了业务逻辑例如上面实例中的代码),其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD 的DAG而已,

从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个 单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),

为什么使用线程池呢?

a)、作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;

b)、有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;

8.1、StreamingContext.start

// Start the streaming scheduler in a new thread, so that thread local properties

// like call sites and job groups can be reset without affecting those of the

// current thread.

ThreadUtils.runInNewThread("streaming-start") {

sparkContext.setCallSite(startSite.get)

sparkContext.clearJobGroup()

sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")

scheduler.start()

}

补充:线程本地存储,线程ThreadLocal每个线程有自己的私有属性,设置线程的私有属性不会影响当前线程或其他线程

9、JobScheduler.start 创建EventLoop消息线程并启动

def start(): Unit = synchronized {

if (eventLoop != nullreturn // scheduler has already been started

logDebug("Starting JobScheduler")

eventLoop new EventLoop[JobSchedulerEvent]("JobScheduler") {

override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)

}

eventLoop.start()

9.1、EventLoop中创建Thread线程接收和发送消息,调用JobScheduler中的processEvent方法

private[spark] abstract class EventLoop[E](name: String) extends Logging {

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

private val stopped new AtomicBoolean(false)

private val eventThread new Thread(name) {

setDaemon(true)

override def run(): Unit = {

try {

while (!stopped.get) {

val event = eventQueue.take()

try {

onReceive(event)

catch {

9.2、会接受不同的任务,JobScheduler是整个Job的调度器,它本身用了一个线程循环,去监听不同的Job启动、Job完成、Job失败等任务(消息驱动系统)

private def processEvent(event: JobSchedulerEvent) {

try {

event match {

case JobStarted(job, startTime) => handleJobStart(job, startTime)

case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)

case ErrorReported(m, e) => handleError(m, e)

}

catch {

10、JobScheduler.start

// attach rate controllers of input streams to receive batch completion updates

for {

inputDStream <- ssc.graph.getInputStreams

rateController <- inputDStream.rateController

} ssc.addStreamingListener(rateController)

10.1、多个InputStream

inputDStream <- ssc.graph.getInputStreams

10.2、RateController控制输入的速度

// Keep track of the freshest rate for this stream using the rateEstimator

protected[streaming] val rateController: Option[RateController] = None

11、JobScheduler.start

listenerBus.start(ssc.sparkContext)

receiverTracker new ReceiverTracker(ssc)

inputInfoTracker new InputInfoTracker(ssc)

receiverTracker.start()

jobGenerator.start()

11.1、StreamingListenerBus

override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {

event match {

case receiverStarted: StreamingListenerReceiverStarted =>

listener.onReceiverStarted(receiverStarted)

case receiverError: StreamingListenerReceiverError =>

listener.onReceiverError(receiverError)

case receiverStopped: StreamingListenerReceiverStopped =>

listener.onReceiverStopped(receiverStopped)

case batchSubmitted: StreamingListenerBatchSubmitted =>

listener.onBatchSubmitted(batchSubmitted)

case batchStarted: StreamingListenerBatchStarted =>

listener.onBatchStarted(batchStarted)

case batchCompleted: StreamingListenerBatchCompleted =>

listener.onBatchCompleted(batchCompleted)

case outputOperationStarted: StreamingListenerOutputOperationStarted =>

listener.onOutputOperationStarted(outputOperationStarted)

case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>

listener.onOutputOperationCompleted(outputOperationCompleted)

case _ =>

}

}

11.2、receiverTracker.start(),ReceiveTracker是通过发Job的方式到集群的Executor上启动Receiver

/** Start the endpoint and receiver execution thread. */

def start(): Unit = synchronized {

if (isTrackerStarted) {

throw new SparkException("ReceiverTracker already started")

}

if (!receiverInputStreams.isEmpty) {

endpoint = ssc.env.rpcEnv.setupEndpoint(

"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

if (!skipReceiverLaunch) launchReceivers()

logInfo("ReceiverTracker started")

trackerState Started

  }

}

11.2.1、创建一个ReceiverTrackerEndpoint消息通信体

override def receive: PartialFunction[Any, Unit] = {

// Local messages

case StartAllReceivers(receivers) =>

val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)

for (receiver <- receivers) {

val executors = scheduledLocations(receiver.streamId)

updateReceiverScheduledExecutors(receiver.streamId, executors)

receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation

startReceiver(receiver, executors)

}

case RestartReceiver(receiver) =>

// Old scheduled executors minus the ones that are not active any more

val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)

val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {

// Try global scheduling again

oldScheduledExecutors

else {

val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)

// Clear "scheduledLocations" to indicate we are going to do local scheduling

val newReceiverInfo = oldReceiverInfo.copy(

state = ReceiverState.INACTIVE, scheduledLocations = None)

receiverTrackingInfos(receiver.streamId) = newReceiverInfo

schedulingPolicy.rescheduleReceiver(

receiver.streamId,

receiver.preferredLocation,

receiverTrackingInfos,

getExecutors)

}

// Assume there is one receiver restarting at one time, so we don‘t need to update

// receiverTrackingInfos

startReceiver(receiver, scheduledLocations)

11.2.1.1、ReceiverSchedulingPolicy.scheduleReceivers,从下面的代码中可以看出来在那些Executor上启动Receiver,以及怎么具体在Executor上启动Receiver

// Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",

// we need to make sure the "preferredLocation" is in the candidate scheduled executor list.

for (i <- 0 until receivers.length) {

// Note: preferredLocation is host but executors are host_executorId

receivers(i).preferredLocation.foreach { host =>

hostToExecutors.get(host) match {

case Some(executorsOnHost) =>

// preferredLocation is a known host. Select an executor that has the least receivers in

// this host

val leastScheduledExecutor =

executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))

scheduledLocations(i) += leastScheduledExecutor

numReceiversOnExecutor(leastScheduledExecutor) =

numReceiversOnExecutor(leastScheduledExecutor) + 1

case None =>

// preferredLocation is an unknown host.

// Note: There are two cases:

// 1. This executor is not up. But it may be up later.

// 2. This executor is dead, or it‘s not a host in the cluster.

// Currently, simply add host to the scheduled executors.

// Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle

// this case

scheduledLocations(i) += TaskLocation(host)

}

}

}

补充:ReceiverTracker本身不直接监管Receiver,它是Driver级别的可间接地,用ReceiverSupervisor监控那台机器上Executor中的Receiver。

11.2.2、if (!skipReceiverLaunch) launchReceivers()

/**

 * Get the receivers from the ReceiverInputDStreams, distributes them to the

 * worker nodes as a parallel collection, and runs them.

 */

private def launchReceivers(): Unit = {

val receivers = receiverInputStreams.map(nis => {

val rcvr = nis.getReceiver()

rcvr.setReceiverId(nis.id)

rcvr

})

runDummySparkJob()

logInfo("Starting " + receivers.length + " receivers")

endpoint.send(StartAllReceivers(receivers))

}

11.2.2.1运行了一个Dummy的作业,确保所有的Slaves正常工作,保证所有的Receiver都在一台机器上

/**

 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the

 * receivers to be scheduled on the same node.

 *

 * TODO Should poll the executor number and wait for executors according to

 * "spark.scheduler.minRegisteredResourcesRatio" and

 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.

 */

private def runDummySparkJob(): Unit = {

if (!ssc.sparkContext.isLocal) {

ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()

}

assert(getExecutors.nonEmpty)

}

11.2.2.2、endpoint.send(StartAllReceivers(receivers)

// endpoint is created when generator starts.

// This not being null means the tracker has been started and not stopped

private var endpoint: RpcEndpointRef = null

endpoint = ssc.env.rpcEnv.setupEndpoint(

"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

ReceiverTrackerEndpoint

override def receive: PartialFunction[Any, Unit] = {

// Local messages

case StartAllReceivers(receivers) =>

val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)

for (receiver <- receivers) {

val executors = scheduledLocations(receiver.streamId)

updateReceiverScheduledExecutors(receiver.streamId, executors)

receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation

startReceiver(receiver, executors)

}

startReceiver

// Function to start the receiver on the worker node

val startReceiverFunc: Iterator[Receiver[_]] => Unit =

(iterator: Iterator[Receiver[_]]) => {

if (!iterator.hasNext) {

throw new SparkException(

"Could not start receiver as object not found.")

}

if (TaskContext.get().attemptNumber() == 0) {

val receiver = iterator.next()

assert(iterator.hasNext == false)

val supervisor = new ReceiverSupervisorImpl(

receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)

supervisor.start()

supervisor.awaitTermination()

else {

// It‘s restarted by TaskScheduler, but we want to reschedule it again. So exit it.

}

}

逆天的设计啊

// Create the RDD using the scheduledLocations to run the receiver in a Spark job

val receiverRDD: RDD[Receiver[_]] =

if (scheduledLocations.isEmpty) {

ssc.sc.makeRDD(Seq(receiver), 1)

else {

val preferredLocations = scheduledLocations.map(_.toString).distinct

ssc.sc.makeRDD(Seq(receiver -> preferredLocations))

}

receiverRDD.setName(s"Receiver $receiverId")

ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")

ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](

receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

// We will keep restarting the receiver job until ReceiverTracker is stopped

future.onComplete {

case Success(_) =>

if (!shouldStartReceiver) {

onReceiverJobFinish(receiverId)

else {

logInfo(s"Restarting Receiver $receiverId")

self.send(RestartReceiver(receiver))

}

case Failure(e) =>

if (!shouldStartReceiver) {

onReceiverJobFinish(receiverId)

else {

logError("Receiver has been stopped. Try to restart it.", e)

logInfo(s"Restarting Receiver $receiverId")

self.send(RestartReceiver(receiver))

}

}(submitJobThreadPool)

logInfo(s"Receiver ${receiver.streamId} started")

}

ReceiverSupervisorImpl.startReceiver

/** Start receiver */

def startReceiver(): Unit = synchronized {

try {

if (onReceiverStart()) {

logInfo("Starting receiver")

receiverState Started

      receiver.onStart()

logInfo("Called receiver onStart")

else {

// The driver refused us

stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)

}

override protected def onReceiverStart(): Boolean = {

val msg = RegisterReceiver(

streamId, receiver.getClass.getSimpleName, hostexecutorIdendpoint)

trackerEndpoint.askWithRetry[Boolean](msg)

}

11.3、JobScheduler.start  jobGenerator.start()

/** Start generation of jobs */

def start(): Unit = synchronized {

if (eventLoop != nullreturn // generator has already been started

// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.

// See SPARK-10125

checkpointWriter

  eventLoop new EventLoop[JobGeneratorEvent]("JobGenerator") {

override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = {

jobScheduler.reportError("Error in job generator", e)

}

}

eventLoop.start()

if (ssc.isCheckpointPresent) {

restart()

else {

startFirstTime()

}

}

根据时间间隔不断发送消息

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) {

logDebug("Got event " + event)

event match {

case GenerateJobs(time) => generateJobs(time)

case ClearMetadata(time) => clearMetadata(time)

case DoCheckpoint(time, clearCheckpointDataLater) =>

doCheckpoint(time, clearCheckpointDataLater)

case ClearCheckpointData(time) => clearCheckpointData(time)

}

}

/** Generate jobs and perform checkpoint for the given `time`
*/

private def generateJobs(time: Time) {

// Set the SparkEnv in this thread, so that job generation code can access the environment

// Example: BlockRDDs are created in this thread, and it needs to access BlockManager

// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.

SparkEnv.set(ssc.env)

Try {

jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

graph.generateJobs(time) // generate jobs using allocated block

match {

case Success(jobs) =>

val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

case Failure(e) =>

jobScheduler.reportError("Error generating jobs for time " + time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

def submitJobSet(jobSet: JobSet) {

if (jobSet.jobs.isEmpty) {

logInfo("No jobs added for time " + jobSet.time)

else {

listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

logInfo("Added jobs for time " + jobSet.time)

}

}

/**

 * Executes the given task sometime in the future.  The task

 * may execute in a new thread or in an existing pooled thread.

 *

 * If the task cannot be submitted for execution, either because this

 * executor has been shutdown or because its capacity has been reached,

 * the task is handled by the current {@code RejectedExecutionHandler}.

 *

 * @param command the task to execute

 * @throws RejectedExecutionException at discretion of

 *         {@code RejectedExecutionHandler}, if the task

 *         cannot be accepted for execution

 * @throws NullPointerException if {@code command} is null

 */

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task.  The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn‘t, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we try to add a new

* thread.  If it fails, we know we are shut down or saturated

* and so reject the task.

*/

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

private class JobHandler(job: Job) extends Runnable with Logging {

import JobScheduler._

def run() {

try {

val formattedTime = UIUtils.formatBatchTime(

job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)

val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(

s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")

ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)

ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because

// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

// it‘s possible that when `post` is called, `eventLoop` happens to null.

var _eventLoop = eventLoop

        if (_eventLoop != null) {

_eventLoop.post(JobStarted(job, clock.getTimeMillis()))

// Disable checks for existing output directories in jobs launched by the streaming

// scheduler, since we may need to write output to an existing directory during checkpoint

// recovery; see SPARK-4835 for more details.

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

job.run()

}

_eventLoop = eventLoop

          if (_eventLoop != null) {

_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

}

else {

// JobScheduler has been stopped.

}

finally {

ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEYnull)

ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEYnull)

}

}

}

}

private def processEvent(event: JobSchedulerEvent) {

try {

event match {

case JobStarted(job, startTime) => handleJobStart(job, startTime)

case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)

case ErrorReported(m, e) => handleError(m, e)

}

catch {

case e: Throwable =>

reportError("Error in job scheduler", e)

}

}

private def handleJobStart(job: Job, startTime: Long) {

val jobSet = jobSets.get(job.time)

val isFirstJobOfJobSet = !jobSet.hasStarted

jobSet.handleJobStart(job)

if (isFirstJobOfJobSet) {

// "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the

// correct "jobSet.processingStartTime".

listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))

}

job.setStartTime(startTime)

listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))

logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)

}

private class JobHandler(job: Job) extends Runnable with Logging {

import JobScheduler._

def run() {

try {

val formattedTime = UIUtils.formatBatchTime(

job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)

val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(

s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")

ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)

ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because

// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

// it‘s possible that when `post` is called, `eventLoop` happens to null.

var _eventLoop = eventLoop

        if (_eventLoop != null) {

_eventLoop.post(JobStarted(job, clock.getTimeMillis()))

// Disable checks for existing output directories in jobs launched by the streaming

// scheduler, since we may need to write output to an existing directory during checkpoint

// recovery; see SPARK-4835 for more details.

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

job.run()

}

_eventLoop = eventLoop

          if (_eventLoop != null) {

_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

}

else {

// JobScheduler has been stopped.

}

finally {

ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEYnull)

ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEYnull)

}

}

}

}

资料来源于:王家林(Spark版本定制班课程)

新浪微博:http://www.weibo.com/ilovepains

时间: 2024-10-12 13:33:42

版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码的相关文章

spark版本定制五:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 一.在线动态计算分类最热门商品案例回顾与演示 案例回顾: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

基于案例贯通 Spark Streaming 流计算框架的运行源码

本期内容 : Spark Streaming+Spark SQL案例展示 基于案例贯穿Spark Streaming的运行源码 一. 案例代码阐述 : 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等. 1.案例运行代码 : import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveCont

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

Spark版本定制第5天:案列解析Spark Streaming运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一切不能进行实时流处理的数据都是无效的数据.在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下. Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序.如果可以掌

自定义基于xmemcached协议消息队列的Spark Streaming 接收器

虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的.对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可.而Receiver的实现只需要简单地实现两个方法: 1.onStart():接收数据. 2.onStop():停止接收数据. 一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收.而onStop()方法负责确保这些接收数据的线程是停止的,在 Receiver 被关闭时调用了

基于Zlib算法的流压缩、字符串压缩源码

原文:基于Zlib算法的流压缩.字符串压缩源码 Zlib.net官方源码demo中提供了压缩文件的源码算法.处于项目研发的需要,我需要对内存流进行压缩,由于zlib.net并无相关文字帮助只能自己看源码解决.通过对SharpZipLib的demo研究,写出了Zlib.net的流压缩算法. 中间花费了不少的时间,应为通过Stream压缩出来的数据全是空的,呵呵,主要原因就是忽略了ZOutputStream.flush()和ZOutPutStream.close()方法.大家自己看吧.关于字符串压缩