Akka Java 文档 -- 容错

[转自: http://blog.csdn.net/zjw10wei321/article/details/46911825]

容错

正如角色系统所描述的,每个actor都是其子actor的监管者,并且每个actor都定义了故障处理监管策略。这个策略作为角色系统结构的一部分,一经创建后就不能再修改。

实际中的故障处理

首先我们看一个在实际应用中典型故障的案例,演示处理数据存储错误的一种方法。当然这取决于实际的应用中,当存储数据失败时可以做些什么,但在本例中我们使用尽量重新连接的方法。 
阅读下面的源代码。代码内部的注释解释了各个片段的故障处理以及为什么添加它们。强烈推荐运行这个示例,因为很容易就顺着日志的输出,理解在运行时发生了什么。

容错案例图解

 
上图阐述了正常消息流。 
常规流程:

步骤 描述
1 Listener开始干活。
2 Worker定期的向自己发送Do信息来安排工作。
3,4,5 当Worker收到Do信息后,向CounterService发送3条Increment信息去增加计数器。CounterService将Increment信息转发给Counter,由Counter更新计数器变量,并将计数器当前值发送给Storage。
6,7 Worker 请求CounterService获取当前计数器的值,并将结果返回给Listener。

上图阐述了在存储失败时发生了什么 
失败流程:

步骤 描述
1 Storage抛出StorageException。
2 CounterService是Storage的监管者,当StorageException被抛出时并重启Storage。
3,4,5,6 Storage继续失败,继续被重启。
7 Storage在5秒内经历3次失败和重启后,会被其监管者(CounterService)停止。
8 CounterService同样在监视Storage为其终止,在当Storage终止时,接收到Terminated消息。
9,10,11 CounterService这时告诉Counter没有Storage。
12 CounterService计划发送一个Reconnect消息给自己。
13,14 当CounterService接受到Reconnect消息后创建一个新的Storage。
15,16 CounterService告诉Counter用这个新的Storage。

容错案例所有源码

这里就不列出了,详细请看我的github

创建新的监管策略

下面的章节将说明故障处理的原理以及更深层的替代。 
为了达到示范的效果,我们假使有如下的策略:

private static SupervisorStrategy strategy =
  new OneForOneStrategy(10, Duration.create("1 minute"),
    new Function<Throwable, Directive>() {
      @Override
      public Directive apply(Throwable t) {
        if (t instanceof ArithmeticException) {
          return resume();
        } else if (t instanceof NullPointerException) {
          return restart();
        } else if (t instanceof IllegalArgumentException) {
          return stop();
        } else {
          return escalate();
        }
      }
    });

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

我选择了几个著名的异常类型演示应用程序中被描述成故障处理指令在监管和监测中的使用。首先,一对一的策略,意味着每一个子actor会被单独的治愈(多对一策略与之相似,唯一不同的是这个策略是针对监管者所有的子actor而不仅仅是出故障的那一个)。这里限制重启的频率,最大程度上每分钟重启10次。-1 和Duration.Inf()的限制并不适用,可以指定一个重启绝对上限或者让重启无限。超过了期限后子actor会被停止。

注意: 
如果这个策略是在监管者角色内部声明的(而不是一个单独的类),决策者可以在线程安全样式下访问角色内部的所有状态,包含获取失败子节点的引用(故障信息中getSender有效)。

默认监管策略

如果定义过的策略没有覆盖到被抛出的异常,Escalate (逐步上升,丢给父监管者)志在必行。 
当actor中没有定义监管策略,如下的异常将会被默认处理掉:

  • ActorInitializationException 会停止掉失败了的子actor。
  • ActorKilledException 会停止掉失败了的子actor。
  • Exception 会重启失败了的子actor。
  • 其他类型的Throwable 将会上升到父actor。

如果异常被上升一路达到根监护者那,在那也会用上述默认策略方式处理掉。

停止监管策略

跟Erlang方式类似的策略是当它们失败的时候只停止子actor,以及当DeathWatch通知丢失的子actor的时候会对监管者采取纠正的动作。

记录actor失败的消息

默认SupervisorStrategy会记录失败信息除非它们会被逐级上升。在高层次机构中处理被逐级上升的错误,并潜在的记录下来。 
在初始化的时候你可以通过设置SupervisorStrategy的loggingEnabled为false用来不激活默认的日志。 在Decider内部可以定制日志。注意获取当前失败的子acotrRef是有效的,当SupervisorStrategy在监管角色中描述,如同getSender一样。 
你可以自定义化日志通过实现SupervisorStrategy重写logFailure方法。

顶层角色的监管

顶层角色意味着是用system.actorOf()创建的,是用户监护人的孩子。 
在这种情况下没有特殊规则应用,监护只适用于配置的策略。

应用测试

下面章节展示了在实际中不同指令的效果,因此咱们需要一个测试环境。首先需要一个合适有效的监管者:

public class Supervisor extends UntypedActor {

  private static SupervisorStrategy strategy =
    new OneForOneStrategy(10, Duration.create("1 minute"),
      new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
          if (t instanceof ArithmeticException) {
            return resume();
          } else if (t instanceof NullPointerException) {
            return restart();
          } else if (t instanceof IllegalArgumentException) {
            return stop();
          } else {
            return escalate();
          }
        }
      });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  public void onReceive(Object o) {
    if (o instanceof Props) {
      getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
      unhandled(o);
    }
  }
}

