第4课:Spark Streaming的Exactly-One的事务处理

Spark Streaming的事务处理和关系型数据库的事务的概念有所不同,关系型数据库事务关注的是语句级别的一致性,例如银行转账。而Spark Streaming的事务关注的是某次job执行的一致性。也就是如何保证Job在处理数据的过程中做到如下两点:

  • 不丢失数据
  • 不重复处理数据

SparkStreaming程序执行架构大致如下:

一、我们先来说说丢失数据的情况:

  1. Receiver接收到数据后,首先会在Executor级别上保存数据(根据StorageLevel的设置),例如socketTextStream的Receiver。在内存和磁盘上保留2份副本数据
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

如果StorageLevel设置的是只进行内存级别的存储,那么当程序崩溃后,即便对Driver进行了Checkpoint,然后重新启动程序。该部分数据也会丢失。因为Driver的Checkpoint并不对计算数据进行保存。

我们假设StorageLevel设置了磁盘级别的存储,也不能完全保证数据不被丢失,因为Receiver并不是接收一条数据写一次磁盘,而是按照数据块为单位写数据。然后将数据块的元数据信息发送给Driver,Driver的Checkpoint记录的数Block的元数据信息。当数据块写到一半的时候,或者是元数据还没有发送给Driver的时候,Executor崩溃了,数据也就丢失啦。

解决方案:为了减少这种情况的发送,可以在Receiver端引入WAL写机制,因为WAL写的频率要比数据块的频率高的多。这样,当Executor恢复的时候,可以读取WAL日志恢复数据块。

但是通过WAL方式会极大的损伤Spark Streaming中Receivers接受数据的性能;

WAL也不能完全的解决数据丢失的问题,就像Oracle一样,日志文件的写,也是先写到内存中,然后根据一定的触发条件再将数据写到磁盘。如果还没有来的及写WAL日志,此时数据也会有不一致的情况(数据已经接收,但是还没有写到WAL的这部分数据是恢复不出来的。)。

Spark Streaming 1.3的时候为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统!!!此时兼具有流的优势和文件系统的优势,至此,Spark Streaming+Kafka就构建了完美的流处理世界!!!所有的Executors通过Kafka API直接消费数据,直接管理Offset,所以也不会重复消费数据;事务实现啦!!!

2. Driver崩溃,此时Job正在处理的数据,包括Receiver已经接收到还未被处理的数据将全部丢失。

解决方案:对Driver进行Checkpoint,此处的Checkpoint和RDD的Checkpoint并不一样。

我们看看Checkpoint都包含哪些属性:

private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
  val sparkConfPairs = ssc.conf.getAll

其中graph是DStreamGraph的实例化,它里面包含了InputDStream

private val inputStreams = new ArrayBuffer[InputDStream[_]]()

我们以DirectKafkaInputDStream为例,其中包含了checkpointData

protected[streaming] override val checkpointData =
  new DirectKafkaInputDStreamCheckpointData

其中只是包含:

class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
  def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
    data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
  }

就是每个batch 的唯一标识 time 对象,以及每个KafkaRDD对应的的Kafka偏移信息。

所以:

checkpoint 是非常高效的。没有涉及到实际数据的存储。一般大小只有几十K,因为只存了Kafka的偏移量等信息。

checkpoint 采用的是序列化机制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函数应该也会被序列化。如果采用了CheckPoint机制,而你的程序包做了做了变更,恢复后可能会有一定的问题。

二、关于数据重复处理涉及两个方面:

  1. 数据被重复读取:在使用Kafka的情况下,Receiver收到数据且保存到了HDFS等持久化引擎但是没有来得及进行updateOffsets,此时Receiver崩溃后重新启动就会通过管理Kafka的ZooKeeper中元数据再次重复读取数据,但是此时SparkStreaming认为是成功的,但是Kafka认为是失败的(因为没有更新offset到ZooKeeper中),此时就会导致数据重新消费的情况。
  2. 数据输出多次重写

    为什么会有这个问题,因为Spark Streaming在计算的时候基于Spark Core,Spark Core天生会做以下事情导致Spark Streaming的部分结果重复输出(例如数据输出后,该Task的后续程序发生错误,而任务发生错误,Spark Core会进入如下程序):

    Task重试;慢任务推测(两个相同任务可能会同时执行),Stage重复;Job重试;

