Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式

消息保证送达是指消息发送方保证在任何情况下都会至少一次确定的消息送达。AtleastOnceDelivery是一个独立的trait,主要作用是对不确定已送达的消息进行补发,这是一种自动的操作,无需用户干预。既然涉及到消息的补发,就不可避免地影响发送方和接收方之间消息传递的顺序、接收方重复收到相同的消息等问题,这些用户必须加以关注。从另一个方面,AtleastOnceDelivery模式保证了强韧性Actor系统的不丢失消息,这项要求可能是一些系统的核心要求。

AtleastOnceDelivery模式既然需要保证消息必达,就必须保证自身在出现任何异常情况下都能恢复到原来的状态,这些都可通过状态持久化来实现。与PersistentActor不同而且更复杂的是AtleastOnceDelivery-Actor的状态除自定义的结构外还必须包括未确认收到的消息(outstanding messages)。所以AtleastOnceDelivery提供了自身特殊的事件(event)和快照(snapshot)类型,它们都包括消息送达状态。

AtleastOnceDelivery模式的原理是一套收到确认回复机制,是通过deliver,confirmDelivery两个函数实现的。deliver是消息发送函数:

/**
 * Scala API: Mix-in this trait with your `PersistentActor` to send messages with at-least-once
 * delivery semantics to destinations. It takes care of re-sending messages when they
 * have not been confirmed within a configurable timeout. Use the [[AtLeastOnceDeliveryLike#deliver]] method to
 * send a message to a destination. Call the [[AtLeastOnceDeliveryLike#confirmDelivery]] method when the destination
 * has replied with a confirmation message.
 *
 * At-least-once delivery implies that original message send order is not always retained
 * and the destination may receive duplicate messages due to possible resends.
 *
 * The interval between redelivery attempts can be defined by [[AtLeastOnceDeliveryLike#redeliverInterval]].
 * After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
 * will be sent to `self`. The re-sending will still continue, but you can choose to call
 * [[AtLeastOnceDeliveryLike#confirmDelivery]] to cancel the re-sending.
 *
 * The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a
 * sequence number. It does not store this state itself. You must persist events corresponding
 * to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the
 * state can be restored by calling the same methods during the recovery phase of the
 * `PersistentActor`. Sometimes these events can be derived from other business level events,
 * and sometimes you must create separate events. During recovery calls to `deliver`
 * will not send out the message, but it will be sent later if no matching `confirmDelivery`
 * was performed.
 *
 * Support for snapshots is provided by [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
 * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
 * If you need a custom snapshot for other parts of the actor state you must also include the
 * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
 * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
 * as a blob in your custom snapshot.
 *
 * @see [[AtLeastOnceDeliveryLike]]
 * @see [[AbstractPersistentActorWithAtLeastOnceDelivery]] for Java API
 */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike {

  /**
   * Scala API: Send the message created by the `deliveryIdToMessage` function to
   * the `destination` actor. It will retry sending the message until
   * the delivery is confirmed with [[#confirmDelivery]]. Correlation
   * between `deliver` and `confirmDelivery` is performed with the
   * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
   * function. The `deliveryId` is typically passed in the message to the
   * destination, which replies with a message containing the same `deliveryId`.
   *
   * The `deliveryId` is a strictly monotonically increasing sequence number without
   * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
   * to multiple destinations the destinations will see gaps in the sequence if no
   * translation is performed.
   *
   * During recovery this method will not send out the message, but it will be sent
   * later if no matching `confirmDelivery` was performed.
   *
   * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
   * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
   */
  def deliver(destination: ActorPath)(deliveryIdToMessage: Long ⇒ Any): Unit = {
    internalDeliver(destination)(deliveryIdToMessage)
  }

  /**
   * Scala API: Send the message created by the `deliveryIdToMessage` function to
   * the `destination` actor. It will retry sending the message until
   * the delivery is confirmed with [[#confirmDelivery]]. Correlation
   * between `deliver` and `confirmDelivery` is performed with the
   * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
   * function. The `deliveryId` is typically passed in the message to the
   * destination, which replies with a message containing the same `deliveryId`.
   *
   * The `deliveryId` is a strictly monotonically increasing sequence number without
   * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
   * to multiple destinations the destinations will see gaps in the sequence if no
   * translation is performed.
   *
   * During recovery this method will not send out the message, but it will be sent
   * later if no matching `confirmDelivery` was performed.
   *
   * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
   * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
   */
  def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ⇒ Any): Unit = {
    internalDeliver(destination)(deliveryIdToMessage)
  }

}