此监管者将会创建一个子actor,好让我们可以作如下试验:

public class Child extends UntypedActor {
  int state = 0;

  public void onReceive(Object o) throws Exception {
    if (o instanceof Exception) {
      throw (Exception) o;
    } else if (o instanceof Integer) {
      state = (Integer) o;
    } else if (o.equals("get")) {
      getSender().tell(state, getSelf());
    } else {
      unhandled(o);
    }
  }
}

在测试actor系统中测试使用工具更容易简化,TestProbe提供了有用的角色引用,用来接收和检查消息的回复。

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import scala.collection.immutable.Seq;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.duration.Duration;
import akka.testkit.TestProbe;

public class FaultHandlingTest {
  static ActorSystem system;
  Duration timeout = Duration.create(5, SECONDS);

  @BeforeClass
  public static void start() {
    system = ActorSystem.create("test");
  }

  @AfterClass
  public static void cleanup() {
    JavaTestKit.shutdownActorSystem(system);
    system = null;
  }

  @Test
  public void mustEmploySupervisorStrategy() throws Exception {
    // code here
  }

}

创建角色:

Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
  Props.create(Child.class), 5000), timeout);

第一个测试将演示resume指令,因此我们尝试在actor中设置一些非初始化的状态,并且让actor出现故障:

child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);

你可以看到在错误处理指令完后仍能得到42。现在我们将故障换成更严重的NullPointerException,那将不再是这样的情况:

child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);

最后看看最致命的IllegalArgumentException,监管者会终止其子actor:

final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

到目前为止,监管者完全不受子actor故障的影响,因为指令集会处理掉。在Exception情况下,就不会是上述情况了,监管者会将失败情况逐级上升传递:

child = (ActorRef) Await.result(ask(supervisor,
  Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

监管者它自己会被ActorSystem提供的顶层actor监管,顶层actor对所有异常(ActorInitializationException 和ActorKilledException异常是例外)使用默认故障策略去重启。因为默认指令的重启是杀死所有子actor,我们预期到这些脆弱的子actor是不会在故障中幸存。

假如这不是所期望的(依赖于实际用例),我们需要使用一个不同的监管者来覆盖它的方法。

public class Supervisor2 extends UntypedActor {

  private static SupervisorStrategy strategy = new OneForOneStrategy(10,
    Duration.create("1 minute"),
      new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
          if (t instanceof ArithmeticException) {
            return resume();
          } else if (t instanceof NullPointerException) {
            return restart();
          } else if (t instanceof IllegalArgumentException) {
            return stop();
          } else {
            return escalate();
          }
        }
      });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  public void onReceive(Object o) {
    if (o instanceof Props) {
      getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
      unhandled(o);
    }
  }

  @Override
  public void preRestart(Throwable cause, Option<Object> msg) {
    // do not kill all children, which is the default here
  }
}

在这个父actor下,子actor会逐步上升中的重启而幸存,如下的最后测试:

superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor,
  Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
时间: 2024-10-25 20:45:37

Akka Java 文档 -- 容错的相关文章

关于如何生成Java文档

一.生成Java文档,要明白Java注释内容,如果不知道,可以去看上一篇文章. 二.Java系统提供的javadoc工具可以根据程序结构自动生成注释文档.当程序修改时可方便及时更新生成的注释文档.javadoc.exe工具存在于JDK的bin目录下,使用javadoc将读取.java源文件中的文档注释,并按照一定的规则与Java源程序一起进行编译,生成文档. 三.javadoc常用标记: (上面截图来自javadoc的百度百科,读者也可以自己去百度百科看),除了这些,还有一些不常用的标记,这里就

