Akka(6): become/unbecome:运算行为切换

通过一段时间的学习了解,加深了一些对Akka的认识,特别是对于Akka在实际编程中的用途方面。我的想法,或者我希望利用Akka来达到的目的是这样的:作为传统方式编程的老兵,我们已经习惯了直线流程方式一口气实现完整的功能。如果使用Akka,我们可以把这个完整的功能分切成多个能产生中间临时结果的小功能然后把这些功能放到不同的Actor上分别独立运算,再通过消息来连接这些功能集合成最终结果。如此我们就轻易得到了一个多线程并发程序。由于Akka是软件工具(Tool),没有软件架构(Framework)对编程方式的特别要求,Actor的构建和使用非常方便,我们甚至不需要多少修改就可以直接把原来的一段代码移到Actor上。如果遇到一些重复的运算,我们还可以用Routing来实现并行运算。当然,把Actor当作简单的行令运算器可能还不够,如果能实现一些具体运算之上的高层次程序逻辑和流程就更加完善。我们可以用这样的高层次Actor去解析程序逻辑、执行流程、把具体的运算分配给其它各种运算Actor或者一组Routees并行运算从而取得整体程序的高效率运行。具备了这些功能后,也许我们就可以完全用Actor模式来替代传统单线程行令编程了。Akka可以通过Actor的动态行为转换来实现同一Actor在不同情况下提供不同的功能支持。我们前面提到Actor的功能是在receive函数内实现的。那么转换功能是否就是切换不同的receive函数呢?答案是确定的,Akka是通过Actor的context.become(rcvFunc)来实现receive函数切换的,我们看看下面这个示范:

import akka.actor._

object FillSeasons {
  case object HowYouFeel
  def props = Props(new FillSeasons)
}

class FillSeasons extends Actor with ActorLogging {
  import FillSeasons._

  override def receive: Receive = spring
  def Winter: Receive = {
    case HowYouFeel =>
      log.info("It‘s freezing cold!")
  }
  def summer: Receive = {
    case HowYouFeel =>
      log.info("It‘s hot hot hot!")
  }
  def spring: Receive = {
    case HowYouFeel =>
      log.info("It feels so goooood!")
  }
}

object Becoming extends App {
  val demoSystem = ActorSystem("demoSystem")

  val feelingsActor = demoSystem.actorOf(FillSeasons.props,"feelings")

  feelingsActor ! FillSeasons.HowYouFeel

}

在FeelingsActor里我们定义了三个receive函数,对共同的HowYouFeel消息采取了不同的反应。默认行为是spring。那么应该如何在三种行为中切换呢?用context.become(???),如下:

import akka.actor._

object FillSeasons {
  case object HowYouFeel
  case object ToSummer
  case object ToSpring
  case object ToWinter
  def props = Props(new FillSeasons)
}

class FillSeasons extends Actor with ActorLogging {
  import FillSeasons._

  override def receive: Receive = spring
  def winter: Receive = {
    case HowYouFeel =>
      log.info("It‘s freezing cold!")
    case ToSummer => context.become(summer)
    case ToSpring => context.become(spring)

  }
  def summer: Receive = {
    case HowYouFeel =>
      log.info("It‘s hot hot hot!")
    case ToSpring => context.become(spring)
    case ToWinter => context.become(winter)
  }
  def spring: Receive = {
    case HowYouFeel =>
      log.info("It feels so goooood!")
    case ToSummer => context.become(summer)
    case ToWinter => context.become(winter)
  }
}

object Becoming extends App {
  val demoSystem = ActorSystem("demoSystem")

  val feelingsActor = demoSystem.actorOf(FillSeasons.props,"feelings")

  feelingsActor ! FillSeasons.HowYouFeel
  feelingsActor ! FillSeasons.ToSummer
  feelingsActor ! FillSeasons.HowYouFeel
  feelingsActor ! FillSeasons.ToWinter
  feelingsActor ! FillSeasons.HowYouFeel
  feelingsActor ! FillSeasons.ToSpring
  feelingsActor ! FillSeasons.HowYouFeel

