Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

一:数据峰值的巨大影响

1. 数据确实不稳定,例如晚上的时候访问流量特别大

2. 在处理的时候例如GC的时候耽误时间会产生delay延迟

二:Backpressure:数据的反压机制

基本思想:根据上一次计算的Job的一些信息评估来决定下一个Job数据接收的速度。

如何限制Spark接收数据的速度?

Spark Streaming在接收数据的时候必须把当前的数据接收完毕才能接收下一条数据。

源码解析

RateController:

1. RateController是监听器,继承自StreamingListener.

/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {

问题来了,RateContoller什么时候被调用的呢?

BackPressure是根据上一次计算的Job信息来评估下一个Job数据接收的速度。因此肯定是在JobScheduler中被调用的。

1. 在JobScheduler的start方法中rateController方法是从inputStream中获取的。

// attach rate controllers of input streams to receive batch completion updates
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
2.  然后将此消息加入到listenerBus中。
/** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
  * receiving system events related to streaming.
  */
def addStreamingListener(streamingListener: StreamingListener) {
  scheduler.listenerBus.addListener(streamingListener)
}

}

3. 在StreamingListenerBus源码如下:

/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
private[spark] class StreamingListenerBus
  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
  with Logging {

  private val logDroppedEvent = new AtomicBoolean(false)

  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    event match {
      case receiverStarted: StreamingListenerReceiverStarted =>
        listener.onReceiverStarted(receiverStarted)
      case receiverError: StreamingListenerReceiverError =>
        listener.onReceiverError(receiverError)
      case receiverStopped: StreamingListenerReceiverStopped =>
        listener.onReceiverStopped(receiverStopped)
      case batchSubmitted: StreamingListenerBatchSubmitted =>
        listener.onBatchSubmitted(batchSubmitted)
      case batchStarted: StreamingListenerBatchStarted =>
        listener.onBatchStarted(batchStarted)
      case batchCompleted: StreamingListenerBatchCompleted =>
        listener.onBatchCompleted(batchCompleted)
4.  在RateController就实现了onBatchCompleted

5. RateController中onBatchCompleted具体实现如下:

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo

  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
6.  RateController中computeAndPulish源码如下:
/**
 * Compute the new rate limit and publish it asynchronously.
 */
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
  Future[Unit] {
//评估新的更加合适Rate速度。
    val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
    newRate.foreach { s =>
      rateLimit.set(s.toLong)
      publish(getLatestRate())
    }
  }
7.  其中publish实现是在ReceiverRateController中。

8. 将pulish消息给ReceiverTracker.

/**
 * A RateController that sends the new rate to receivers, via the receiver tracker.
 */
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
//因为会有很多RateController所以会有具体Id
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
9.  在ReceiverTracker中sendRateUpdate源码如下:
此时的endpoint是ReceiverTrackerEndpoint.
/** Update a receiver‘s maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
10. 在ReceiverTrackerEndpoint的receive方法中就接收到了发来的消息。
case UpdateReceiverRateLimit(streamUID, newRate) =>
//根据receiverTrackingInfos获取info信息,然后根据endpoint获取通信句柄。
//此时endpoint是ReceiverSupervisor的endpoint通信实体。
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }
11. 因此在ReceiverSupervisorImpl中接收到ReceiverTracker发来的消息。
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })
12. RateLimiter中updateRate源码如下:
/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
//这里有最大限制,因为你的集群处理规模是有限的。
//Spark Streaming可能运行在YARN之上,因为多个计算框架都在运行的话,资源就//更有限了。
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it‘s 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }

总体流程图如下:

总结:

每次上一个Batch Duration的Job执行完成之后,都会返回JobCompleted等信息,基于这些信息产生一个新的Rate,然后将新的Rate通过远程通信交给了Executor中,而Executor也会根据Rate重新设置Rate大小。

时间: 2024-10-27 08:13:25

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?的相关文章

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔.这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置

Android应用性能优化系列视图篇——隐藏在资源图片中的内存杀手

图片加载性能优化永远是Android领域中一个无法绕过的话题,经过数年的发展,涌现了很多成熟的图片加载开源库,比如Fresco.Picasso.UIL等等,使得图片加载不再是一个头疼的问题,并且大幅降低了OOM发生的概率.然而,在图片加载方面我们是否可以就此放松警惕了呢? 开源图片加载库能为我们解决绝大部分有关图片的问题,然而并不是所有! 首先,图片从来源上可以分成三大类:网络图片.手机图片.APK资源图片.网络图片和手机图片都在图片加载库功能的覆盖范围内,基本上不用开发者太操心,但是APK资源

Android性能优化系列之apk瘦身

Android性能优化系列之布局优化 Android性能优化系列之内存优化 为什么APK要瘦身.APK越大,在下载安装过程中,他们耗费的流量会越多,安装等待时间也会越长:对于产品本身,意味着下载转化率会越低(因为竞品中,用户有更多机会选择那个体验最好,功能最多,性能最好,包最小的),所以apk的瘦身优化也很重要,本篇博客将讲述apk瘦身的相关内容. 包体分析 在Android Studio工具栏里,打开build–>Analyze APK, 选择要分析的APK包 可以看到占用空间的主要是代码.图

PLSQL_性能优化系列16_Oracle DataScan数据扫描

对数据的读取操作是非常消耗资源的,如何减少对数据的扫描,是提升sql效率的一个重要方面,例如物化视图技术.本篇介绍几种sql写法,分别是CASE expression/DML with returning clause /multitable insert.[@[email protected]] 一. 用CASE EXPRESSION将多句查询组合在一起SELECT COUNT (*)FROM employeesWHERE salary < 2000;SELECT COUNT (*)FROM

Spark Streaming性能调优详解(转)

原文链接:Spark Streaming性能调优详解 Spark Streaming提供了高效便捷的流式处理模式,但是在有些场景下,使用默认的配置达不到最优,甚至无法实时处理来自外部的数据,这时候我们就需要对默认的配置进行相关的修改.由于现实中场景和数据量不一样,所以我们无法设置一些通用的配置(要不然Spark Streaming开发者就不会弄那么多参数,直接写死不得了),我们需要根据数据量,场景的不同设置不一样的配置,这里只是给出建议,这些调优不一定试用于你的程序,一个好的配置是需要慢慢地尝试

[Android 性能优化系列]内存之终极篇--降低你的内存消耗

大家如果喜欢我的博客,请关注一下我的微博,请点击这里(http://weibo.com/kifile),谢谢 转载请标明出处(http://blog.csdn.net/kifile),再次感谢 原文地址:http://developer.android.com/training/articles/memory.html 在接下来的一段时间里,我会每天翻译一部分关于性能提升的Android官方文档给大家 建议大家在看本文之前先去我的博客看看 [Android 性能优化系列]内存之基础篇--Andr

[Android 性能优化系列]布局篇之减少你的界面层级

大家如果喜欢我的博客,请关注一下我的微博,请点击这里(http://weibo.com/kifile),谢谢 转载请标明出处(http://blog.csdn.net/kifile),再次感谢 原文地址:http://developer.android.com/training/improving-layouts/optimizing-layout.html 在接下来的一段时间里,我会每天翻译一部分关于性能提升的Android官方文档给大家 性能优化之布局篇: [Android 性能优化系列]布

[Android 性能优化系列]布局篇之通过&lt;include&gt;复用布局

大家如果喜欢我的博客,请关注一下我的微博,请点击这里(http://weibo.com/kifile),谢谢 转载请标明出处(http://blog.csdn.net/kifile),再次感谢 原文地址:http://developer.android.com/training/improving-layouts/reusing-layouts.html 在接下来的一段时间里,我会每天翻译一部分关于性能提升的Android官方文档给大家 性能优化布局篇: [Android 性能优化系列]布局篇之

[Android 性能优化系列]内存之基础篇--Android怎样管理内存

大家假设喜欢我的博客,请关注一下我的微博,请点击这里(http://weibo.com/kifile),谢谢 转载请标明出处(http://blog.csdn.net/kifile),再次感谢 原文地址:http://developer.android.com/training/articles/memory.html 在接下来的一段时间里,我会每天翻译一部分关于性能提升的Android官方文档给大家 以下是本次的正文: ################ 随机訪问存储器(Ram) 无论在哪种软件