java文档注释 编写格式

java 文档注释 在sun主页上有java文档注释的编写格式 How to Write Doc Comments for the Javadoc Tool http://java.sun.com/j2se/javadoc/writingdoccomments/ 不过是英文的 @author dfeixtay @version 0.10 2010-10-04 1 注释文档的格式注释文档将用来生成HTML格式的代码报告,所以注释文档必须书写在类.域.构造函数.方法.定义之前.注释文档由两部分组成—

JAVA 文档注释,类的说明,HTML说明文档的生成

有的时候,我们会写一些类,编译成.class文件,给别人使用,那么,别人不知道这个类有哪些方法,如何调用. 所以我们需要做一个类的说明文档. 可以采用在.java类里面进行注释,通过注释来生成类的说明文档的方法. 一..java中注释的写法: Test1.java /* 文档注释 */ /** 此类是对数组进行取最值,排序等操作的 @author 张三 @version 1.0 */ public class Test1{ /** 取Int数组里面的最大值 @param arr 传入一个int数

如何为我们的程序编写开发文档——Java文档注释

Java文档注释是用于生成Java API文档的注释,通过在程序中的类.属性.方法部分加上注释,就可以用javadoc命令生成漂亮的API文档,是程序员进阶的必备技能. 注意,文档注释只说明紧跟其后的类.属性或者方法. Javadoc文档生成命令为: -author和-version可以省略. 根据在文档中显示的效果,文档注释分为三部分.举例如下: 第一部分是简述.如下图中被红框框选的部分: 简述部分写在一段文档注释的最前面,第一个点号 (.) 之前 (包括点号). 第二部分是详细说明部分.该部

Java文档查看

对于Java学习者来说,阅读Java文档是必不可少的步骤,比如我现在想知道List接口的retianAll()方法,该怎么办呢? 当然是百度了!!! 皮一下,当然是查找Java文档了,以JDK1.7版本为例,首先找到Java文档的网址:https://docs.oracle.com/javase/7/docs/index.html,接着找到Java SE API 其他版本都一样的,例如JDK1.8: 或者是最新版的JDK11: 点击进入后找到java.util,在Interface Summar

Java - 34 Java 文档注释

Java 文档注释 Java只是三种注释方式.前两种分别是// 和/* */,第三种被称作说明注释,它以/** 开始,以 */结束. 说明注释允许你在程序中嵌入关于程序的信息.你可以使用javadoc工具软件来生成信息,并输出到HTML文件中. 说明注释,使你更加方便的记录你的程序的信息. javadoc 标签 javadoc工具软件识别以下标签: 标签 描述 示例 @author 标识一个类的作者 @author description @deprecated 指名一个过期的类或成员 @dep

Java文档注释

文档注释是用于生成API文档,API主要用于说明类.方法.成员变量 javadoc工具 处理文档源文件在类.接口.方法.成员变量.构造器和内部类之前的注释,忽略其他地方的文档注释.而且javadoc工具默认只处理以public或protected修饰的类.接口.方法.成员变量.构造器和内部类之前的文档注释. 如果开发者希望javadoc工具可以提取private修饰的内容,则可以使用javadoc工具是增加-private选项 javadoc命令的基本用法如下: javadoc 选项 Java源

jar -- java文档归档工具

参考文档 http://docs.oracle.com/javase/7/docs/technotes/tools/solaris/jar.html http://blog.chinaunix.net/uid-692788-id-2681132.html 功能说明:Java归档工具 语法:jar [ 命令选项 ] [manifest] destination input-file [input-files] 补充说明: jar工具是个java应用程序,可将多个文件合并为单个JAR归档文件.jar

Java 文档注释

Java只是三种注释方式.前两种分别是// 和/* */,第三种被称作说明注释,它以/** 开始,以 */结束. 说明注释允许你在程序中嵌入关于程序的信息.你可以使用javadoc工具软件来生成信息,并输出到HTML文件中. 说明注释,是你更加方面的记录你的程序的信息. javadoc 标签 javadoc工具软件识别以下标签: 标签 描述 示例 @author 标识一个类的作者 @author description @deprecated 指名一个过期的类或成员 @deprecated de