deliver自动产生一个deliveryId,这个deliveryId是发送方与接收方沟通的标志。confirmDelivery(deliveryId)用这个id来确认某条消息已经送达:

  /**
   * Call this method when a message has been confirmed by the destination,
   * or to abort re-sending.
   * @see [[#deliver]]
   * @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm
   */
  def confirmDelivery(deliveryId: Long): Boolean = {
    if (unconfirmed.contains(deliveryId)) {
      unconfirmed -= deliveryId
      true
    } else false
  }

confirmDelivery同时还被用来取消未确认送达消息。系统对超过配置文件中重发次数设置的消息通过自发送一条UnconformedWarning信息,这个信息包嵌了当前未确认送达消息清单:

 /**
   * @see [[AtLeastOnceDeliveryLike#warnAfterNumberOfUnconfirmedAttempts]]
   */
  @SerialVersionUID(1L)
  case class UnconfirmedWarning(unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) {
    /**
     * Java API
     */
    def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
      import scala.collection.JavaConverters._
      unconfirmedDeliveries.asJava
    }
  }

unconfirmedDeliveries是在下面这个函数里形成的:

private def redeliverOverdue(): Unit = {
    val now = System.nanoTime()
    val deadline = now - redeliverInterval.toNanos
    var warnings = Vector.empty[UnconfirmedDelivery]

    unconfirmed
      .iterator
      .filter { case (_, delivery) ⇒ delivery.timestamp <= deadline }
      .take(redeliveryBurstLimit)
      .foreach {
        case (deliveryId, delivery) ⇒
          send(deliveryId, delivery, now)

          if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
            warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
      }

    if (warnings.nonEmpty)
      self ! UnconfirmedWarning(warnings)
  }

在状态恢复和消息处理是都会调用这个redeliverOverdue函数:

 override private[akka] def onReplaySuccess(): Unit = {
    redeliverOverdue()
    startRedeliverTask()
    super.onReplaySuccess()
  }

  /**
   * INTERNAL API
   */
  override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
    message match {
      case RedeliveryTick ⇒
        redeliverOverdue()

      case x ⇒
        super.aroundReceive(receive, message)
    }

AtLeastOnceDelivery模式的快照snapshot类型定义如下:

object AtLeastOnceDelivery {

  /**
   * Snapshot of current `AtLeastOnceDelivery` state. Can be retrieved with
   * [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and saved with [[PersistentActor#saveSnapshot]].
   * During recovery the snapshot received in [[SnapshotOffer]] should be set
   * with [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
   */
  @SerialVersionUID(1L)
  case class AtLeastOnceDeliverySnapshot(currentDeliveryId: Long, unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery])
    extends Message {

    /**
     * Java API
     */
    def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
      import scala.collection.JavaConverters._
      unconfirmedDeliveries.asJava
    }

  }

可以看到这个类型包括了未确认送达清单UnconfirmedDelivery。快照存写和恢复用下面这两个函数:

/**
   * Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]].
   * During recovery the snapshot received in [[SnapshotOffer]] should be set
   * with [[#setDeliverySnapshot]].
   *
   * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
   * If you need a custom snapshot for other parts of the actor state you must also include the
   * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
   * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
   * as a blob in your custom snapshot.
   */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot =
    AtLeastOnceDeliverySnapshot(
      deliverySequenceNr,
      unconfirmed.map { case (deliveryId, d) ⇒ UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut))

  /**
   * If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery
   * in a [[SnapshotOffer]] message and should be set with this method.
   */
  def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = {
    deliverySequenceNr = snapshot.currentDeliveryId
    val now = System.nanoTime()
    unconfirmed = snapshot.unconfirmedDeliveries.map(d ⇒
      d.deliveryId → Delivery(d.destination, d.message, now, 0))(breakOut)
  }

我们还是设计个例子来示范保证送达模式:

package atleastonce.calculation
import akka.actor._
import akka.persistence._
import akka.persistence.AtLeastOnceDelivery._
import atleastonce.calculator.Calculator
object CalcAggregator {
  sealed trait Command
  case class Add(x: Int, y: Int) extends Command
  case class Sub(x: Int, y: Int) extends Command
  case class Mul(x: Int, y: Int) extends Command
  case class Div(x: Int, y: Int) extends Command
  case class Result(id: Long, res: Int) extends Command
/*
  sealed trait Event
  case class Added(x: Int, y: Int) extends Event
  case class Substracted(x: Int, y: Int) extends Event
  case class Multiplied(x: Int, y: Int) extends Event
  case class Divided(x: Int, y: Int) extends Event
  case class GotResult(id: Long, res: Int) extends Event

*/
  case class Snap(results: Set[Int], deliverySnapshot: AtLeastOnceDeliverySnapshot)
}

class CalcAggregator(calculators: Map[String,ActorPath]) extends PersistentActor
  with AtLeastOnceDelivery with ActorLogging {
 import CalcAggregator._
  var results: Set[Int] = Set()

  override def persistenceId = "calculation-actor"

  //sending commands and update state only with delivery ack
  def updateState(cmd: Command) = cmd match {
    case Add(x,y) => deliver(calculators("ADD")){id => Calculator.Add(id,x,y)}
    case Sub(x,y) => deliver(calculators("SUB")){id => Calculator.Sub(id,x,y)}
    case Mul(x,y) => deliver(calculators("MUL")){id => Calculator.Mul(id,x,y)}
    case Div(x,y) => deliver(calculators("DIV")){id => Calculator.Div(id,x,y)}
    case Result(id,res) =>
      results += res
      confirmDelivery(id)
  }

  override def receiveCommand: Receive = {
    case cmd: Command => persistAsync(cmd){updateState}

    case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
      unconfirmedDeliveries.foreach{u => confirmDelivery(u.deliveryId)}
  }

  override def receiveRecover: Receive = {
    case cmd: Command => updateState(cmd)
    case SnapshotOffer(_,snap: Snap) =>
      results = snap.results
      setDeliverySnapshot(snap.deliverySnapshot)
  }

}

以上是一个典型的任务分发器,主要功能是通过发送指令消息驱动其它Calculator Actor进行运算,我们希望保证发出的指令都能送达。首先,指令和事件是一一对应的,无需进行指令事件转换,可以统一直接采用指令。再就是存写指令时无需验证,因为状态results更新是在收到指令接收方回复Results(id,res)之后进行的。从这个例子比较简单的功能操作中我们可明显感觉到写入日志的流量:CalcAggregator好像就是在不断的把经历的指令写入日志然后等待回复,回复时间就是Calculator运算时间。试想下次启动系统进行日志重演时会怎样:启动时间长度等于系统累积运算的时间长度。这很可怕,花几个小时来恢复状态可能是常态。所以必须充分利用快照,采用一套有效的日志、快照维护方式来提高状态恢复效率。下面是加入了日志快照维护功能的新代码:

package atleastonce.calculation
import akka.actor._
import akka.persistence._
import akka.persistence.AtLeastOnceDelivery._
import atleastonce.calculator.Calculator
import scala.util.control.NoStackTrace

object CalcAggregator {
  sealed trait Command
  case class Add(x: Int, y: Int) extends Command
  case class Sub(x: Int, y: Int) extends Command
  case class Mul(x: Int, y: Int) extends Command
  case class Div(x: Int, y: Int) extends Command
  case class Result(id: Long, res: Int) extends Command
/*
  sealed trait Event
  case class Added(x: Int, y: Int) extends Event
  case class Substracted(x: Int, y: Int) extends Event
  case class Multiplied(x: Int, y: Int) extends Event
  case class Divided(x: Int, y: Int) extends Event
  case class GotResult(id: Long, res: Int) extends Event
*/
  case class Snap(results: List[Int],
                  deliverySnapshot: AtLeastOnceDeliverySnapshot)

  case object ShowResults
  case object Boom
  case object ClearJournal

  def props(calculators: Map[String,ActorRef],keepJournalNr: Int) =
    Props(new CalcAggregator(calculators,keepJournalNr))
}

class CalcAggregator(calculators: Map[String,ActorRef],keepJournalNr: Int)
  extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
  import CalcAggregator._
  var results: List[Int] = List()
  var resultsId: Int = 0
  override def persistenceId = "calculation-actor023"

  //sending commands and update state only with delivery ack
  def updateState(cmd: Command) = {
    if (!recoveryRunning && !cmd.isInstanceOf[Result])
        log.info(s"Sending command message: $cmd at: $lastSequenceNr")

    cmd match {
      case Add(x,y) => deliver(calculators("ADD").path){id => Calculator.Add(id,x,y)}
      case Sub(x,y) => deliver(calculators("SUB").path){id => Calculator.Sub(id,x,y)}
      case Mul(x,y) => deliver(calculators("MUL").path){id => Calculator.Mul(id,x,y)}
      case Div(x,y) => deliver(calculators("DIV").path){id => Calculator.Div(id,x,y)}
      case Result(id,res) =>
        log.info(s"Receive calculation result $res with ack id: $id")
        if ( res != 0) {
          results = res :: results
          confirmDelivery(id)
          log.info(s"Current state updated to: $results at $lastSequenceNr")
          resultsId += 1
          if (resultsId % keepJournalNr == 0) {
            resultsId = 0
            saveSnapshot(Snap(results, getDeliverySnapshot))
            log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
          }
        }
    }
  }

  override def receiveCommand: Receive = {
    case cmd: Command => persist(cmd){updateState}
    case ack: Calculator.Ack =>
      updateState(Result(ack.id,ack.x))

    case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
      log.info(s"UnconfirmedWarning: $unconfirmedDeliveries ...")
      unconfirmedDeliveries.foreach{u =>
        log.info(s"Cancelling unconfirmedDeliveris $u")
        confirmDelivery(u.deliveryId)}

    case SaveSnapshotSuccess(m) =>
      log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
      //clear journal and snapshot
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
      deleteMessages(m.sequenceNr)
    case SaveSnapshotFailure(m,cause) =>
      log.info(s"Saving snapshot failed because: ${cause}")
    case DeleteMessagesSuccess(toSeq) =>
      log.info(s"Succefull deleting journal upto: $toSeq")
    case DeleteMessagesFailure(cause,toSeq) =>
      log.info(s"Failed to delete journal upto: $toSeq because: $cause")
    case DeleteSnapshotsSuccess(crit) =>
      log.info(s"Successful delete snapshots for $crit")
    case DeleteSnapshotSuccess(m) =>
      log.info(s"Successful delete snapshot upto: ${m.sequenceNr}")
    case DeleteSnapshotsFailure(crit,cause) =>
      log.info(s"Failed to delete snapshots $crit because: $cause")
    case DeleteSnapshotFailure(m,cause) =>
      log.info(s"Failed to delete snapshot upto: ${m.sequenceNr} because: $cause")

    case ShowResults =>
      log.info(s"Show Current State: $results and lastSequenceNr : $lastSequenceNr")

    case "TakeSnapshot" =>
      log.info(s"Saving snapshot with state: $results ...")
      saveSnapshot(Snap(results, getDeliverySnapshot))

    case Boom =>
      log.info("Boom!")
      throw new RuntimeException("boom") with NoStackTrace
    case ClearJournal =>
      deleteMessages(lastSequenceNr)
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr))

  }

  override def receiveRecover: Receive = {
    case cmd: Command => updateState(cmd)
      log.info(s"Replaying command: $cmd")
    case SnapshotOffer(md,snap: Snap) =>
      log.info(s"Loading snapshot at: ${md.sequenceNr} with state: ${snap.results}")
      results = snap.results
      setDeliverySnapshot(snap.deliverySnapshot)
      log.info(s"Updated state to $results with snapshot")
    case RecoveryCompleted =>
      log.info(s"Recovery compeleted with State: $results and lastSequenceNr=$lastSequenceNr")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Aggregator restarting with reason: ${reason.getMessage}")
    super.preRestart(reason, message)
  }

  override def warnAfterNumberOfUnconfirmedAttempts = 1
}

当收到Calculator运算结果次数大于等于保留日志长度keepJournal则把快照写入一次:

    case Result(id,res) =>
        log.info(s"Receive calculation result $res with ack id: $id")
        if ( res != 0) {
          results = res :: results
          confirmDelivery(id)
          log.info(s"Current state updated to: $results at $lastSequenceNr")
          resultsId += 1
          if (resultsId % keepJournalNr == 0) {
            resultsId = 0
            saveSnapshot(Snap(results, getDeliverySnapshot))
            log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
          }
        }

每次快照写入成功则把之前的Journal和Snapshot都删除:

  case SaveSnapshotSuccess(m) =>
      log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
      //clear journal and snapshot
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
      deleteMessages(m.sequenceNr)

使用了下面这段代码来测试:

package atleastonce.demo
import atleastonce.calculation.CalcAggregator
import atleastonce.calculation.CalcAggregator._
import atleastonce.calculator.Calculator
import akka.actor._

object AtLeastOnceDemo extends App {
  val atLeastOnceSystem = ActorSystem("atleastonceSystem")

  val addActor = atLeastOnceSystem.actorOf(Calculator.props,"addActor")
  val subActor = atLeastOnceSystem.actorOf(Calculator.props,"subActor")
  val mulActor = atLeastOnceSystem.actorOf(Calculator.props,"mulActor")
  val divActor = atLeastOnceSystem.actorOf(Calculator.props,"divActor")
  var actors = Map[String,ActorRef]()
  actors += ("ADD" -> addActor)
  actors += ("SUB" -> subActor)
  actors += ("MUL" -> mulActor)
  actors += ("DIV" -> divActor)

  val aggregator = atLeastOnceSystem.actorOf(CalcAggregator.props(actors,5), "aggregator")

  aggregator ! Sub(0,0)
  aggregator ! Add(6,3)
  aggregator ! Sub(8,0)
  aggregator ! Mul(3,2)
  aggregator ! Boom
  aggregator ! Div(12,3)
  Thread.sleep(10000)
  aggregator ! ShowResults

 // aggregator ! ClearJournal

  scala.io.StdIn.readLine()
  atLeastOnceSystem.terminate()

}

连续运算几次,状态都恢复正确。

下面是本次示范的源代码:

build.sbt

name := "atleastonce-delivery"

version := "1.0"

scalaVersion := "2.11.9"

sbtVersion := "0.13.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka"           %% "akka-actor"       % "2.5.3",
  "com.typesafe.akka"           %% "akka-persistence" % "2.5.3",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.54",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.54" % Test
)

application.conf

akka {
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
  }
}
akka.actor.warn-about-java-serializer-usage = off
akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts = 1

Calculator.scala

package atleastonce.calculator
import akka.actor._
import scala.util.Random
import scala.concurrent.duration._
object Calculator {
  sealed trait Math
  case class Add(id: Long, x: Int, y: Int) extends Math
  case class Sub(id: Long, x: Int, y: Int) extends Math
  case class Mul(id: Long, x: Int, y: Int) extends Math
  case class Div(id: Long, x: Int, y: Int) extends Math

  case class Ack(id: Long, x: Int)
  case class CalcMath(expr: Math)

  def props = Props(new Calculator)

}

class Calculator extends Actor with ActorLogging {
  import Calculator._
  import context.dispatcher
  override def receive: Receive = {
    case CalcMath(expr) => {
      val delay: FiniteDuration = (100 millis) * Random.nextInt(10)
      context.system.scheduler.scheduleOnce(delay,self,expr)
    }
    case add: Add => sender() ! Ack(add.id,add.x + add.y)
    case sub: Sub => sender() ! Ack(sub.id,sub.x - sub.y)
    case mul: Mul => sender() ! Ack(mul.id,mul.x * mul.y)
    case div: Div => sender() ! Ack(div.id,div.x / div.y)
  }
}