具体解决方案:

设置spark.task.maxFailures次数为1;

设置spark.speculation为关闭状态(因为慢任务推测其实非常消耗性能,所以关闭后可以显著提高Spark Streaming处理性能)

Spark Streaming on Kafka的话,Job失败后可以设置auto.offset.reset为“largest”的方式;

Exactly Once的事务处理必须满足:

  1. Receiver数据零丢失:必须有可靠的数据来源和可靠的Receiver,且通过WAL来保证数据安全。
  2. 整个应用程序的metadata必须进行checkpoint;

最后再次强调可以通过transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复!这两个方式类似于Spark Streaming的后门,可以做任意想象的控制操作!

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-12-19 14:54:25

第4课:Spark Streaming的Exactly-One的事务处理的相关文章

第82课 Spark Streaming第一课 案例动手实战并在电光石火间理解其工作原理

本课内容提要: (1)什么是流处理以及Spark Streaming主要介绍 (2)Spark Streaming初体验 一.什么是流处理以及Spark Streaming主要介绍 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.

第4课:Spark Streaming的Exactly Once的事务处理

本期内容: Exactly once 输出不重复 Exactly once 1,事务一定会被处理,且只被处理一次: 2,输出能够输出且只会被输出. Receiver:数据通过BlockManager写入内存+磁盘或者通过WAL来保证数据的安全性. WAL机制:写数据时先通过WAL写入文件系统然后存储的Executor(存储在内存和磁盘中,由StorageLevel设定),假设前面没有写成功后面一定不会存储在Executor,如不存在Executor中的话,汇报Driver数据一定不被处理.WAL

(版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 本节课主要是针对Job如何产生进行阐述 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a

第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

上一课我们讲解了Receiver启动的流程.Receiver是通过ReceiverSupervisor的start方法启动的: /** Start the supervisor */ def start() {   onStart()   startReceiver() } 首先会调用ReceiverSupervisor的onStart()方法, override protected def onStart() {   registeredBlockGenerators.foreach { _.

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

第99课:使用Spark Streaming+Kafka实战对论坛网站动态行为的多维度分析及java.lang.NoClassDefFoundError问题解决完整内幕版本解密

第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析 /* 王家林老师授课http://weibo.com/ilovepains  每天晚上20:00YY频道现场授课频道68917580*/ /** * *第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析 * 论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从 * Kafka中在线Pull到论坛或者网站的用户在线行为信

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: Spark Streaming数据清理原因和现象 Spark Streaming数据清理代码解析 对Spark Streaming解析了这么多课之后,我们越来越能感知,Spark Streaming只是基于Spark Core的一个应用程序,因此掌握Spark Streaming对于我们怎么编写Spark应用是绝对有好处的. Spark Streaming 不像Spark Core的应用程序,Spark Core的应用的数据是存储在底层文件系统,如HDFS等别的存储系统中,而Spar

(版本定制)第1课:Spark Streaming另类在线实验及Spark Streaming本质理解

本节课内容: 1.Spark Streaming另类在线实验解析 2.Spark Streaming本质理解 Spark Streaming是Spark Core上的一个子框架,如果我们能够完全精通这个子框架,我们就能够更好的驾驭Spark.Spark Streaming和Spark SQL是目前最流行的框架,从研究角度而言,Spark SQL有太多涉及到SQL优化的问题,不太适合用来深入研究.而Spark Streaming和其他的框架不同,它更像是Spark Core的一个应用程序.如果我们

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming