Spark Streaming数据限流简述

??Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;
??流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;
??由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;
??在Spark Streaming中不同的数据源采用不同的限速策略,但无论是Socket数据源的限流策略还是Kafka数据源的限流策略其速率(rate)的计算都是使用PIDController算法进行计算而得来;
??下面从源码的角度分别介绍Socket数据源Kafka数据源的限流处理。

速率限制的计算与更新

??Spark Streaming的流处理其实是基于微批处理(MicroBatch)的,也就是说将数据流按某比较小的时间间隔将数据切割成为一段段微批数据进行处理;

??StreamingContext调用Start()启动的时候会将速率控制器(rateController)添加到StreamingListener监听器中;
??当每批次处理完成时将触发监听器(RateController),使用该批处理的处理结束时间、处理延迟时间、调度延迟时间、记录行数调用PIDRateEstimator传入PID算法中(PID Controller)计算出该批次的速率(rate)并更新速率限制(rateLimit)与发布该限制速率;

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo

for {
  processingEnd <- batchCompleted.batchInfo.processingEndTime
  workDelay <- batchCompleted.batchInfo.processingDelay
  waitDelay <- batchCompleted.batchInfo.schedulingDelay
  elems <- elements.get(streamUID).map(_.numRecords)
 } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
  val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
  newRate.foreach { s =>
    rateLimit.set(s.toLong)
    publish(getLatestRate())
  }
}

Socket数据源限流

??批次的限制速率上面已经算出,这里说的是接收Socket过来的数据时的数据限流;
??SocketInputStream类receive方法接收到数据后将数据存入 BlockGenerator的Buffer中,在写入Buffer前调用限流器 (RateLimiter)对写入数据进行限流;
??RateLimiter限流器使用了Google开源的 Guava中内置的RateLimiter限流器,该类只是对Guava限流器的简单封装;
??在Spark Streaming中可通过使用两个参数配置初始速率与最大速率spark.streaming.receiver.maxRate、spark.streaming.backpressure.initialRate;亦可配置PIDController算法相关的四个参数值;
??RateLimiter限流器是基于令牌桶的算法基本原理比较简单,以一个恒定的速率生成令牌放入令牌桶中,桶满则停止,处理请求时需要从令牌桶中取出令牌,当桶中无令牌可取时阻塞等待,此算法用于确保系统不被洪峰击垮。

  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