CalcAggregator

package atleastonce.calculation
import akka.actor._
import akka.persistence._
import akka.persistence.AtLeastOnceDelivery._
import atleastonce.calculator.Calculator
import scala.util.control.NoStackTrace

object CalcAggregator {
  sealed trait Command
  case class Add(x: Int, y: Int) extends Command
  case class Sub(x: Int, y: Int) extends Command
  case class Mul(x: Int, y: Int) extends Command
  case class Div(x: Int, y: Int) extends Command
  case class Result(id: Long, res: Int) extends Command
/*
  sealed trait Event
  case class Added(x: Int, y: Int) extends Event
  case class Substracted(x: Int, y: Int) extends Event
  case class Multiplied(x: Int, y: Int) extends Event
  case class Divided(x: Int, y: Int) extends Event
  case class GotResult(id: Long, res: Int) extends Event
*/
  case class Snap(results: List[Int],
                  deliverySnapshot: AtLeastOnceDeliverySnapshot)

  case object ShowResults
  case object Boom
  case object ClearJournal

  def props(calculators: Map[String,ActorRef],keepJournalNr: Int) =
    Props(new CalcAggregator(calculators,keepJournalNr))
}

class CalcAggregator(calculators: Map[String,ActorRef],keepJournalNr: Int)
  extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
  import CalcAggregator._
  var results: List[Int] = List()
  var resultsId: Int = 0
  override def persistenceId = "calculation-actor023"

  //sending commands and update state only with delivery ack
  def updateState(cmd: Command) = {
    if (!recoveryRunning && !cmd.isInstanceOf[Result])
        log.info(s"Sending command message: $cmd at: $lastSequenceNr")

    cmd match {
      case Add(x,y) => deliver(calculators("ADD").path){id => Calculator.Add(id,x,y)}
      case Sub(x,y) => deliver(calculators("SUB").path){id => Calculator.Sub(id,x,y)}
      case Mul(x,y) => deliver(calculators("MUL").path){id => Calculator.Mul(id,x,y)}
      case Div(x,y) => deliver(calculators("DIV").path){id => Calculator.Div(id,x,y)}
      case Result(id,res) =>
        log.info(s"Receive calculation result $res with ack id: $id")
        if ( res != 0) {
          results = res :: results
          confirmDelivery(id)
          log.info(s"Current state updated to: $results at $lastSequenceNr")
          resultsId += 1
          if (resultsId % keepJournalNr == 0) {
            resultsId = 0
            saveSnapshot(Snap(results, getDeliverySnapshot))
            log.info(s"Saving snapshot with state $results, snapshot: $getDeliverySnapshot")
          }
        }
    }
  }

  override def receiveCommand: Receive = {
    case cmd: Command => persist(cmd){updateState}
    case ack: Calculator.Ack =>
      updateState(Result(ack.id,ack.x))

    case UnconfirmedWarning(unconfirmedDeliveries) =>  //cancel outstanding message
      log.info(s"UnconfirmedWarning: $unconfirmedDeliveries ...")
      unconfirmedDeliveries.foreach{u =>
        log.info(s"Cancelling unconfirmedDeliveris $u")
        confirmDelivery(u.deliveryId)}

    case SaveSnapshotSuccess(m) =>
      log.info(s"Sucessfull saving snapshot: ${m} at: $lastSequenceNr")
      //clear journal and snapshot
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = m.sequenceNr - 1))
      deleteMessages(m.sequenceNr)
    case SaveSnapshotFailure(m,cause) =>
      log.info(s"Saving snapshot failed because: ${cause}")
    case DeleteMessagesSuccess(toSeq) =>
      log.info(s"Succefull deleting journal upto: $toSeq")
    case DeleteMessagesFailure(cause,toSeq) =>
      log.info(s"Failed to delete journal upto: $toSeq because: $cause")
    case DeleteSnapshotsSuccess(crit) =>
      log.info(s"Successful delete snapshots for $crit")
    case DeleteSnapshotSuccess(m) =>
      log.info(s"Successful delete snapshot upto: ${m.sequenceNr}")
    case DeleteSnapshotsFailure(crit,cause) =>
      log.info(s"Failed to delete snapshots $crit because: $cause")
    case DeleteSnapshotFailure(m,cause) =>
      log.info(s"Failed to delete snapshot upto: ${m.sequenceNr} because: $cause")

    case ShowResults =>
      log.info(s"Show Current State: $results and lastSequenceNr : $lastSequenceNr")

    case "TakeSnapshot" =>
      log.info(s"Saving snapshot with state: $results ...")
      saveSnapshot(Snap(results, getDeliverySnapshot))

    case Boom =>
      log.info("Boom!")
      throw new RuntimeException("boom") with NoStackTrace
    case ClearJournal =>
      deleteMessages(lastSequenceNr)
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = lastSequenceNr))

  }

  override def receiveRecover: Receive = {
    case cmd: Command => updateState(cmd)
      log.info(s"Replaying command: $cmd")
    case SnapshotOffer(md,snap: Snap) =>
      log.info(s"Loading snapshot at: ${md.sequenceNr} with state: ${snap.results}")
      results = snap.results
      setDeliverySnapshot(snap.deliverySnapshot)
      log.info(s"Updated state to $results with snapshot")
    case RecoveryCompleted =>
      log.info(s"Recovery compeleted with State: $results and lastSequenceNr=$lastSequenceNr")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Aggregator restarting with reason: ${reason.getMessage}")
    super.preRestart(reason, message)
  }

  override def warnAfterNumberOfUnconfirmedAttempts = 1
}

