Akka(3): Actor监管 - 细述BackoffSupervisor

在上一篇讨论中我们谈到了监管:在Akka中就是一种直属父子监管树结构,父级Actor负责处理直属子级Actor产生的异常。当时我们把BackoffSupervisor作为父子监管方式的其中一种。实际上BackoffSupervisor与定义了supervisorStrategy的Actor有所不同。我们应该把BackoffSupervisor看作是一个一体化的Actor。当然,它的实现方式还是由一对父子Actor组成。监管策略(SupervisorStrategy)是在BackoffSupervisor的内部实现的。从外表上BackoffSupervisor就像是一个Actor,运算逻辑是在子级Actor中定义的,所谓的父级Actor除监管之外没有任何其它功能,我们甚至没有地方定义父级Actor的功能,它的唯一功能是转发收到的信息给子级,是嵌入BackoffSupervisor里的。所以我们虽然发送消息给BackoffSupervisor,但实际上是在与它的子级交流。我们看看下面这个例子:

package backoffSupervisorDemo
import akka.actor._
import akka.pattern._
import backoffSupervisorDemo.InnerChild.TestMessage

import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException extends Exception

  def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  override def receive: Receive = {
    case TestMessage(msg) => //模拟子级功能
      log.info(s"Child received message: ${msg}")
  }
}
object Supervisor {
  def props: Props = { //在这里定义了监管策略和child Actor构建
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: InnerChild.ChildException => SupervisorStrategy.Restart
    }

    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}
//注意:下面是Supervisor的父级,不是InnerChild的父级
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  //在这里构建子级Actor supervisor
  val supervisor = context.actorOf(Supervisor.props,"supervisor")
  supervisor ! BackoffSupervisor.getCurrentChild //要求supervisor返回当前子级Actor
  var innerChild: Option[ActorRef] = None   //返回的当前子级ActorRef
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
  override def receive: Receive = {
    case BackoffSupervisor.CurrentChild(ref) =>   //收到子级Actor信息
      innerChild = ref
    case SendToSupervisor(msg) => supervisor ! msg
    case SendToChildSelection(msg) => selectedChild ! msg
    case SendToInnerChild(msg) => innerChild foreach(child => child ! msg)
  }

}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props,"parent")

  Thread.sleep(1000)   //wait for BackoffSupervisor.CurrentChild(ref) received

  parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
  parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
  parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))

  scala.io.StdIn.readLine()

  testSystem.terminate()

}

在上面的例子里我们分别向supervisor,innerChild,selectedChild发送消息。但所有消息都是由InnerChild响应的,如下:

[INFO] [05/29/2017 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
[INFO] [05/29/2017 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
[INFO] [05/29/2017 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

上面我们向supervisor发送了一个BackoffSupervisor.GetCurrentChild消息用来获取子级Actor。BackoffSupervisor是这样处理下面几个特殊消息的:

private[akka] trait HandleBackoff { this: Actor ?
  def childProps: Props
  def childName: String
  def reset: BackoffReset

  var child: Option[ActorRef] = None
  var restartCount = 0

  import BackoffSupervisor._
  import context.dispatcher

  override def preStart(): Unit = startChild()

  def startChild(): Unit = {
    if (child.isEmpty) {
      child = Some(context.watch(context.actorOf(childProps, childName)))
    }
  }

  def handleBackoff: Receive = {
    case StartChild ?
      startChild()
      reset match {
        case AutoReset(resetBackoff) ?
          val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
        case _ ? // ignore
      }

    case Reset ?
      reset match {
        case ManualReset ? restartCount = 0
        case msg         ? unhandled(msg)
      }

    case ResetRestartCount(current) ?
      if (current == restartCount) {
        restartCount = 0
      }

    case GetRestartCount ?
      sender() ! RestartCount(restartCount)

    case GetCurrentChild ?
      sender() ! CurrentChild(child)

    case msg if child.contains(sender()) ?
      // use the BackoffSupervisor as sender
      context.parent ! msg

    case msg ? child match {
      case Some(c) ? c.forward(msg)
      case None    ? context.system.deadLetters.forward(msg)
    }
  }
}

在handleBackoff函数里可以找到这些消息的处理方式。

在构建上面例子里的Supervisor的Props时定义了监管策略(SupervisorStrategy)对InnerChild产生的异常ChildException进行Restart处理。我们调整一下InnerChild代码来随机产生一些异常:

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException(val errmsg: TestMessage) extends Exception
  object CException {  //for pattern match of class with parameter
    def apply(msg: TestMessage) = new ChildException(msg)
    def unapply(cex: ChildException) = Some(cex.errmsg)
  }
  def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  context.parent ! BackoffSupervisor.Reset  //reset backoff counts
  override def receive: Receive = {
    case TestMessage(msg) => //模拟子级功能
      if (Random.nextBoolean())   //任意产生异常
        throw new ChildException(TestMessage(msg))
      else
        log.info(s"Child received message: ${msg}")
  }
}

我们用Random.nextBoolean来任意产生一些异常。注意:我们同时把ChildException改成了一个带参数的class,因为我们可能需要在重启之前获取造成异常的消息,如下:

    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") //we can extract message here
        SupervisorStrategy.Restart
    }

所有信息发给supervisor就行了:

class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  //在这里构建子级Actor supervisor
  val supervisor = context.actorOf(Supervisor.props,"supervisor")
  override def receive: Receive = {
    case [email protected] _ => supervisor ! msg
  }

}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props,"parent")

  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")

  scala.io.StdIn.readLine()

  testSystem.terminate()

}

