1.mailbox
Akka的每个actor默认有一个mailbox,按照FIFO顺序单线程处理。在抛出异常导致父actor根据设置的监管策略执行重启或恢复操作时,会从触发异常的消息的后续消息开始处理,邮箱并不会被清空。如果你想重新处理那个触发异常的消息,可以通过重写preRestart方法来访问该消息,java 中的preRestart参数为(Throwable reason, Option<Object> message),message.get()可以获得该消息(因为是从Option对象中get,所以可能为空),可以将该消息再次发给自己或做其它处理。
默认邮箱的大小没有限制,也就是内存的上限。可以设置bounded邮箱来限定大小,还可以设置邮箱以文件形式持久存储。
2.监管策略设置
1)在actor类中重写supervisorStrategy()
2)创建父actor时在Props参数中使用FromConfig.getInstance().withSupervisorStrategy(strategy).props(XXX)
可以使用下面的类来方便设置:
import akka.actor.AllForOneStrategy; import akka.actor.OneForOneStrategy; import akka.actor.SupervisorStrategy; import akka.japi.Function; import scala.concurrent.duration.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import static akka.actor.SupervisorStrategy.escalate; /** * Created by fyk on 16-4-2. */ public class StrategySetter { private Map<Class<? extends Throwable>, SupervisorStrategy.Directive> map; private boolean oneForOne; private int maxNrOfRetries=5; private Duration withinTimeRange=Duration.create(1, TimeUnit.MINUTES);//Duration.create("1 minute") public StrategySetter(boolean oneForOne) { this.oneForOne=oneForOne; map=new HashMap<Class<? extends Throwable>, SupervisorStrategy.Directive>(); } public void setOptParam(int maxNrOfRetries,Duration withinTimeRange){ this.maxNrOfRetries=maxNrOfRetries; this.withinTimeRange=withinTimeRange; } public void put(Class<? extends Throwable> t, SupervisorStrategy.Directive action){ map.put(t,action); } /** * 设定监管策略并返回 * cls.isInstance(yourObject) * instead of using the instanceof operator, which can only be used if you know the class at compile time. */ public SupervisorStrategy getSupervisorStrategy(){ SupervisorStrategy strategy=null; if(oneForOne){ strategy=new OneForOneStrategy(maxNrOfRetries, withinTimeRange, new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable t) { for(Class c:map.keySet()){ if(c.isInstance(t)) return map.get(c); } return escalate();//提交给上一级监管 } }); }else{ strategy=new AllForOneStrategy(maxNrOfRetries, withinTimeRange, new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable t) { for(Class c:map.keySet()){ if(c.isInstance(t)) return map.get(c); } return escalate();//提交给上一级监管 } }); } return strategy; } }
3.continue...
时间: 2024-12-25 18:10:00