AtLeastOnceDemo.scala

package atleastonce.demo
import atleastonce.calculation.CalcAggregator
import atleastonce.calculation.CalcAggregator._
import atleastonce.calculator.Calculator
import akka.actor._

object AtLeastOnceDemo extends App {
  val atLeastOnceSystem = ActorSystem("atleastonceSystem")

  val addActor = atLeastOnceSystem.actorOf(Calculator.props,"addActor")
  val subActor = atLeastOnceSystem.actorOf(Calculator.props,"subActor")
  val mulActor = atLeastOnceSystem.actorOf(Calculator.props,"mulActor")
  val divActor = atLeastOnceSystem.actorOf(Calculator.props,"divActor")
  var actors = Map[String,ActorRef]()
  actors += ("ADD" -> addActor)
  actors += ("SUB" -> subActor)
  actors += ("MUL" -> mulActor)
  actors += ("DIV" -> divActor)

  val aggregator = atLeastOnceSystem.actorOf(CalcAggregator.props(actors,5), "aggregator")

  aggregator ! Sub(0,0)
  aggregator ! Add(6,3)
  aggregator ! Sub(8,0)
  aggregator ! Mul(3,2)
  aggregator ! Boom
  aggregator ! Div(12,3)
  Thread.sleep(10000)
  aggregator ! ShowResults

 // aggregator ! ClearJournal

  scala.io.StdIn.readLine()
  atLeastOnceSystem.terminate()

}
时间: 2024-10-08 09:48:09

Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式的相关文章

15. 星际争霸之php设计模式--策略模式

题记==============================================================================本php设计模式专辑来源于博客(jymoz.com),现在已经访问不了了,这一系列文章是我找了很久才找到完整的,感谢作者jymoz的辛苦付出哦! 本文地址:http://www.cnblogs.com/davidhhuan/p/4248199.html============================================

ODAC(V9.5.15) 学习笔记(十七)主从模式

主从模式(Master/Detail mode)是指建立主表和从表关系的多个数据集集合模式. 1. 关系设置 要设置主从模式,必须有一个主表数据集(TDataSet)和一个从表数据集(TDataSet),且主表数据集关联一个数据源组件(TDataSource),并将从表数据集的MasterSource指向该数据源组件,最后设置关联字段.设置主从表之间关联字段有2种形式: 1.在从表数据集的SQL中设置外键,通过参数指向主表的字段,如: //主表数据集 Master := TOraQuery.Cr

设计模式(二)工厂模式:2-工厂方法模式