运行后发现在出现异常后所有消息都变成了DeadLetter:

[INFO] [05/29/2017 18:22:11.689] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/parent/supervisor/innerChild] Message [backoffSupervisorDemo.InnerChild$TestMessage] from Actor[akka://testSystem/user/parent#2140150413] to Actor[akka://testSystem/user/parent/supervisor/innerChild#-1047097634] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings ‘akka.log-dead-letters‘ and ‘akka.log-dead-letters-during-shutdown‘.
....

这也证明了BackoffSupervisor具有不同的Restart处理方式,好像是直接终止InnerChild而非正常的挂起,销毁了ActorRef和邮箱,所以在完成启动之前发给InnerChild的消息都被导入DeadLetter队列了。也就是说不但错过造成异常的消息,而是跳过了下面启动时间段内所有的消息。

下面我们来解决失踪消息的问题:首先是如何重新发送造成异常的消息,我们可以在监管策略中重启前发送:

    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") //we can extract message here
        BackoffSupervisorDemo.sendToParent(tmsg)  //resend message
        SupervisorStrategy.Restart
    }

在BackoffSupervisorDemo里先声明sendToParent函数:

  def sendToParent(msg: TestMessage) = parent ! msg

然后再想办法把DeadLetter捞出来。我们可以用Akka的eventStream来订阅DeadLetter类型消息:

object DeadLetterMonitor {
  def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
}
class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
  import InnerChild._
  import context.dispatcher
  override def receive: Receive = {
    case DeadLetter(msg,sender,_) =>
      //wait till InnerChild finishes restart then resend
      context.system.scheduler.scheduleOnce(1 second,receiver,msg.asInstanceOf[TestMessage])
  }
}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._

  def sendToParent(msg: TestMessage) = parent ! msg

  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props,"parent")

  val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
  testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter

  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")

  scala.io.StdIn.readLine()

  testSystem.terminate()

}

试运算后显示InnerChild成功处理了所有6条消息。

下面是本次讨论的完整示范代码:

package backoffSupervisorDemo
import akka.actor._
import akka.pattern._
import scala.util.Random

import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException(val errmsg: TestMessage) extends Exception
  object CException {  //for pattern match of class with parameter
    def apply(msg: TestMessage) = new ChildException(msg)
    def unapply(cex: ChildException) = Some(cex.errmsg)
  }
  def props = Props[InnerChild]
}
class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  context.parent ! BackoffSupervisor.Reset  //reset backoff counts
  override def receive: Receive = {
    case TestMessage(msg) => //模拟子级功能
      if (Random.nextBoolean())   //任意产生异常
        throw new ChildException(TestMessage(msg))
      else
        log.info(s"Child received message: ${msg}")
  }
}
object Supervisor {
  def props: Props = { //在这里定义了监管策略和child Actor构建
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") //we can extract message here
        BackoffSupervisorDemo.sendToParent(tmsg)  //resend message
        SupervisorStrategy.Restart
    }

    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1 second, 5 seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}
//注意:下面是Supervisor的父级,不是InnerChild的父级
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}
class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  //在这里构建子级Actor supervisor
  val supervisor = context.actorOf(Supervisor.props,"supervisor")
  override def receive: Receive = {
    case [email protected] _ => supervisor ! msg
  }

}
object DeadLetterMonitor {
  def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
}
class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
  import InnerChild._
  import context.dispatcher
  override def receive: Receive = {
    case DeadLetter(msg,sender,_) =>
      //wait till InnerChild finishes restart then resend
      context.system.scheduler.scheduleOnce(1 second,receiver,msg.asInstanceOf[TestMessage])
  }
}
object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._

  def sendToParent(msg: TestMessage) = parent ! msg

  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props,"parent")

  val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
  testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter

  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")

  scala.io.StdIn.readLine()

  testSystem.terminate()

}
时间: 2024-11-13 04:38:11