  scala.io.StdIn.readLine()
  demoSystem.terminate()
}

我们增加了三个消息来切换receive。运算结果如下:

[INFO] [06/08/2017 17:51:46.013] [demoSystem-akka.actor.default-dispatcher-3] [akka://demoSystem/user/feelings] It feels so goooood!
[INFO] [06/08/2017 17:51:46.019] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It‘s hot hot hot!
[INFO] [06/08/2017 17:51:46.028] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It‘s freezing cold!
[INFO] [06/08/2017 17:51:46.028] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It feels so goooood!

Process finished with exit code 0

就这样在几个receive里窜来窜去的好像已经能达到我们设想的目的了。看看Akka源代码中become和unbecome发现这样的做法是不正确的:

  def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
    behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)

  def become(behavior: Procedure[Any]): Unit = become(behavior, discardOld = true)

  def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
    become({ case msg ? behavior.apply(msg) }: Actor.Receive, discardOld)

  def unbecome(): Unit = {
    val original = behaviorStack
    behaviorStack =
      if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
      else original.tail
  }

从上面的代码可以发现:调用become(x)实际上是把x压进了一个堆栈里。如果像我们这样不断调用become转来转去的,在堆栈上留下旧的行为函数实例最终会造成StackOverFlowError。所以Akka提供了unbecome,这是个堆栈弹出函数,把上一个become压进的行为函数再弹出来,释放一个堆栈空间。所以我们应该用unbecome来解决堆栈溢出问题。但是,如果在多个receive函数之间转换来实现行为变化的话,就难以正确掌握堆栈的压进,弹出冲抵配对,并且无法避免所谓的意大利面代码造成的混乱逻辑。所以,become/unbecome最好使用在两个功能之间的转换。我们再设计一个例子来示范:

sealed trait DBOperations
case class DBWrite(sql: String) extends DBOperations
case class DBRead(sql: String) extends DBOperations

sealed trait DBStates
case object Connected extends DBStates
case object Disconnected extends DBStates

DBoperations代表数据库读写操作。DBState代表数据库当前状态:连线Connected或断线Disconnected。只有数据库在Connected状态下才能进行数据库操作。顺理成章,我们需要两个receive函数:

import akka.actor._
sealed trait DBOperations
case class DBWrite(sql: String) extends DBOperations
case class DBRead(sql: String) extends DBOperations

sealed trait DBStates
case object Connected extends DBStates
case object Disconnected extends DBStates

object DBOActor {
  def props = Props(new DBOActor)
}

class DBOActor extends Actor with ActorLogging {

  override def receive: Receive = disconnected

  def disconnected: Receive = {
    case Connected =>
      log.info("Logon to DB.")
      context.become(connected)
  }
  def connected: Receive = {
    case Disconnected =>
      log.info("Logoff from DB.")
      context.unbecome()
    case DBWrite(sql) =>
      log.info(s"Writing to DB: $sql")
    case DBRead(sql) =>
      log.info(s"Reading from DB: $sql")
  }
}

object BecomeDB extends App {
  val dbSystem = ActorSystem("dbSystem")
  val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor")

  dbActor ! Connected
  dbActor ! DBWrite("Update table x")
  dbActor ! DBRead("Select from table x")
  dbActor ! Disconnected

  scala.io.StdIn.readLine()
  dbSystem.terminate()

}

运算结果显示如下:

[INFO] [06/09/2017 11:44:40.093] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 11:44:40.106] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Writing to DB: Update table x
[INFO] [06/09/2017 11:44:40.107] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 11:44:40.107] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Logoff from DB.

以上是按正确顺序向dbActor发出数据库操作指令后产生的结果。但是,我们是在一个多线程消息驱动的环境里。发送给dbActor的消息收到时间无法预料。我们试着调换一下指令到达顺序:

  dbActor ! DBWrite("Update table x")
  dbActor ! Connected
  dbActor ! DBRead("Select from table x")
  dbActor ! Disconnected