模拟场景: 继续沿用在简单工厂模式中讨论的,运算器相关的场景. 思想: 考虑之前最初的设计,简单工厂模式中,最大的问题在于,面对新增的需要在工厂中创建的对象,对其的修改会违反开闭原则. 工厂方法模式(Factory Method)对于这种问题的解决方案是:将生产运算器的工厂抽象出来(AbsOperationFactory),然后为原来每一个需要创建的对象(继承AbsOperation),都建立一个专门的工厂.这样一来,可以巧妙地利用多态的性质,完成代码的解耦. 由此可见,工厂方法模式,是模板方法

《JAVA与模式》之中介者模式(转载)

原文出处:http://blog.csdn.net/zhengzhb/article/details/7430098   定义:用一个中介者对象封装一系列的对象交互,中介者使各对象不需要显示地相互作用,从而使耦合松散,而且可以独立地改变它们之间的交互. 类型:行为类模式 类图: 中介者模式的结构 中介者模式又称为调停者模式,从类图中看,共分为3部分: 抽象中介者:定义好同事类对象到中介者对象的接口,用于各个同事类之间的通信.一般包括一个或几个抽象的事件方法,并由子类去实现. 中介者实现类:从抽象

5.5 进入编辑模式 5.6 vim命令模式 5.7 vim实践

5.5 进入编辑模式 5.6 vim命令模式 5.7 vim实践 扩展 vim的特殊用法 http://www.apelearn.com/bbs/thread-9334-1-1.html vim常用快捷键总结 http://www.apelearn.com/bbs/thread-407-1-1.html vim快速删除一段字符 http://www.apelearn.com/bbs/thread-842-1-1.html vim乱码 http://www.apelearn.com/bbs/thr

创建型模式1.2简单工厂模式

一.概念 简单工厂模式是由一个工厂对象决定创建出哪一种产品类的实例.从设计模式的类型上来说,简单工厂模式是属于创建型模式,又叫做静态工厂方法(StaticFactory Method)模式,但不属于23种GOF设计模式之一. 二.类图 三.具体介绍 简单工厂模式的实质是由一个工厂类根据传入的参数,动态决定应该创建哪一个产品类(这些产品类继承自一个父类或接口)的实例. 该模式中包含的角色及其职责 工厂(OpertionFactory)角色 简单工厂模式的核心,它负责实现创建所有实例的内部逻辑.工厂

《JAVA与模式》之享元模式

Flyweight在拳击比赛中指最轻量级,即"蝇量级"或"雨量级",这里选择使用"享元模式"的意译,是因为这样更能反映模式的用意.享元模式是对象的结构模式.享元模式以共享的方式高效地支持大量的细粒度对象. Java中的String类型 在JAVA语言中,String类型就是使用了享元模式.String对象是final类型,对象一旦创建就不可改变.在JAVA中字符串常量都是存在常量池中的,JAVA会确保一个字符串常量在常量池中只有一个拷贝.Stri

《JAVA与模式》之责任链模式

责任链模式是一种对象的行为模式.在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链.请求在这个链上传递,直到链上的某一个对象决定处理此请求.发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任. 从击鼓传花谈起 击鼓传花是一种热闹而又紧张的饮酒游戏.在酒宴上宾客依次坐定位置,由一人击鼓,击鼓的地方与传花的地方是分开的,以示公正.开始击鼓时,花束就开始依次传递,鼓声一落,如果花束在某人手中,则该人就得饮酒.

《JAVA与模式》之调停者模式

调停者模式是对象的行为模式.调停者模式包装了一系列对象相互作用的方式,使得这些对象不必相互明显引用.从而使它们可以较松散地耦合.当这些对象中的某些对象之间的相互作用发生改变时,不会立即影响到其他的一些对象之间的相互作用.从而保证这些相互作用可以彼此独立地变化. 为什么需要调停者 如下图所示,这个示意图中有大量的对象,这些对象既会影响别的对象,又会被别的对象所影响,因此常常叫做同事(Colleague)对象.这些同事对象通过彼此的相互作用形成系统的行为.从图中可以看出,几乎每一个对象都需要与其他的