Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢。

在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理。在Spark
Streaming的处理机制中,不能多算,比较容易理解。那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢?

环境搭建

为了有一个感性的认识,先运行一下简单的Spark
Streaming示例。首先确认已经安装了openbsd-netcat。

运行netcat

nc -lk 9999


运行spark-shell

SPARK_JAVA_OPTS=-Dspark.cleaner.ttl=10000 MASTER=local-cluster[2,2,1024] bin/spark-shell

在spark-shell中输入如下内容

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap( _.split(" "))
val pairs = words.map(word => (word,1))
val wordCount = pairs.reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()

当ssc.start()执行之后,在nc一侧输入一些内容并回车,spark-shell上就会显示出统计的结果。

数据接收过程

来看一下代码实现层面,从两个角度来说,一是控制层面(control
panel),另一是数据层面(data panel)。

Spark
Streaming的数据接收过程的控制层面大致如下图所示。

简要讲解一下上图的意思,

  1. 数据真正接收到是发生在SocketReceiver.receive函数中,将接收到的数据放入到BlockGenerator.currentBuffer

  2. 在BlockGenerator中有一个重复定时器,处理函数为updateCurrentBuffer,
    updateCurrentBuffer将当前buffer中的数据封装为一个新的Block,放入到blocksForPush队列中

  3. 同样是在BlockGenerator中有一个BlockPushingThread,其职责就是不停的将blocksForPush队列中的成员通过pushArrayBuffer函数传递给blockmanager,让BlockManager将数据存储到MemoryStore中

  4. pushArrayBuffer还会将已经由BlockManager存储的Block的id号传递给ReceiverTracker,ReceiverTracker会将存储的blockId放到对应StreamId的队列中

socket.receive->receiver.store->pushSingle->blockgenerator.updateCurrentBuffer->blockgenerator.keepPushBlocks->pushArrayBufer

->ReceiverTracker.addBlocks

pushArrayBuffer函数的定义如下

  def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}

数据结构的变化过程

Spark
Streaming数据处理高效的原因之一就是批量的进行数据分析,那么这些批量的数据是如何聚集起来的呢?换种方式来表述这个问题,在某一时刻,接收到的数据是单一的,也就是我们最多只能组成<t,data>这种数据元组,而在runJob的时候是批量的提取和分析数据的,这个批量数据的组成是在什么时候完成的呢?

下图大到勾勒出一条新的message被socketreceiver接收之后,是如何通过一系列的处理而放入到BlockManager中,并同时由ReceiverTracker记录下相应的元数据的。

  1. 首先new message被放入到blockManager.currentBuffer

  2. 定时器超时处理过程,将整个currentBuffer中的数据打包成一条Block,放入到ArrayBlockingQueue,该数据结构支持FIFO

  3. keepPushingBlocks将每一条Block(block中包含时间戳,接收到的原始数据)让BlockManager进行保存,同时通知ReceiverTracker已经将哪些block存储到了blockmanager中

  4. ReceiverTracker将每一个stream接收到但还没有进行处理的block放入到receiverBlockInfo,其为一Hashmap.
    在后面的generateJobs中会从receiverBlockInfo提取数据以生成相应的RDD

数据处理过程

数据处理中最重要的函数就是generateJobs,
generateJobs会引发下述的函数调用过程,具体的代码就不一一罗列了。

  1. jobgenerator.generateJobs->dstreamgraph.generateJobs->dstream.generateJob->getOrCompute->compute
    生成RDD

  2. job调用job.func

JobGenerator.generateJobs函数定义如下

  private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}

我们先来谈一谈数据处理阶段是如何与上述的接收阶段中存储下来的数据挂上钩的。

假设上一次进行RDD处理发生在时间点t1,现在是时间点t2,那么在<t2,t1>之间有哪些blocks没有被处理呢?

想必你已经知道答案了,没有被处理的blocks全部保存在ReceiverTracker的receiverBlockInfo之中

在generateJob时,每一个DStream都会调用getReceivedBlockInfo,你说没有跟ReceiverTracker中的receivedBlockInfo连起来啊,别急!且看数据输入的源头ReceiverInputDStream中的getReceivedBlockInfo是如何定义的。代码列举如下。

  private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo(time)
}

那么此处的receivedBlockInfo(time)是从何而来的呢,这个要看ReceivedInputDStream中的compute函数实现

override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}

至此终于看到了receiverTracker中的getReceivedBlockInfo被调用,也就是说将接收阶段的数据和目前处理阶段的输入通道打通了

函数调用路径,从generateJobs到sparkcontext.submitJobs.
这个时候要注意注册为DStreamGraph中的OutputStream上的操作会引发SparkContext.runJobs被调用。我们以print函数为例看一下调用过程。

  def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
println ("-------------------------------------------")
println ("Time: " + time)
println ("-------------------------------------------")
first11.take(10).foreach(println)
if (first11.size > 10) println("...")
println()
}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

