Spark Streaming Backpressure分析

---恢复内容开始---

1、为什么引入Backpressure

默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。

2、Backpressure

Spark Streaming Backpressure:  根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

2.1 Streaming架构如下图所示(详见Streaming数据接收过程文档和Streaming 源码解析)

2.2 BackPressure执行过程如下图所示:

  在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息.  Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).

3、BackPressure 源码解析

3.1 RateController类体系

RatenController 继承自StreamingListener. 用于处理BatchCompleted事件。核心代码为:

**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
……
……  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }
  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo
    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

  

---恢复内容开始---

1、为什么引入Backpressure

默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。

2、Backpressure

Spark Streaming Backpressure:  根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

2.1 Streaming架构如下图所示(详见Streaming数据接收过程文档和Streaming 源码解析)

2.2 BackPressure执行过程如下图所示:

  在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息.  Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate通过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).

3、BackPressure 源码解析

3.1 RateController类体系

RatenController 继承自StreamingListener. 用于处理BatchCompleted事件。核心代码为:

**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
……
……  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }
  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo
    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

3.2 RateController的注册

JobScheduler启动时会抽取在DStreamGraph中注册的所有InputDstream中的rateController,并向ListenerBus注册监听. 此部分代码如下:

时间: 2024-07-29 06:54:12

Spark Streaming Backpressure分析的相关文章

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照<Spark Streaming编程指南>. Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 获得一个DStream负责连接 监听端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 对每一行数据执行Split操作 val words = line

&lt;Spark&gt;&lt;Spark Streaming&gt;&lt;作业分析&gt;

Intro 这篇是对一个Spark (Streaming)作业的log进行分析.用来加深对Spark application运行过程,优化空间的各种理解. Here to Start 从我这个初学者写得一个Spark Streaming程序开始... package com.wttttt.spark /** * Created with IntelliJ IDEA. * Description: * Author: wttttt * Github: https://github.com/wttt

Spark Streaming实践和优化

发表于:<程序员>杂志2016年2月刊.链接:http://geek.csdn.net/news/detail/54500 作者:徐鑫,董西成 在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎.其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎.如图1所示,Spark Streaming支持的数据源有很多,如Kafka.Flume.TCP等.Spark Streaming的内部数据表示形式为DStrea

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔.这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置

Spark Streaming编程指南

本文基于Spark Streaming Programming Guide原文翻译, 加上一些自己的理解和小实验的结果. 一.概述 Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具有容错能力的用于处理实时数据流的一个组件.Spark Streaming可以接收各种数据源传递来的数据,比如Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可以使用一些用高阶函数(比如map, reduce, join及window)进行封装的复杂算法做进

4. Spark Streaming解析

4.1 初始化StreamingContext import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) // 可以通过 ssc.sparkContext 来访问 SparkContext // 或者通过已

Spark Streaming数据限流简述

??Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理: ??流处理与批处理有明显区别,批处理中的数据有明显的边界.数据规模已知:而流处理数据流并没有边界,也未知数据规模: ??由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件.网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障.网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark

Spark Streaming源代码学习总结(一)

1.Spark Streaming 代码分析: 1.1 演示样例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def mai

Spark Streaming源码学习总结(一)

1.Spark Streaming 代码分析: 1.1 示例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def main(