/**
  * Push a single data item into the buffer.
 */
 def addData(data: Any): Unit = {
  if (state == Active) {
//调用限流器等待
  waitToPush()
  synchronized {
    if (state == Active) {
      currentBuffer += data
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }
} else {
  throw new SparkException(
    "Cannot add data as BlockGenerator has not been started or has been stopped")
 }
}

def waitToPush() {
   //限流器申请令牌
   rateLimiter.acquire()
}

Guava库中RateLimiter限流器基本使用:

//创建限流器,每秒产生令牌数1
    RateLimiter rateLimiter=RateLimiter.create(1);
    for (int i = 0; i < 10; i++) {
        //获得一个令牌,未申请到令牌则阻塞等待
        double waitTime = rateLimiter.acquire();
        System.out.println(String.format("id:%d time:%d waitTime:%f",i,System.currentTimeMillis(),waitTime));
    }

Kafka数据源限流的实现

??在Spark Streaming Kafka包拉取Kafka数据会进行如下动作:
??1、取Kafka中最新偏移量、分区
??2、通过rateController限制每个分区可拉取的最大消息数
??3、在DirectKafkaInputDStream中创建KafkaRDD,在其中调用相关对象拉取数据

??通过如上步骤也可用看出,只要限制了Kafka某个分区的偏移量(offset)范围也就可限制从Kafka拉取的消息数量,从而达到限流的目的,Spark streaming kafka也是通过此实现的;

计算每个分区速率限制,有如下步骤:
??1、通过seekToEnd获取最新可用偏移量与当前偏移量对比获得当前所有分区延迟偏移量
??单个分区偏移量延迟=最新偏移量记录-当前偏移量记录
??2、获取配置项中每个分区最大速率
(spark.streaming.kafka.maxRatePerPartition),背压率计算,计算每个分区背压率计算公式为:
??单个分区背压率=单个分区偏移量延迟/所有分区总延迟*速率限制
??速率限制(rateLimit):为通过PIDController动态计算得来

??如有配置每个分区最大速率则取配置项最大速率与背压率两者中的最小值未配置则取背压率作为每个分区速率限制;

??3、将批次间隔(batchDuration)*每个分区速率限制=每个分区最大消息数
??4、取当前分区偏移量+分区最大消息数 与 最新偏移量两者当中最小的,由此来控制拉取消息速率;

??如当前偏移量+分区最大消息数 大于 最新偏移量则取 最新偏移量否则取 当前偏移量+分区最大消息数作为拉取Kafka数据的Offset范围

// 限制每个分区最大消息数
protected def clamp(
offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

maxMessagesPerPartition(offsets).map { mmp =>
  mmp.map { case (tp, messages) =>
      val uo = offsets(tp)
      tp -> Math.min(currentOffsets(tp) + messages, uo)
  }
}.getOrElse(offsets)
}

??不管是Kafka数据源还是Socket数据源Spark Streaming中都使用了PIDController算法用于计算其速率限制值,两者的差别也只是因为两种数据源的获取方式数据特征而决定的。Socket数据源使用了Guava RateLimiter、而Kafka数据源自己实现了基于Offsets的限流
??以上说介绍的框架版本为:Spark Streaming 版本为2.3.2与spark-streaming-kafka-0-10_2.11;

参考资料:
http://kafka.apache.org
http://spark.apache.org

文章首发地址:Solinx
https://mp.weixin.qq.com/s/yHStZgTAGBPoOMpj4e27Jg

原文地址:https://www.cnblogs.com/softlin/p/12215446.html

时间: 2024-08-29 15:34:25

Spark Streaming数据限流简述的相关文章

Spark Streaming:大规模流式数据处理的新贵(转)

原文链接:Spark Streaming:大规模流式数据处理的新贵 摘要:Spark Streaming是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业.本文阐释了Spark Streaming的架构及编程模型,并结合实践对其核心技术进行了深入的剖析,给出了具体的应用场景及优化方案. 提到Spark Streaming,我们不得不说一下BDAS(Berkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈.从它的视角来看,目前的大数据处

Spark Streaming 数据接收过程

SparkStreaming 源码分析 一节中从源码角度,描述了Streaming执行时代码的调用过程.下边就接收转化阶段过程再简单分析一下,为分析backpressure作准备. SparkStreaming的全过程分为两个阶段:数据接收转化阶段和Job产生与执行阶段.两个阶段通过数据接收转化阶段产生的Block联系在一起.下图是依据对基于Recevier的数据接收源转化部分源码分析所做. 数据接收转化过程可以分为如下几个关键步骤: Receiver接收外部数据流,其将接收的数据流交由Bloc

Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

本期内容 : 数据接收架构设计模式 数据接收源码彻底研究 一.Spark Streaming数据接收设计模式   Spark Streaming接收数据也相似MVC架构: 1. Mode相当于Receiver存储数据,C级别的,Receiver是个抽象因为他有好多的Receiver 2. ReceiverSupervisor 是控制器,因为Receiver启动是靠ReceiverSuperior启动的,及接收到的数据交给ReceiverSuperior存储数据的 3. Driver会获得源数据,

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

官网文档:<http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example> Spark Streaming官网的例子reduceByKeyAndWindow 简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理. import java.util.Arrays; import org.apache.spark.S

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

本节的主要内容: 一.数据接受架构和设计模式 二.接受数据的源码解读 Spark Streaming不断持续的接收数据,具有Receiver的Spark 应用程序的考虑. Receiver和Driver在不同进程,Receiver接收数据后要不断给Deriver汇报. 因为Driver负责调度,Receiver接收的数据如果不汇报给Deriver,Deriver调度时不会把接收的数据计算入调度系统中(如:数据ID,Block分片). 思考Spark Streaming接收数据: 不断有循环器接收

从Storm和Spark Streaming学习流式实时分布式计算系统的设计要点

0. 背景 最近我在做流式实时分布式计算系统的架构设计,而正好又要参见CSDN博文大赛的决赛.本来想就写Spark源码分析的文章吧.但是又想毕竟是决赛,要拿出一些自己的干货出来,仅仅是源码分析貌似分量不够.因此,我将最近一直在做的系统架构的思路整理出来,形成此文.为什么要参考Storm和Spark,因为没有参照效果可能不会太好,尤其是对于Storm和Spark由了解的同学来说,可能通过对比,更能体会到每个具体实现背后的意义. 本文对流式系统出现的背景,特点,数据HA,服务HA,节点间和计算逻辑间

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

Spark Streaming源码解读之数据清理内幕彻底解密

本期内容 : Spark Streaming数据清理原理和现象 Spark Streaming数据清理代码解析 Spark Streaming一直在运行的,在计算的过程中会不断的产生RDD ,如每秒钟产生一个BachDuration同时也会产生RDD, 在这个过程中除了基本的RDD外还有累加器.广播变量等,对应Spark Streaming也有自己的对象.源数据及数据清理机制, 在运行中每个BachDuration会触发了Job ,由于会自动产生对象.数据及源数据等运行完成后肯定要自动进行回收