Akka(3): Actor监管 - 细述BackoffSupervisor的相关文章

二 Akka学习 - actor介绍

一个actorSystem 是一个重量级的结构.它会分配N个线程.所以对于每一个应用来说只用创建一个ActorSystem. Actor是种可怜的"生物",它们不能独自存活.Akka中的每一个Actor都是由一个Actor系统(Actor System)来创建和维护的.一个Actor系统会提供一整套辅助功能, 树形结构 actors以树形结构组织起来,类似一个生态系统.例如,一个actor可能会把自己的任务划分成更多更小的.利于管理的子任务.为了这个目的,它会开启自己的子actor,并

2014.8.12-AKKA和Actor model 分布式开发环境学习小结

学习使用AKKA 断断续续有一年了.目前还是习惯用java来写akka下面的程序.对于原生的scala还是没有时间和兴趣去学习它. 毕竟学习一门语言需要兴趣和时间的. AKKA学习资源还是不算丰富. 看过最多的就是官方的编程手册,还有就是AKKA Essentials 这两本.  自己动手写的程序还不算多,也放在github上面. 另外,在akka编译配置.升级版本上,以及部署多台服务器组建akka cluster 方面花费了不少时间.因为项目需要,上周重新在办公室用两台mac台式机和一台thi

细述 Java垃圾回收机制→Types of Java Garbage Collectors

细述 Java垃圾回收机制→Types of Java Garbage Collectors 转自:https://segmentfault.com/a/1190000006214497 本文非原创,翻译自Types of Java Garbage Collectors在Java中为对象分配和释放内存空间都是由垃圾回收线程自动执行完成的.和C语言不一样的是Java程序员不需要手动写垃圾回收相关的代码.这是使得Java如此流行,同时也是Java能帮助程序员写出更好的Java应用的优点之一. 本文将

Akka 2 Actor 源码

Actor源码研究,先附上源码 // ...... object Actor {   /**    * Type alias representing a Receive-expression for Akka Actors.    */   //#receive   type Receive = PartialFunction[Any, Unit]   //#receive   /**    * emptyBehavior is a Receive-expression that matche

Akka 4 akka的Actor模型

1. What is an Actor? The Actor Model represents objects and their interactions, resembling human organizations and built upon the laws of physics. is an object with identity has a behavior only interacts using asynchronous message passing 2. The Acto

一 Akka学习 - actor

(引用 http://shiyanjun.cn/archives/1168.html) 一: 什么是Akka? Akka是JAVA虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,是一个框架.Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口. Akka处理并发的方法基于Actor模型.在Akka里,Actor之间通信的唯一机制就是消息传递. 二: 什么是Actor? 维基百科这样定义Actor模型: 在计算科学领域,Actor模型是一个并行计算(Concurr

IO细述

Java IO1:IO和File IO 大多数的应用程序都要与外部设备进行数据交换,最常见的外部设备包含磁盘和网络.IO就是指应用程序对这些设备的数据输入与输出,Java语言定义了许多类专门负责各种方式的输入.输出,这些类都被放在java.io包中. File类 File类是IO包中唯一代表磁盘文件本身的对象,File类定义了一些与平台无关的方法来操作文件.通过调用File类提供的各种方法,能够完成创建.删除文件.重命名文件.判断文件的读写权限权限是否存在.设置和查询文件的最近修改时间等操作.

细述synflood

大家都知道,TCP与UDP不同,它是基于连接的,也就是说:为了在服务端和客户端之间传送TCP数据,必须先建立一个虚拟电路,也就是TCP连接,建立TCP连接的标准过程是这样的: 首先,请求端(客户端)发送一个包含SYN标志的TCP报文,SYN即同步(Synchronize),同步报文会指明客户端使用的端口以及TCP连接的初始序号: 第二步,服务器在收到客户端的SYN报文后,将返回一个SYN+ACK的报文,表示客户端的请求被接受,同时TCP序号被加一,ACK即确认(Acknowledgment).

细述:Linux 监控系统 Linux-dash

Linux-dash 是一个低开销 Linux 服务器监控系统,基于 Web 的监控界面.Linux-dash 的界面提供了一个详细的概述您的服务器的所有重要方面,包括内存和磁盘使用情况,网络,安装软件,用户,运行的流程.所有信息被组织成widget 部件,你可以跳转到一个特定的部分使用的主要工具栏中的按钮.Linux Dash不是最先进的监控工具,但它可能是一个适合用户寻找一个光滑,轻便,易于部署应用程序. 特性 漂亮的操作面板 在线,按需监控 RAM, Load, Uptime, Disk