运算结果:

[INFO] [06/09/2017 11:54:57.264] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 11:54:57.273] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 11:54:57.273] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logoff from DB.

漏掉了DBWrite操作。可以理解,所有connected状态之前的任何操作都不会真正生效。Akka提供了个Stash trait能把一个receive函数未处理的消息都存起来。然后用unstash()可以把存储的消息都转移到本Actor的邮箱里。我们可以用Stash来解决这个消息遗失问题:

  def disconnected: Receive = {
    case Connected =>
      log.info("Logon to DB.")
      context.become(connected)
      unstashAll()
    case _ => stash()
  }

所有消息遗失都是在Disconnected状态内发生的。在disconnected里我们用stash把所有非Connected消息存起来,然后在转换成Connected状态时把这些消息转到信箱。再看看运算结果:

object BecomeDB extends App {
  val dbSystem = ActorSystem("dbSystem")
  val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor")

  dbActor ! DBWrite("Update table x")
  dbActor ! Connected
  dbActor ! DBRead("Select from table x")
  dbActor ! Disconnected

  scala.io.StdIn.readLine()
  dbSystem.terminate()

}

[INFO] [06/09/2017 12:01:54.518] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Writing to DB: Update table x
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logoff from DB.

显示结果正确。下面就是整个示范的源代码:

import akka.actor._
sealed trait DBOperations
case class DBWrite(sql: String) extends DBOperations
case class DBRead(sql: String) extends DBOperations

sealed trait DBStates
case object Connected extends DBStates
case object Disconnected extends DBStates

object DBOActor {
  def props = Props(new DBOActor)
}

class DBOActor extends Actor with ActorLogging with Stash {

  override def receive: Receive = disconnected

  def disconnected: Receive = {
    case Connected =>
      log.info("Logon to DB.")
      context.become(connected)
      unstashAll()
    case _ => stash()
  }
  def connected: Receive = {
    case Disconnected =>
      log.info("Logoff from DB.")
      context.unbecome()
    case DBWrite(sql) =>
      log.info(s"Writing to DB: $sql")
    case DBRead(sql) =>
      log.info(s"Reading from DB: $sql")
  }
}

object BecomeDB extends App {
  val dbSystem = ActorSystem("dbSystem")
  val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor")

  dbActor ! DBWrite("Update table x")
  dbActor ! Connected
  dbActor ! DBRead("Select from table x")
  dbActor ! Disconnected

  scala.io.StdIn.readLine()
  dbSystem.terminate()

}
时间: 2024-10-06 10:18:41

Akka(6): become/unbecome:运算行为切换的相关文章

javascript运算符——位运算符

× 目录 [1]二进制 [2]非 [3]与[4]或[5]异或[6]左移[7]右移[8]>>>[9]应用 前面的话 位运算符是非常底层的运算,由于其很不直观,所以并不常用.但是,其速度极快,且合理使用能达到很好的效果.本文将介绍javascript中常常被忽视的运算符——位运算符 二进制表示 ECMAScript中的所有数值都以IEEE-754 64位格式存储,但位操作符并不直接操作64位的值,而是以32位带符号的整数进行运算的,并且返回值也是一个32位带符号的整数 这种位数转换使得在对特

每天高效短睡4小时与饥饿的力量

为什么要短睡 ? 通常人们所认识到,并接受的睡眠时间是每天7-8个小时.也就是说,一天24个小时,有接近1/3的时间是在睡眠中度过的.那么,扩展到整个人生,竟有就有1/3是在床上度过的.人生苦短,既然死亡是不可避免的终点.那么减少睡眠时间,是否可以看成增加了生命的长度了呢 ?虽然睡觉也可以睡得很幸福,做梦也可以有奇妙的体验,但毕竟清醒的时刻才是我们可以控制的人生,梦醒时分还是要面对现实的. 重要的是,有这样一群人每天在不停地疯狂运转,然后另一群人一觉醒来,就发现世界和昨天不一样了.疯狂运转的人群