注意rdd.take,这个会引发runJob调用,不信的话,我们可以看一看其定义中调用runJob的片段。

      val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
}


小结一下数据处理过程


  • 用time为关键字去取出在此时间之前加入的所有blockIds

  • 真正提交运行的时候,rdd中的blockfetcher以blockId为关键字去blockmanagermaster获取真正的数据,即从socket上接收到的原始数据

容错处理


JobGenerator.generateJobs函数的最后会发出DoCheckpoint通知,该通知会让相应的actor将DStreamCheckpointData写入到hdfs文件中,我们来看一看为什么需要写入checkpointdata以及哪些东西是包含在checkpointdata之中。

在数据处理一节,我们已经分析到在generateJobs的时候会生成多个jobs,它们会通过sparkcontext.runJob接口而发送到cluster中被真正执行。

假设在t2,worker挂掉了,挂掉的worker直到t3才完全恢复。由于挂掉的原因,上一次generateJobs生成的job不一定被完全处理了(也许有些已经处理了,有些还没有处理),所以需要重新再提交一次。这里有一个问题,那就是可能导致针对同一批数据有重复处理的情况发生,从而无法达到exactly-once的语义效果。

问题2:
在<t2,t3>这一段挂掉的时间之内,没有新的数据被接收所以Spark
Streaming的SocketReceiver适合用来充当client侧而不是server侧。SocketReceiver读取到的数据应该存在一个具有冗余备份机制的内存数据库或缓存队列里,如kafaka.
对问题2, Spark Streaming本身是解决不了的。当然这里再往下细究的话,会牵出负载均衡的问题。

checkpointData

checkpoint的成员变量有哪些呢,我们看一看其结构定义就清楚了。

  val master = ssc.sc.master
val framework = ssc.sc.appName
val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll

generatedRDDs是被包含在graph里面。所以不要突然之间惊惶失措,发觉没有将generatedRDDs保存起来。

checkpoint的数据是通过CheckpointwriteHandler真正的写入到hdfs,通过CheckPiontReader而读入。CheckpointReade在重启的时候会被使用到,判断是第一次干净的启动还是因错误而重启,判断的依据全部在cp这个变量。

为了达到重启之后而自动的检查并载入相应的checkpoint数据,那么在创建StreamingContext的时候就不能简单的通过调用new
StreamingContext来完成,而是利用getOrCreate函数,代码示例如下。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}

// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()


小结


本文中讲述数据接收过程中所使用的两幅图使用tikz完成,里面包含的信息很丰富,有志于了解清楚Spark
Streaming内部处理机制的同仁,不妨以此为参考进行详细的代码走读。

如果有任何不对或错误之处,欢迎批评指正。

参考资料


  1. Spark Streaming源码分析 checkpoint http://www.cnblogs.com/fxjwind/p/3596451.html

  2. Spark Streaming Introduction http://jerryshao.me/architecture/2013/04/02/spark-streaming-introduction/

  3. deep dive with Spark Streaming http://www.meetup.com/spark-users/events/122694912/

Apache Spark源码走读之5 -- DStream处理的容错性分析,布布扣,bubuko.com

时间: 2024-10-14 08:33:12

Apache Spark源码走读之5 -- DStream处理的容错性分析的相关文章

Apache Spark源码走读之4 -- DStream实时流数据处理

欢迎转载,转载请注明出处,徽沪一郎. Spark Streaming能够对流数据进行近乎实时的速度进行数据处理.采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处理速度,与storm相比拥有更高的吞能力. 本篇简要分析Spark Streaming的处理模型,Spark Streaming系统的初始化过程,以及当接收到外部数据时后续的处理步骤. 系统概述 流数据的特点 与一般的文件(即内容已经固定)型数据源相比,所谓的流数据拥有如下的特点 数据一直处在变化中

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常

Apache Spark源码走读之13 -- hiveql on spark实现详解

欢迎转载,转载请注明出处,徽沪一郎 概要 在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情. Hive简介 Hive的由来 以下部分摘自Hadoop definite guide中的Hive一章 "Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询. Hive大大

Apache Spark源码走读之9 -- Spark源码编译

欢迎转载,转载请注明出处,徽沪一郎. 概要 本来源码编译没有什么可说的,对于java项目来说,只要会点maven或ant的简单命令,依葫芦画瓢,一下子就ok了.但到了Spark上面,事情似乎不这么简单,按照spark officical document上的来做,总会出现这样或那样的编译错误,让人懊恼不已. 今天闲来无事,又重试了一把,居然o了,做个记录,以备后用. 准备 我的编译机器上安装的Linux是archlinux,并安装后如下软件 scala 2.11 maven git 下载源码 第

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

Apache Spark源码走读之14 -- Graphx实现剖析

欢迎转载,转载请注明出处,徽沪一郎. 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外

Apache Spark源码走读之16 -- spark repl实现详解

欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢? 既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在? 显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了. 全局视图 上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步

Apache Spark源码走读之7 -- Standalone部署方式分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有1到