Akka 编程(20):容错处理(一)

我们在前面介绍Actor系统时说过每个Actor都是其子Actor的管理员,并且每个Actor定义了发生错误时的管理策略,策略一旦定义好,之后不能修改,就像是Actor系统不可分割的一部分。
实用错误处理
首先我们来看一个例子来显示一种处理数据存储错误的情况,这是现实中一个应用可能出现的典型错误。当然实际的应用可能针对数据源不存在时有不同的处理,这里我们使用重新连接的处理方法。
下面是例子的源码,比较长,需要仔细阅读,最好是实际运行,参考日志来理解:

1 import akka.actor._
2 import akka.actor.SupervisorStrategy._
3 import scala.concurrent.duration._
4 import akka.util.Timeout
5 import akka.event.LoggingReceive
6 import akka.pattern.{ask, pipe}
7 import com.typesafe.config.ConfigFactory
8  
9 /**
10  * Runs the sample
11  */
12 object FaultHandlingDocSample extends App {
13  
14   import Worker._
15  
16   val config = ConfigFactory.parseString( """
17       akka.loglevel = "DEBUG"
18       akka.actor.debug {
19       receive = on
20       lifecycle = on
21       }
22       """)
23  
24   val system = ActorSystem("FaultToleranceSample", config)
25   val worker = system.actorOf(Props[Worker], name = "worker")
26   val listener = system.actorOf(Props[Listener], name = "listener")
27   // start the work and listen on progress
28   // note that the listener is used as sender of the tell,
29   // i.e. it will receive replies from the worker
30   worker.tell(Start, sender = listener)
31 }
32  
33 /**
34  * Listens on progress from the worker and shuts down the system when enough
35  * work has been done.
36  */
37 class Listener extends Actor with ActorLogging {
38  
39   import Worker._
40  
41   // If we don’t get any progress within 15 seconds then the service is unavailable
42   context.setReceiveTimeout(15 seconds)
43  
44   def receive = {
45     case Progress(percent) =>
46       log.info("Current progress: {} %", percent)
47       if (percent >= 100.0) {
48         log.info("That’s all, shutting down")
49         context.system.shutdown()
50       }
51     case ReceiveTimeout =>
52       // No progress within 15 seconds, ServiceUnavailable
53       log.error("Shutting down due to unavailable service")
54       context.system.shutdown()
55   }
56 }
57  
58 object Worker {
59  
60   case object Start
61  
62   case object Do
63  
64   final case class Progress(percent: Double)
65  
66 }
67  
68 /**
69  * Worker performs some work when it receives the ‘Start‘ message.
70  * It will continuously notify the sender of the ‘Start‘ message
71  * of current ‘‘Progress‘‘. The ‘Worker‘ supervise the ‘CounterService‘.
72  */
73 class Worker extends Actor with ActorLogging {
74  
75   import Worker._
76   import CounterService._
77  
78   implicit val askTimeout = Timeout(5 seconds)
79   // Stop the CounterService child if it throws ServiceUnavailable
80   override val supervisorStrategy = OneForOneStrategy() {
81     case _: CounterService.ServiceUnavailable => Stop
82   }
83   // The sender of the initial Start message will continuously be notified
84   // about progress
85   var progressListener: Option[ActorRef] = None
86   val counterService = context.actorOf(Props[CounterService], name ="counter")
87   val totalCount = 51
88  
89   import context.dispatcher
90  
91   // Use this Actors’ Dispatcher as ExecutionContext
92   def receive = LoggingReceive {
93     case Start if progressListener.isEmpty =>
94       progressListener = Some(sender())
95       context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)
96     case Do =>
97       counterService ! Increment(1)
98       counterService ! Increment(1)
99       counterService ! Increment(1)
100       // Send current progress to the initial sender
101       counterService ? GetCurrentCount map {
102         case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
103       } pipeTo progressListener.get
104   }
105 }
106  
107 object CounterService {
108  
109   final case class Increment(n: Int)
110  
111   case object GetCurrentCount
112  
113   final case class CurrentCount(key: String, count: Long)
114  
115   class ServiceUnavailable(msg: String) extends RuntimeException(msg)
116  
117   private case object Reconnect
118  
119 }
120  
121 /**
122  * Adds the value received in ‘Increment‘ message to a persistent
123  * counter. Replies with ‘CurrentCount‘ when it is asked for ‘CurrentCount‘.
124  * ‘CounterService‘ supervise ‘Storage‘ and ‘Counter‘.
125  */
126 class CounterService extends Actor {
127  
128   import CounterService._
129   import Counter._
130   import Storage._
131  
132   // Restart the storage child when StorageException is thrown.
133   // After 3 restarts within 5 seconds it will be stopped.
134   override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
135     withinTimeRange = 5 seconds) {
136     case _: Storage.StorageException => Restart
137   }
138   val key = self.path.name
139   var storage: Option[ActorRef] = None
140   var counter: Option[ActorRef] = None
141   var backlog = IndexedSeq.empty[(ActorRef, Any)]
142   val MaxBacklog = 10000
143  
144   import context.dispatcher
145  
146   // Use this Actors’ Dispatcher as ExecutionContext
147   override def preStart() {
148     initStorage()
149   }
150  
151   /**
152    * The child storage is restarted in case of failure, but after 3 restarts,
153    * and still failing it will be stopped. Better to back-off than continuously
154    * failing. When it has been stopped we will schedule a Reconnect after a delay.
155    * Watch the child so we receive Terminated message when it has been terminated.
156    */
157   def initStorage() {
158     storage = Some(context.watch(context.actorOf(Props[Storage], name ="storage")))
159     // Tell the counter, if any, to use the new storage
160     counter foreach {
161       _ ! UseStorage(storage)
162     }
163     // We need the initial value to be able to operate
164     storage.get ! Get(key)
165   }
166  
167   def receive = LoggingReceive {
168     case Entry(k, v) if == key && counter == None =>
169       // Reply from Storage of the initial value, now we can create the Counter
170       val = context.actorOf(Props(classOf[Counter], key, v))
171       counter = Some(c)
172       // Tell the counter to use current storage
173       c ! UseStorage(storage)
174       // and send the buffered backlog to the counter
175       for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
176       backlog = IndexedSeq.empty
177     case msg@Increment(n) => forwardOrPlaceInBacklog(msg)
178  
179     case msg@GetCurrentCount => forwardOrPlaceInBacklog(msg)
180     case Terminated(actorRef) if Some(actorRef) == storage =>
181       // After 3 restarts the storage child is stopped.
182       // We receive Terminated because we watch the child, see initStorage.
183       storage = None
184       // Tell the counter that there is no storage for the moment
185       counter foreach {
186         _ ! UseStorage(None)
187       }
188       // Try to re-establish storage after while
189       context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)
190     case Reconnect =>
191       // Re-establish storage after the scheduled delay
192       initStorage()
193   }
194  
195   def forwardOrPlaceInBacklog(msg: Any) {
196     // We need the initial value from storage before we can start delegate to
197     // the counter. Before that we place the messages in a backlog, to be sent
198     // to the counter when it is initialized.
199     counter match {
200       case Some(c) => c forward msg
201       case None =>
202         if (backlog.size >= MaxBacklog)
203           throw new ServiceUnavailable(
204             "CounterService not available, lack of initial value")
205         backlog :+= (sender() -> msg)
206     }
207   }
208 }
209  
210 object Counter {
211  
212   final case class UseStorage(storage: Option[ActorRef])
213  
214 }
215  
216 /**
217  * The in memory count variable that will send current
218  * value to the ‘Storage‘, if there is any storage
219  * available at the moment.
220  */
221 class Counter(key: String, initialValue: Long) extends Actor {
222  
223   import Counter._
224   import CounterService._
225   import Storage._
226  
227   var count = initialValue
228   var storage: Option[ActorRef] = None
229  
230   def receive = LoggingReceive {
231     case UseStorage(s) =>
232       storage = s
233       storeCount()
234     case Increment(n) =>
235       count += n
236       storeCount()
237     case GetCurrentCount =>
238       sender() ! CurrentCount(key, count)
239   }
240  
241   def storeCount() {
242     // Delegate dangerous work, to protect our valuable state.
243     // We can continue without storage.
244     storage foreach {
245       _ ! Store(Entry(key, count))
246     }
247   }
248 }
249  
250 object DummyDB {
251  
252   import Storage.StorageException
253  
254   private var db = Map[String, Long]()
255  
256   @throws(classOf[StorageException])
257   def save(key: String, value: Long): Unit = synchronized {
258     if (11 <= value && value <= 14)
259       throw new StorageException("Simulated store failure " + value)
260     db += (key -> value)
261   }
262  
263   @throws(classOf[StorageException])
264   def load(key: String): Option[Long] = synchronized {
265     db.get(key)
266   }
267 }
268  
269 object Storage {
270  
271   final case class Store(entry: Entry)
272  
273   final case class Get(key: String)
274  
275   final case class Entry(key: String, value: Long)
276  
277   class StorageException(msg: String) extends RuntimeException(msg)
278  
279 }
280  
281 /**
282  * Saves key/value pairs to persistent storage when receiving ‘Store‘ message.
283  * Replies with current value when receiving ‘Get‘ message.
284  * Will throw StorageException if the underlying data store is out of order.
285  */
286 class Storage extends Actor {
287  
288   import Storage._
289  
290   val db = DummyDB
291  
292   def receive = LoggingReceive {
293     case Store(Entry(key, count)) => db.save(key, count)
294     case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L))
295   }
296 }

这个例子定义了五个Actor,分别是Worker, Listener, CounterService ,Counter 和 Storage,下图给出了系统正常运行时的流程(无错误发生的情况):

其中Worker是CounterService的父Actor(管理员),CounterService是Counter和Storage的父Actor(管理员)图中浅红色,白色代表引用,其中Worker引用了Listener,Listener也引用了Worker,它们之间不存在父子关系,同样Counter也引用了Storage,但Counter不是Storage的管理员。

正常流程如下:

步骤 描述
1 progress Listener 通知Worker开始工作.
2 Worker通过定时发送Do消息给自己来完成工作
3,4,5 Worker接受到Do消息时,通知其子Actor CounterService 三次递增计数器,

CounterService 将Increment消息转发给Counter,它将递增计数器变量然后把当前值发送给Storeage保存

6,7  Workier询问CounterService 当前计数器的值,然后通过管道把结果传给Listener

下图给出系统出错的情况,例子中Worker和CounterService作为管理员分别定义了两个管理策略,Worker在收到CounterService 的ServiceUnaviable上终止CounterService的运行,而CounterService在收到StorageException时重启Storage。

出错时的流程

步骤 描述
1  Storage抛出StorageException异常
2  Storage的管理员CounterService根据策略在接受到StorageException异常后重启Storage
3,4,5,6  Storage继续出错并重启
7  如果在5秒钟之内Storage出错三次并重启,其管理员(CounterService)就终止Storage运行
8  CounterService 同时监听Storage的Terminated消息,它在Storeage终止后接受到Terminated消息
9,10,11  并且通知Counter 暂时没有Storage
12  CounterService 延时一段时间给自己发生Reconnect消息
13,14  当它收到Reconnect消息时,重新创建一个Storage
15,16  然后通知Counter使用新的Storage