Akka 编程(14): Become/Unbecome

Akka支持Actor消息循环处理部分的热切换,调用context.become方法可以使用新的消息循环处理替换当前的消息处理器,被替换的消息处理器被压到一个栈结构,支持消息处理器的出栈和入栈.注:但Actor重启时,它的消息循环处理恢复到初始的行为.become方法的参数类型为部分函数PartialFunction[Any, Unit],例如: 1 import akka.actor.Actor 2 import akka.actor.ActorSystem 3 import akka.act

akka入门-热插拔(become和unbecome)

Akka支持在运行时对角色消息循环 (例如它的的实现)进行实时替换: 在角色中调用getContext.become 方法. 热替换的代码被存在一个栈中,可以被pushed(replacing 或 adding 在顶部)和popped. become一个特别好的例子是用它来实现一个有限状态机. 使用Become/Unbecome特性还可以很方便的实现状态转换机. 1.动态替换方法 import akka.actor.ActorRef; import akka.actor.ActorSystem;

Akka(10): 分布式运算:集群-Cluster

Akka-Cluster可以在一部物理机或一组网络连接的服务器上搭建部署.用Akka开发同一版本的分布式程序可以在任何硬件环境中运行,这样我们就可以确定以Akka分布式程序作为标准的编程方式了. 在上面两篇讨论里我们介绍了Akka-Remoting.Akka-Remoting其实是一种ActorSystem之间Actor对Actor点对点的沟通协议.通过Akka-Remoting来实现一个ActorSystem中的一个Actor与另一个Actorsystem中的另一个Actor之间的沟通.在Re

Akka(12): 分布式运算:Cluster-Singleton-让运算在集群节点中自动转移

在很多应用场景中都会出现在系统中需要某类Actor的唯一实例(only instance).这个实例在集群环境中可能在任何一个节点上,但保证它是唯一的.Akka的Cluster-Singleton提供对这种Singleton Actor模式的支持,能做到当这个实例所在节点出现问题需要脱离集群时自动在另一个节点上构建一个同样的Actor,并重新转交控制.当然,由于涉及了一个新构建的Actor,内部状态会在这个过程中丢失.Single-Actor的主要应用包括某种对外部只能支持一个接入的程序接口,或

AKKA文档(java版)

目前我正在翻译AKKA官网文档.翻译:吴京润 译者注:本人正在翻译AKKA官网文档,本篇是文档第一章,欢迎有兴趣的同学加入一起翻译.更多内容请读这里:https://tower.im/projects/ac49db18a6a24ae4b340a5fa22d930dc/lists/ded96c34f7ce4a6bb8b5473f596e1008/show/https://tower.im/projects/ac49db18a6a24ae4b340a5fa22d930dc/todos/640e53d

Akka(1):Actor - 靠消息驱动的运算器

Akka是由各种角色和功能的Actor组成的,工作的主要原理是把一项大的计算任务分割成小环节,再按各环节的要求构建相应功能的Actor,然后把各环节的运算托付给相应的Actor去独立完成.Akka是个工具库(Tools-Library),不是一个软件架构(Software-Framework),我们不需要按照Akka的框架格式去编写程序,而是直接按需要构建Actor去异步运算一项完整的功能,这样让用户在不知不觉中自然的实现了多线程并发软件编程(concurrent programming).按这

Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式

消息保证送达是指消息发送方保证在任何情况下都会至少一次确定的消息送达.AtleastOnceDelivery是一个独立的trait,主要作用是对不确定已送达的消息进行补发,这是一种自动的操作,无需用户干预.既然涉及到消息的补发,就不可避免地影响发送方和接收方之间消息传递的顺序.接收方重复收到相同的消息等问题,这些用户必须加以关注.从另一个方面,AtleastOnceDelivery模式保证了强韧性Actor系统的不丢失消息,这项要求可能是一些系统的核心要求. AtleastOnceDeliver