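As we said earlier when introducing the actor system, every actor is the supervisor of its child actors, and every actor defines a supervisor strategy that decides what happens when a child fails. Once a strategy is defined it cannot be changed afterwards; it is an integral part of the actor system's structure.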
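The core of a strategy can be modeled without Akka. The sketch below is plain Scala: `Directive`, `Decider` and the two exception classes are hypothetical stand-ins that mirror Akka's `SupervisorStrategy.Decider` (a `PartialFunction[Throwable, Directive]`) and the exceptions used in the example that follows. The deciders are `val`s, echoing the point that a strategy is fixed once defined; as in Akka, a failure the decider does not match is escalated.

```scala
object DeciderModel {
  // Hypothetical stand-ins for Akka's supervisor directives (not akka.actor classes).
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  // Same shape as Akka's SupervisorStrategy.Decider.
  type Decider = PartialFunction[Throwable, Directive]

  // Hypothetical copies of the exception types used in the example.
  class StorageException(msg: String) extends RuntimeException(msg)
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  // CounterService's rule: restart the Storage child on StorageException.
  val counterServiceDecider: Decider = { case _: StorageException => Restart }

  // Worker's rule: stop the CounterService child on ServiceUnavailable.
  val workerDecider: Decider = { case _: ServiceUnavailable => Stop }

  // A failure the decider does not cover is escalated: the supervisor
  // fails itself and its own parent decides.
  def decide(decider: Decider, cause: Throwable): Directive =
    decider.applyOrElse(cause, (_: Throwable) => Escalate)

  def main(args: Array[String]): Unit = {
    println(decide(counterServiceDecider, new StorageException("disk down")))
    println(decide(counterServiceDecider, new IllegalStateException("bug")))
    println(decide(workerDecider, new ServiceUnavailable("no initial value")))
  }
}
```

Running `main` prints `Restart`, `Escalate` and `Stop`, one per line.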
Fault Handling in Practice
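Let's start with an example that shows one way of handling errors from a data store, a typical failure in a real application. A real application may of course react differently when the data source is unavailable; here we simply reconnect.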
The source code follows. It is fairly long, so read it carefully; ideally, run it and follow the log output to understand what happens:
```scala
import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ask, pipe}
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {
  import Worker._

  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker](), name = "worker")
  val listener = system.actorOf(Props[Listener](), name = "listener")
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {
  import Worker._
  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15.seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.terminate() // shutdown() in Akka versions before 2.4
      }

    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.terminate()
  }
}

object Worker {
  case object Start
  case object Do
  final case class Progress(percent: Double)
}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._
  implicit val askTimeout: Timeout = Timeout(5.seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService](), name = "counter")
  val totalCount = 51
  import context.dispatcher // Use this Actor's Dispatcher as ExecutionContext

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.schedule(Duration.Zero, 1.second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender
      counterService ? GetCurrentCount map {
        case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
      } pipeTo progressListener.get
  }
}

object CounterService {
  final case class Increment(n: Int)
  case object GetCurrentCount
  final case class CurrentCount(key: String, count: Long)
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect
}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
    withinTimeRange = 5.seconds) {
    case _: Storage.StorageException => Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  import context.dispatcher // Use this Actor's Dispatcher as ExecutionContext

  override def preStart(): Unit = initStorage()

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * and still failing it will be stopped. Better to back-off than continuously
   * failing. When it has been stopped we will schedule a Reconnect after a delay.
   * Watch the child so we receive Terminated message when it has been terminated.
   */
  def initStorage(): Unit = {
    storage = Some(context.watch(context.actorOf(Props[Storage](), name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {
    case Entry(k, v) if k == key && counter == None =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg @ Increment(n) => forwardOrPlaceInBacklog(msg)

    case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after while
      context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any): Unit = {
    // We need the initial value from storage before we can start delegate to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) => c forward msg
      case None =>
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable(
            "CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }
}

object Counter {
  final case class UseStorage(storage: Option[ActorRef])
}

/**
 * The in memory count variable that will send current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()

    case Increment(n) =>
      count += n
      storeCount()

    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)
  }

  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach { _ ! Store(Entry(key, count)) }
  }
}

object DummyDB {
  import Storage.StorageException
  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    if (11 <= value && value <= 14)
      throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}

object Storage {
  final case class Store(entry: Entry)
  final case class Get(key: String)
  final case class Entry(key: String, value: Long)
  class StorageException(msg: String) extends RuntimeException(msg)
}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    case Get(key)                 => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}
```
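The backlog logic in `CounterService.forwardOrPlaceInBacklog` can be looked at in isolation. The sketch below is a plain-Scala model under simplifying assumptions: the hypothetical `Buffer` class stands in for CounterService, a function `A => Unit` stands in for the Counter actor, and the original sender of each message is not tracked.

```scala
object BacklogModel {
  // Hypothetical copy of the example's exception type.
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  // Buffers messages until a target exists, like forwardOrPlaceInBacklog.
  final class Buffer[A](maxBacklog: Int) {
    private var target: Option[A => Unit] = None
    private var backlog = Vector.empty[A]

    def pending: Int = backlog.size

    // Deliver directly once a target exists, otherwise queue with a size limit.
    def send(msg: A): Unit = target match {
      case Some(deliver) => deliver(msg)
      case None =>
        if (backlog.size >= maxBacklog)
          throw new ServiceUnavailable("not available, lack of initial value")
        backlog :+= msg
    }

    // Called when the initial value has arrived: flush the backlog in order.
    def connect(deliver: A => Unit): Unit = {
      target = Some(deliver)
      backlog.foreach(deliver)
      backlog = Vector.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = new Buffer[String](maxBacklog = 2)
    buf.send("Increment(1)")
    buf.send("GetCurrentCount")
    println(buf.pending)
    buf.connect(msg => println(s"delivered $msg"))
  }
}
```

The bound plays the same role as `MaxBacklog`: if the initial value never arrives, the buffer fails fast with `ServiceUnavailable` instead of growing without limit, and that exception is exactly what Worker's strategy turns into `Stop`.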
The example defines five actors: Worker, Listener, CounterService, Counter and Storage. The figure below shows the flow when the system runs normally (no errors occur):
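Worker is the parent (supervisor) of CounterService, and CounterService is the parent (supervisor) of Counter and Storage. In the figure, the light-red and white arrows represent references: Worker references Listener and Listener references Worker, but there is no parent-child relationship between them. Likewise, Counter references Storage, but Counter is not Storage's supervisor.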
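The difference between supervision and plain references can be written down as data. This sketch models the supervision tree as a parent map; top-level actors created with `system.actorOf` live under Akka's `/user` guardian. The name `counterChild` is a hypothetical label, because the example creates Counter without a name and Akka generates one.

```scala
object HierarchyModel {
  // Supervision: child name -> parent name; "user" is the root of this model.
  val parentOf: Map[String, String] = Map(
    "worker"       -> "user",
    "listener"     -> "user",
    "counter"      -> "worker",  // CounterService, created with name = "counter"
    "storage"      -> "counter",
    "counterChild" -> "counter"  // hypothetical label for the unnamed Counter
  )

  // References: who sends messages to whom, independent of supervision.
  val references: Set[(String, String)] = Set(
    "worker"       -> "listener", // Worker pipes Progress to the Listener
    "counterChild" -> "storage"   // Counter sends Store to Storage
  )

  // Build an actor path by walking up the parent chain.
  def path(name: String): String = parentOf.get(name) match {
    case Some(parent) => s"${path(parent)}/$name"
    case None         => s"/$name"
  }

  def supervises(parent: String, child: String): Boolean =
    parentOf.get(child).contains(parent)

  def main(args: Array[String]): Unit = {
    println(path("storage"))
    println(supervises("counterChild", "storage"))
  }
}
```

Because CounterService uses `self.path.name` as its storage key, the key in this example is `counter`, the last segment of its path.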
The normal flow is as follows:
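| Step | Description |
|---|---|
| 1 | The progress Listener tells the Worker to start working. |
| 2 | The Worker does its work by periodically sending itself a Do message. |
| 3, 4, 5 | When the Worker receives Do, it tells its child CounterService to increment the counter three times. CounterService forwards each Increment message to Counter, which increments its counter variable and sends the current value to Storage to be saved. |
| 6, 7 | The Worker asks CounterService for the current counter value and pipes the result to the Listener. |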
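A rough calculation shows how long the normal flow takes. The sketch assumes `totalCount = 51` (the value used in the Akka documentation's version of this sample), one `Do` tick per second starting immediately, three `Increment(1)` messages per tick, and that the reply to `GetCurrentCount` already includes that tick's increments.

```scala
object ProgressModel {
  // Assumption: value taken from the Akka documentation's version of the sample.
  val totalCount = 51

  // Each Do tick sends three Increment(1) messages, then asks for the count.
  def progressAfterTicks(ticks: Int): Double = {
    val count = 3L * ticks
    100.0 * count / totalCount
  }

  // First tick whose reported progress reaches 100 %.
  def ticksToFinish: Int =
    Iterator.from(1).find(t => progressAfterTicks(t) >= 100.0).get

  def main(args: Array[String]): Unit = {
    println(f"after 1 tick: ${progressAfterTicks(1)}%.2f %%")
    // The first tick fires at time zero, so tick n fires at (n - 1) seconds.
    println(s"ticks needed: $ticksToFinish, last tick at ${ticksToFinish - 1} s")
  }
}
```

Under these assumptions the Listener sees 100 % on the 17th tick, about 16 seconds after start, and a Progress message arrives roughly every second, so its 15-second receive timeout does not fire.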
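The figure below shows what happens when an error occurs. In the example, Worker and CounterService act as supervisors and each define a supervisor strategy: Worker stops CounterService when CounterService throws ServiceUnavailable, and CounterService restarts Storage when Storage throws a StorageException.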
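The `maxNrOfRetries = 3, withinTimeRange = 5 seconds` setting can be approximated with a simple counting rule. This is a simplified model, not Akka's internal implementation: a window opens at the first failure, every failure inside the window counts as one more retry, a failure after the window has expired opens a new one, and the child is stopped once the count exceeds `maxRetries`.

```scala
object RetryWindowModel {
  sealed trait Decision
  case object Restart extends Decision
  case object Stop extends Decision

  // Simplified model of maxNrOfRetries / withinTimeRange (not Akka's source):
  // one decision per failure, ending at the first Stop.
  def decide(failureTimesMs: List[Long], maxRetries: Int, windowMs: Long): List[Decision] = {
    @annotation.tailrec
    def loop(rest: List[Long], windowStart: Long, retries: Int, acc: List[Decision]): List[Decision] =
      rest match {
        case Nil => acc.reverse
        case t :: tail =>
          // Open a new window on the first failure or once the old window has expired.
          val (start, count) =
            if (retries == 0 || t - windowStart > windowMs) (t, 1)
            else (windowStart, retries + 1)
          if (count > maxRetries) (Stop :: acc).reverse // child stopped: no more decisions
          else loop(tail, start, count, Restart :: acc)
      }
    loop(failureTimesMs, 0L, 0, Nil)
  }

  def main(args: Array[String]): Unit = {
    // Assumed failure times for Store(11), Store(12), Store(13), Store(14).
    println(decide(List(3000L, 3000L, 4000L, 4000L), maxRetries = 3, windowMs = 5000L))
  }
}
```

With the simulated store rejecting values 11 to 14 and one tick per second, the failing stores happen at about 3 s (values 11 and 12) and 4 s (13 and 14). Feeding those times in yields three restarts followed by a stop, which matches steps 1 to 7 of the failure flow.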
Flow when an error occurs
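| Step | Description |
|---|---|
| 1 | Storage throws a StorageException. |
| 2 | Storage's supervisor, CounterService, restarts Storage when the StorageException arrives, as its strategy dictates. |
| 3, 4, 5, 6 | Storage keeps failing and is restarted again. |
| 7 | If Storage fails again after being restarted three times within 5 seconds, its supervisor (CounterService) stops it. |
| 8 | CounterService also watches Storage, so it receives a Terminated message once Storage has stopped. |
| 9, 10, 11 | CounterService then tells Counter that there is no Storage for the moment. |
| 12 | CounterService sends itself a Reconnect message after a delay. |
| 13, 14 | When it receives Reconnect, it creates a new Storage. |
| 15, 16 | It then tells Counter to use the new Storage. |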
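Steps 9 to 16 rely on Counter keeping its count in memory and pushing it to whatever storage it currently has. The sketch below models that in plain Scala; `FakeStorage` is a hypothetical in-memory stand-in for the Storage actor, and method calls stand in for the `UseStorage` and `Increment` messages.

```scala
import scala.collection.mutable

object CounterModel {
  // Hypothetical in-memory stand-in for the Storage actor.
  final class FakeStorage {
    val data: mutable.Map[String, Long] = mutable.Map.empty
    def store(key: String, value: Long): Unit = data(key) = value
  }

  // Mirrors the example's Counter: the count lives in memory, storage is optional.
  final class Counter(key: String, initialValue: Long) {
    private var count = initialValue
    private var storage: Option[FakeStorage] = None

    def current: Long = count

    // Corresponds to UseStorage(s): switch storage and push the current count.
    def useStorage(s: Option[FakeStorage]): Unit = { storage = s; storeCount() }

    // Corresponds to Increment(n).
    def increment(n: Int): Unit = { count += n; storeCount() }

    // Counting continues even when no storage is available.
    private def storeCount(): Unit = storage.foreach(_.store(key, count))
  }

  def main(args: Array[String]): Unit = {
    val first = new FakeStorage
    val counter = new Counter("counter", 0L)
    counter.useStorage(Some(first))
    counter.increment(3)
    counter.useStorage(None)          // storage stopped (steps 9 to 11)
    counter.increment(5)              // counting goes on during the outage
    val second = new FakeStorage
    counter.useStorage(Some(second))  // reconnected (steps 15 and 16)
    println(second.data)
  }
}
```

The second storage receives the count of 8 even though the increment of 5 happened while no storage was attached. This is why the example keeps the valuable state in Counter and delegates only the risky storage work to a separate child.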
A log from one run is given here for reference.
Time: 2024-10-09 01:15:30