Akka Source Code Analysis: CircuitBreaker

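  The circuit breaker is a technique that shows up in many technology stacks. In a distributed system it provides a stable mechanism for stopping cascading failures.

  How should we understand that? Put simply, in a distributed environment a problem on one compute node can easily propagate backwards to its callers, or even bring the whole system down in an avalanche. For example, suppose three services A, B and C form a call chain in which A calls B. If B slows down for some reason, so that a call that used to finish in 100 milliseconds now takes 1 second, then every call from A to B also waits 1 second. Many calls pile up inside A, and under high concurrency the node hosting A can easily run into trouble itself. In other words, B's problem has been passed on to A. The circuit breaker exists to solve exactly this problem.

  Place a circuit breaker between A and B, and let A call B through it. The breaker uses some rule to judge whether B is healthy. If B is unhealthy, A's call fails immediately instead of waiting for 1 second, and B is not called at all. This shortens the time A needs to detect a failure and also takes load off B. Once B is healthy again, A's calls go through as usual.

  I will not go further into the principles and usage of circuit breakers here; readers can refer to the first blog post in the references. This article only analyzes how Akka implements its circuit breaker.

  As usual, we start from the official Akka demo.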

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{ Actor, ActorLogging }
import akka.pattern.{ pipe, CircuitBreaker }

class DangerousActor extends Actor with ActorLogging {
  import context.dispatcher

  // Opens after 5 failures; each call times out after 10 seconds;
  // a reset is attempted 1 minute after the breaker opens.
  val breaker =
    new CircuitBreaker(
      context.system.scheduler,
      maxFailures = 5,
      callTimeout = 10.seconds,
      resetTimeout = 1.minute).onOpen(notifyMeOnOpen())

  def notifyMeOnOpen(): Unit =
    log.warning("My CircuitBreaker is now open, and will not close for one minute")

  def dangerousCall: String = "This really isn't that dangerous of a call after all"

  def receive = {
    // Asynchronous protection: the Future's result is piped back to the sender.
    case "is my middle name" =>
      breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
    // Synchronous protection: the call runs on the actor's own thread.
    case "block for me" =>
      sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
  }
}

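  First, look at how the CircuitBreaker is created. The demo passes four arguments, and we focus on the last three: the maximum number of failures, the call timeout and the reset timeout. The primary constructor shown below takes two more parameters, maxResetTimeout and exponentialBackoffFactor.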

/**
 * Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to
 * remote systems
 *
 * Transitions through three states:
 * - In *Closed* state, calls pass through until the `maxFailures` count is reached.  This causes the circuit breaker
 * to open.  Both exceptions and calls exceeding `callTimeout` are considered failures.
 * - In *Open* state, calls fail-fast with an exception.  After `resetTimeout`, circuit breaker transitions to
 * half-open state.
 * - In *Half-Open* state, the first call will be allowed through, if it succeeds the circuit breaker will reset to
 * closed state.  If it fails, the circuit breaker will re-open to open state.  All calls beyond the first that
 * execute while the first is running will fail-fast with an exception.
 *
 * @param scheduler Reference to Akka scheduler
 * @param maxFailures Maximum number of failures before opening the circuit
 * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
 * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
 * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
 */
class CircuitBreaker(
  scheduler:                Scheduler,
  maxFailures:              Int,
  callTimeout:              FiniteDuration,
  val resetTimeout:         FiniteDuration,
  maxResetTimeout:          FiniteDuration,
  exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker

  Usage is simple: pass the code you want to protect to withCircuitBreaker or withSyncCircuitBreaker.

  /**
   * Wraps invocations of asynchronous calls that need to be protected
   *
   * @param body Call needing protected
   * @return [[scala.concurrent.Future]] containing the call result or a
   *   `scala.concurrent.TimeoutException` if the call timed out
   *
   */
  def withCircuitBreaker[T](body: => Future[T]): Future[T] = currentState.invoke(body, CircuitBreaker.exceptionAsFailure)

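  The parameter of withCircuitBreaker is body, a by-name parameter: the expression is not evaluated when it is passed in, only when the breaker decides to run it. You can loosely think of it as a function pointer. The code is simple, it just calls currentState.invoke. So what is currentState? Remember the three states of a circuit breaker: open, closed and half-open. Obviously the breaker is in the closed state on the first call, so the code in body executes normally.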
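  To make the by-name behaviour concrete, here is a minimal sketch that uses only the Scala standard library. The names ByNameDemo, guarded and dangerousCall are made up for illustration and are not part of Akka; the point is only that a by-name argument is not evaluated when the guard decides to fail fast.

```scala
object ByNameDemo {
  // Counts how many times the protected body has actually been evaluated.
  var evaluations = 0

  def dangerousCall(): String = {
    evaluations += 1
    "result"
  }

  // `body: => String` is a by-name parameter: the argument expression is not
  // evaluated at the call site, only where `body` is referenced in here.
  def guarded(open: Boolean)(body: => String): Option[String] =
    if (open) None   // fail fast: body is never evaluated
    else Some(body)  // pass through: body is evaluated exactly once
}
```

  Calling ByNameDemo.guarded(open = true)(ByNameDemo.dangerousCall()) leaves the evaluation counter untouched, which is the property the breaker relies on when it is open.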

  /**
   * Helper method for accessing underlying state via Unsafe
   *
   * @return Reference to current state
   */
  @inline
  private[this] def currentState: State =
    Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State]

  The type of currentState is State.

  State has three implementations: Closed, HalfOpen and Open.

    /**
     * Implementation of invoke, which simply attempts the call
     *
     * @param body Implementation of the call that needs protected
     * @return Future containing result of protected call
     */
    override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] =
      callThrough(body, defineFailureFn)

  Let's look at the invoke implementation of Closed first. It is simple: it calls callThrough, which, judging by the name, should just invoke body directly.

/**
     * Shared implementation of call across all states.  Thrown exception or execution of the call beyond the allowed
     * call timeout is counted as a failed call, otherwise a successful call
     *
     * @param body Implementation of the call
     * @param defineFailureFn function that define what should be consider failure and thus increase failure count
     * @return Future containing the result of the call
     */
    def callThrough[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] = {

      def materialize[U](value: => Future[U]): Future[U] = try value catch { case NonFatal(t) => Future.failed(t) }

      if (callTimeout == Duration.Zero) {
        val start = System.nanoTime()
        val f = materialize(body)

        f.onComplete {
          case s: Success[_] =>
            notifyCallSuccessListeners(start)
            callSucceeds()
          case Failure(ex) =>
            notifyCallFailureListeners(start)
            callFails()
        }

        f
      } else {
        val start = System.nanoTime()
        val p = Promise[T]()

        implicit val ec = sameThreadExecutionContext

        p.future.onComplete { fResult =>
          if (defineFailureFn(fResult)) {
            callFails()
          } else {
            notifyCallSuccessListeners(start)
            callSucceeds()
          }
        }

        val timeout = scheduler.scheduleOnce(callTimeout) {
          if (p tryFailure timeoutEx) {
            notifyCallTimeoutListeners(start)
          }
        }

        materialize(body).onComplete {
          case Success(result) =>
            p.trySuccess(result)
            timeout.cancel
          case Failure(ex) =>
            if (p.tryFailure(ex)) {
              notifyCallFailureListeners(start)
            }
            timeout.cancel
        }
        p.future
      }
    }

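  callThrough handles two cases: one where body has a call timeout, and one where it has none (callTimeout is zero, so the breaker waits until body completes). We only analyze the case with a timeout; the case without one works much the same, the timeout version just adds a Promise.

  First a Promise is created, and a completion handler is registered on it: if defineFailureFn classifies the result as a failure, callFails is called; otherwise callSucceeds is called.

  The materialize function is simple: it invokes body and converts any non-fatal exception thrown during the invocation into a failed Future. If body completes successfully, the Promise is completed successfully; otherwise it is failed. But what if body keeps running and times out?

  That is where scheduler.scheduleOnce comes in. It starts a timer that, when the timeout expires, fails the Promise with a timeout exception and then calls notifyCallTimeoutListeners. Note, however, that even after the call times out, body keeps running until it returns; the breaker does not cancel it. This follows from how Future works: a running Future cannot be interrupted from outside.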
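  The race between the timer and the body can be reproduced without Akka. The sketch below is an illustration under assumptions, not the Akka implementation: it replaces the Akka scheduler with a java.util.concurrent scheduled executor, and the name TimeoutRaceDemo and the 100 ms / 500 ms figures are invented. It shows that the caller sees a timeout failure while the slow body still runs to completion.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.Duration
import scala.util.Try

object TimeoutRaceDemo {
  // Returns (outcome seen by the caller, whether the body still ran to completion).
  def demo(): (Try[String], Boolean) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val timer = Executors.newSingleThreadScheduledExecutor()
    val bodyFinished = new AtomicBoolean(false)
    val bodyDone = Promise[Unit]()
    val p = Promise[String]()

    // Timer side of the race: fail the promise after 100 ms unless it is already completed.
    timer.schedule(new Runnable {
      def run(): Unit = { p.tryFailure(new TimeoutException("call timed out")); () }
    }, 100L, TimeUnit.MILLISECONDS)

    // Body side of the race: a slow call that takes about 500 ms.
    Future {
      Thread.sleep(500)
      bodyFinished.set(true)
      "slow result"
    }.onComplete { r =>
      p.tryComplete(r)        // too late: the timer already completed the promise
      bodyDone.trySuccess(())
    }

    val wait = Duration(2, TimeUnit.SECONDS)
    val seenByCaller = Try(Await.result(p.future, wait))
    Await.result(bodyDone.future, wait) // the body kept running after the timeout
    timer.shutdown()
    (seenByCaller, bodyFinished.get)
  }
}
```

  The practical consequence is that a timeout protects the caller, not the callee: the work started by body still consumes resources until it finishes on its own.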

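  For simplicity, let's first look at the case where the call succeeds without timing out. According to the source, notifyCallSuccessListeners(start) and callSucceeds() are executed. We will not analyze the notify methods such as notifyCallSuccessListeners; they only invoke callbacks registered by the developer.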

 /**
     * On successful call, the failure count is reset to 0
     *
     * @return
     */
    override def callSucceeds(): Unit = set(0)

  Isn't Closed's callSucceeds a bit too simple? It just calls set. What does set do? Let's look at the definition of Closed.

private object Closed extends AtomicInteger with State

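  Closed is not only a State but also an AtomicInteger. Put simply, it doubles as a thread-safe counter. When a call succeeds, the current value is set to 0. And on failure, it should be incremented, right?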

 /**
     * On failed call, the failure count is incremented.  The count is checked against the configured maxFailures, and
     * the breaker is tripped if we have reached maxFailures.
     *
     * @return
     */
    override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed)

  Now look at callFails. It increments the current count by 1 and checks whether the maximum number of failures has been reached; if so, it calls tripBreaker.
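  Because a success resets the counter, what matters is the number of failures since the last success, not the total. The following sketch imitates the "state object is also the counter" idea with a plain AtomicInteger subclass. ClosedCounter, tripped and run are hypothetical names for illustration; the real Closed state calls tripBreaker instead of setting a flag.

```scala
import java.util.concurrent.atomic.AtomicInteger

object FailureCounterDemo {
  // Hypothetical stand-in for the Closed state: the state object itself is the counter.
  final class ClosedCounter(maxFailures: Int) extends AtomicInteger {
    @volatile var tripped = false

    // A success wipes out all failures counted so far.
    def callSucceeds(): Unit = set(0)

    // A failure increments the counter; reaching maxFailures trips the breaker.
    def callFails(): Unit = if (incrementAndGet() == maxFailures) tripped = true
  }

  // Feeds a sequence of outcomes (true = success, false = failure) into the counter
  // and reports whether the breaker would have tripped.
  def run(events: Seq[Boolean], maxFailures: Int): Boolean = {
    val counter = new ClosedCounter(maxFailures)
    events.foreach(ok => if (ok) counter.callSucceeds() else counter.callFails())
    counter.tripped
  }
}
```

  With maxFailures = 3, the sequence fail, fail, success, fail, fail does not trip the breaker, while three failures in a row do.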

/**
   * Trips breaker to an open state.  This is valid from Closed or Half-Open states.
   *
   * @param fromState State we're coming from (Closed or Half-Open)
   */
  private def tripBreaker(fromState: State): Unit = transition(fromState, Open)
/**
   * Implements consistent transition between states. Throws IllegalStateException if an invalid transition is attempted.
   *
   * @param fromState State being transitioning from
   * @param toState State being transitioning from
   */
  private def transition(fromState: State, toState: State): Unit = {
    if (swapState(fromState, toState))
      toState.enter()
    // else some other thread already swapped state
  }

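  In other words, once the maximum number of failures is reached, the current state is switched to Open and its enter method is called. The switch goes through swapState, so if several threads attempt the same transition at once, only the one whose swap succeeds calls enter.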
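  The "only the winner calls enter" property comes from compare-and-swap. The sketch below uses an AtomicReference in place of the Unsafe-based swapState shown in the source; that substitution, and the names TransitionDemo, enterCalls and raceToOpen, are assumptions made for illustration.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

object TransitionDemo {
  sealed trait State
  case object Closed extends State
  case object Open extends State

  val state = new AtomicReference[State](Closed)
  val enterCalls = new AtomicInteger(0)

  // Swap the state only if it is still `from`; the thread that wins the swap
  // runs the entry logic (modelled here by counting), every other thread does nothing.
  def transition(from: State, to: State): Unit =
    if (state.compareAndSet(from, to)) { enterCalls.incrementAndGet(); () }

  // Several threads try the same Closed -> Open transition at once.
  def raceToOpen(threads: Int): Unit = {
    val ts = (1 to threads).map { _ =>
      new Thread(new Runnable { def run(): Unit = transition(Closed, Open) })
    }
    ts.foreach(_.start())
    ts.foreach(_.join())
  }
}
```

  However many threads race, the entry logic runs once, so only one reset timer would be scheduled per trip.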

 /**
     * On entering this state, schedule an attempted reset via [[akka.actor.Scheduler]] and store the entry time to
     * calculate remaining time before attempted reset.
     *
     * @return
     */
    override def _enter(): Unit = {
      set(System.nanoTime())
      scheduler.scheduleOnce(currentResetTimeout) {
        attemptReset()
      }
      val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match {
        case f: FiniteDuration => f
        case _                 => currentResetTimeout
      }

      if (nextResetTimeout < maxResetTimeout)
        swapResetTimeout(currentResetTimeout, nextResetTimeout)
    }

  Above is Open's implementation of enter (the _enter method). It starts by calling set. What is set here?

 private object Open extends AtomicLong with State

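  From the definition, set is a method of AtomicLong. It records the time at which the Open state was entered. Then a timer is started for the reset timeout. The rest of _enter multiplies the reset timeout by exponentialBackoffFactor for the next time the breaker opens, as long as the result stays below maxResetTimeout. So what does attemptReset do when the timer fires?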
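  As an aside before following attemptReset: the backoff arithmetic at the end of _enter can be tried in isolation. The sketch below copies only that arithmetic into plain functions; BackoffDemo, next, schedule and the sample values (1 minute, factor 2.0, cap of 5 minutes) are illustrative assumptions, not Akka API.

```scala
import scala.concurrent.duration._

object BackoffDemo {
  // Same arithmetic as in Open._enter: grow the timeout by `factor`,
  // but keep the current value if the result is not finite or not below `max`.
  def next(current: FiniteDuration, factor: Double, max: FiniteDuration): FiniteDuration = {
    val candidate = (current * factor) match {
      case f: FiniteDuration => f
      case _                 => current
    }
    if (candidate < max) candidate else current
  }

  // Reset timeouts used for the first `trips` consecutive openings.
  def schedule(initial: FiniteDuration, factor: Double, max: FiniteDuration, trips: Int): List[FiniteDuration] =
    Iterator.iterate(initial)(next(_, factor, max)).take(trips).toList

  // Sample run in seconds: start at 1 minute, double each time, cap at 5 minutes.
  def demoSeconds: List[Long] = schedule(1.minute, 2.0, 5.minutes, 4).map(_.toSeconds)
}
```

  Note that the comparison is strict, so in this sample the timeout stops growing at 4 minutes: the next candidate, 8 minutes, is not below the 5 minute cap and is discarded rather than clamped.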

/**
   * Attempts to reset breaker by transitioning to a half-open state.  This is valid from an Open state only.
   *
   */
  private def attemptReset(): Unit = transition(Open, HalfOpen)

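  Well, it simply moves the breaker to the half-open state. Now let's see what invoke does once the failure count has reached the limit in the Closed state and the breaker has moved to Open.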

/**
     * Fail-fast on any invocation
     *
     * @param body Implementation of the call that needs protected
     * @return Future containing result of protected call
     */
    override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] = {
      notifyCallBreakerOpenListeners()
      Future.failed(new CircuitBreakerOpenException(remainingDuration()))
    }

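  The invoke logic is simple: it notifies the listeners and directly returns a failed Future. The code in body is never called. In other words, in the Open state every call fails immediately, until the reset timeout expires and the breaker moves to the HalfOpen state.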

/**
     * On entry, guard should be reset for that first call to get in
     *
     * @return
     */
    override def _enter(): Unit = set(true)

  Entering the half-open state again calls set, only this time with the value true, so HalfOpen must be an AtomicBoolean as well.

/**
     * Allows a single call through, during which all other callers fail-fast.  If the call fails, the breaker reopens.
     * If the call succeeds the breaker closes.
     *
     * @param body Implementation of the call that needs protected
     * @return Future containing result of protected call
     */
    override def invoke[T](body: => Future[T], defineFailureFn: Try[T] => Boolean): Future[T] =
      if (compareAndSet(true, false))
        callThrough(body, defineFailureFn)
      else {
        notifyCallBreakerOpenListeners()
        Future.failed[T](new CircuitBreakerOpenException(0.seconds))
      }

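  Now look at invoke. It checks whether the current value is true and, if so, sets it to false. Clearly the value is true only for the first call. That is, after moving from Open to HalfOpen, only the first invoke runs callThrough and therefore executes body; any later call fails immediately while the breaker is still half-open.

  We know that callThrough calls callSucceeds when body succeeds and callFails when it fails. Let's look at how HalfOpen implements these two functions.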
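  The single-trial guard is easy to check on its own. In this sketch a plain AtomicBoolean plays the role of the HalfOpen state object; HalfOpenGuardDemo and admitted are hypothetical names, and the threads stand in for concurrent callers of invoke.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object HalfOpenGuardDemo {
  // Lets `callers` threads compete for the guard and returns how many were admitted.
  def admitted(callers: Int): Int = {
    val guard = new AtomicBoolean(true) // what _enter() does on entry: set(true)
    val through = new AtomicInteger(0)
    val threads = (1 to callers).map { _ =>
      new Thread(new Runnable {
        // Only the caller that flips true -> false is let through;
        // everyone else would fail fast.
        def run(): Unit = if (guard.compareAndSet(true, false)) { through.incrementAndGet(); () }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    through.get
  }
}
```

  Whatever the number of callers, exactly one is admitted, which keeps the load on a possibly still-unhealthy service to a single trial call.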

    /**
     * Reset breaker on successful call.
     *
     * @return
     */
    override def callSucceeds(): Unit = resetBreaker()

    /**
     * Reopen breaker on failed call.
     *
     * @return
     */
    override def callFails(): Unit = tripBreaker(HalfOpen)
  /**
   * Resets breaker to a closed state.  This is valid from an Half-Open state only.
   *
   */
  private def resetBreaker(): Unit = transition(HalfOpen, Closed)

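  First the success case. Following the call chain in the source, it moves the state from HalfOpen to Closed. We have already analyzed the Closed state. So how is failure handled?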

  /**
   * Trips breaker to an open state.  This is valid from Closed or Half-Open states.
   *
   * @param fromState State we're coming from (Closed or Half-Open)
   */
  private def tripBreaker(fromState: State): Unit = transition(fromState, Open)

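  On failure the breaker moves to the Open state again, which we have also analyzed.

  That completes the analysis of Akka's circuit breaker logic. Pretty simple, isn't it? In short: the breaker starts in the Closed state, where user logic executes normally. Exceptions and timeouts both count as failures, a successful call resets the count to 0, and once the count reaches maxFailures the breaker enters the Open state. In the Open state user logic is not executed and calls fail immediately; after the reset timeout the breaker enters the HalfOpen state. If the first call in the HalfOpen state succeeds, the breaker returns to the Closed state; if it fails, the breaker goes back to the Open state and waits for the reset timeout again.

  A diagram on the official site explains all of this clearly. Some people ask what the difference is between withSyncCircuitBreaker and withCircuitBreaker. There really isn't much: one is synchronous and the other asynchronous. The synchronous version is the asynchronous one turned synchronous with Await.result.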
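  The whole life cycle fits in a small toy model. The sketch below is a deliberately simplified, single-threaded imitation and not the Akka implementation: it has no scheduler, no atomics and no call timeout, and time is passed in explicitly as a number. MiniBreaker, OpenException and scenario are invented names.

```scala
import scala.util.{Failure, Success, Try}

object MiniBreakerDemo {
  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State

  final class OpenException extends RuntimeException("circuit breaker is open")

  // Single-threaded toy: `now` (in milliseconds) replaces the scheduler.
  final class MiniBreaker(maxFailures: Int, resetTimeoutMillis: Long) {
    private var state: State = Closed
    private var failures = 0
    private var openedAt = 0L

    def currentState: State = state

    def call[T](now: Long)(body: => T): Try[T] = {
      // Open -> HalfOpen once the reset timeout has elapsed.
      if (state == Open && now - openedAt >= resetTimeoutMillis) state = HalfOpen
      state match {
        case Open => Failure(new OpenException) // fail fast, body is not evaluated
        case _ =>
          val result = Try(body)
          result match {
            case Success(_) =>
              failures = 0   // Closed: reset the count; HalfOpen: close the breaker
              state = Closed
            case Failure(_) =>
              if (state == HalfOpen) trip(now) // the trial call failed: reopen
              else { failures += 1; if (failures == maxFailures) trip(now) }
          }
          result
      }
    }

    private def trip(now: Long): Unit = { state = Open; openedAt = now }
  }

  // Records the state after each call of a small scripted scenario.
  def scenario(): List[State] = {
    val b = new MiniBreaker(maxFailures = 2, resetTimeoutMillis = 1000)
    def boom: Int = throw new IllegalStateException("boom")
    val seen = List.newBuilder[State]
    b.call(0)(boom); seen += b.currentState    // first failure, still Closed
    b.call(1)(boom); seen += b.currentState    // second failure trips to Open
    b.call(2)(42); seen += b.currentState      // rejected, stays Open
    b.call(1001)(boom); seen += b.currentState // trial call fails, Open again
    b.call(2001)(42); seen += b.currentState   // trial call succeeds, Closed
    seen.result()
  }
}
```

  Passing time in by hand keeps the example deterministic; the real breaker gets the same Open to HalfOpen step from the timer scheduled in _enter.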

/**
   * Wraps invocations of synchronous calls that need to be protected
   *
   * Calls are run in caller's thread. Because of the synchronous nature of
   * this call the  `scala.concurrent.TimeoutException` will only be thrown
   * after the body has completed.
   *
   * Throws java.util.concurrent.TimeoutException if the call timed out.
   *
   * @param body Call needing protected
   * @param defineFailureFn function that define what should be consider failure and thus increase failure count
   * @return The result of the call
   */
  def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
    Await.result(
      withCircuitBreaker(
        try Future.successful(body) catch { case NonFatal(t) => Future.failed(t) },
        defineFailureFn),
      callTimeout)

  The code above explains the difference between the two at the root, so I will not analyze it further.
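  One detail worth trying out is the remark in the doc comment that the call runs in the caller's thread. The sketch below keeps only the wrapping trick (evaluate body eagerly, wrap the outcome in an already-completed Future, then Await it) and leaves out all breaker state. SyncWrapperDemo, syncCall and callerThreadName are hypothetical names, not Akka API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object SyncWrapperDemo {
  // Stand-in for the wrapping done by withSyncCircuitBreaker, without any breaker:
  // body is evaluated right here, on the calling thread, before the Future exists.
  def syncCall[T](timeout: FiniteDuration)(body: => T): T =
    Await.result(
      try Future.successful(body) catch { case NonFatal(t) => Future.failed[T](t) },
      timeout)

  // Reports the name of the thread on which the protected body ran.
  def callerThreadName(): String = syncCall(1.second)(Thread.currentThread().getName)
}
```

  Since the Future is already completed when Await.result sees it, the wait itself never blocks here; in this sketch the timeout argument has no effect on how long body is allowed to run.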

Reference: An Anti-Avalanche Tool: Principles and Usage of the Hystrix Circuit Breaker (防雪崩利器:熔断器 Hystrix 的原理与使用)

Original article: https://www.cnblogs.com/gabry/p/9645367.html

Time: 2024-11-07 02:36:32