这里给出运行的一个日志供参考。

时间: 2024-08-03 18:53:43

Akka 编程(20):容错处理(一)的相关文章

Akka 编程(14): Become/Unbecome

Akka支持Actor消息循环处理部分的热切换,调用context.become方法可以使用新的消息循环处理替换当前的消息处理器,被替换的消息处理器被压到一个栈结构,支持消息处理器的出栈和入栈.注:但Actor重启时,它的消息循环处理恢复到初始的行为.become方法的参数类型为部分函数PartialFunction[Any, Unit],例如: 1 import akka.actor.Actor 2 import akka.actor.ActorSystem 3 import akka.act

Java Swing界面编程(20)---多行文本输入组件:JTextArea

如果要输入多行文本,则可以使用JTextArea实现多行文本的输入. package com.beyole.util; import javax.swing.JFrame; import javax.swing.JLabel; import javax.swing.JTextArea; public class test19 { public static void main(String[] args) { JFrame frame = new JFrame("Crystal");/

Akka 编程: 什么是Actor

上一篇我们简单介绍了Actor系统,说明了Actor之间存在着层次关系,它也是构成Actor应用的最基本的单位.本篇介绍Actor本身的一些基本概念.一个Actor包含了State(状态),Behavior(行为),一个Mailbox(邮箱)和Supervisor Strategy (管理员策略),所有这些都封装在一个Actor引用之中(Actor Reference).Actor 引用一个Actor对象需要和外界隔离开来才能构成一个Actor模型,因此从外部看,一个Actor对象由一个Acto

并发编程 20—— 原子变量和非阻塞同步机制

并发编程 01—— ConcurrentHashMap 并发编程 02—— 阻塞队列和生产者-消费者模式 并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程 04—— Callable和Future 并发编程 05—— CompletionService : Executor 和 BlockingQueue 并发编程 06—— 任务取消 并发编程 07—— 任务取消 之 中断 并发编程 08—— 任务取消 之 停止基于线程的服务 并发编程 09——

2018 校招在线编程 20题-01

1. 最大乘积(拼多多) 输入 3 4 1 2 输出 24 解题思路: 定义五个数,一个最大,一个次大,一个第三大,一个最小,一个次小.只要找到这五个数,问题就解决了.因为最大乘积只可能是最大*(次大*第三大) 或者是 最大*(最小*次小).时间复杂度O(n),空间复杂度O(1).PS:这道题输入有问题,题目给的样例是直接给了一组数,而此时用例先给了一个数的个数n,然后再给了一组数. 1 #include<bits/stdc++.h> 2 using namespace std; 3 4 in

Java高效编程(20) - 注解

built-in annotations, defined in java.lang:@Override@Deprecated@SuppressWarnings The meta-annotations are for annotating annotations: @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface Test {} ///:~

Linux编程 20 shell编程(shell脚本创建,echo显示信息)

一概述 前面19章里已经掌握了linux系统和命令行的基础知识,从本章开始继续学习shell脚本的基础知识.在大量编辑shell脚本前,先来学习下一些基本概念. 1.1    使用多个命令 Shell可以让多个命令串起来,一次执行完成,可以把它们放在同一行中,用分号隔开,如下所示: 上面就是一个简单的脚本了,用到了两个bash shell命令,但每次运行之前,都必须在命令提示符下输入整个命令.如果将命令组合成一个简单的文本文件,需要时运行这个文本文件就行了. 1.2    创建shell 脚本文

Scala-Unit7-Scala并发编程模型AKKA

一.Akka简介 Akka时spark的底层通信框架,Hadoop的底层通信框架时rpc. 并发的程序编写很难,但是Akka解决了spark的这个问题. Akka构建在JVM平台上,是一种高并发.分布式.并且容错的应用工具包: Akka使用Scala语言编写,同时它提供了Scala和Java的开发接口,Akka可以开发一些高并发的程序. 二.Akka的Acor模型 A卡卡处理并发的方法基于actor模型,在基于actor的系统中,所有事物都是actor(类似于Java的万物皆对象): actor

《Python核心编程》 第3版 中文版pdf

下载地址:网盘下载 内容简介 编辑 Python是一种功能十分强大的面向对象编程语言,可以用于编写独立程序.快速脚本和复杂应用的原型.作为一种开源软件,Python可以自由获取,而且非常易学易用.本书是Python语言的经典入门读本,由两名顶尖的Python技术专家兼培训专家联手撰写,涵盖了该语言的所有核心内容.所有练习的解答都可在书后找到. 本书描述了Python程序的基本构件:类型.操作符.语句.函数.模块.类以及异常和介绍了更多高级主题,包括复杂的实例.无论是用于编写简单的脚本,还是复杂的