正如角色系统这一章中解释的一样,每一个角色都是它孩子的监管者,并且像这样的角色都会定义错误处理监管策略。这个策略在成为角色系统结构的一个完整部分之后是不能被改变的。
错误处理实践
首先,让我们看一个处理数据存储错误的例子,它是实践应用中一个典型的错误根源。当然它基于真实应用,这个应用的数据存储有可能是无效的,但我们在这个例子中会用一个最有效的重连方法来实现。
读下面的源代码。内嵌的注释解释了错误处理的不同块和为什么要添加它们。强烈的建议去运行这个例子,这样才能更简单的去跟踪这个日志输出,来了解运行的时候发生了什么。
容错例子的图解()
容错例子的全部源代码()
创建一个监管策略
下面的章节解释了错误处理机制和更深入的选择。
根据示范的目的,让我们来考虑如下策略:
01 | private static SupervisorStrategy strategy = |
02 | new OneForOneStrategy( 10 , Duration.create( "1 minute" ), |
03 | new Function<Throwable, Directive>() { |
05 | public Directive apply(Throwable t) { |
06 | if (t instanceof ArithmeticException) { |
08 | } else if (t instanceof NullPointerException) { |
10 | } else if (t instanceof IllegalArgumentException) { |
19 | public SupervisorStrategy supervisorStrategy() { |
我选择了一些大家熟知的异常类型,是为了展示在监管和监视章节中描述的错误处理指令的应用。首先,它是一个一对一的策略,意味着每一个孩子都是分开处理的(多对一的策略工作非常类似,唯一的不同就是任何决策都会应用到监管者的所有孩子,不仅仅是发生错误的那个)。在重启频率上会有一些限制,最大是每分钟重启10次。-1和Duration.Inf()意味着限制没有应用,抛开这个可能性,去指定一个绝对的上限或者去让这个重启的工作没有上限。当超出这个限制,孩子角色就被停止。
注意:如果策略在监管角色(而不是一个单独的类)中描述了,它的决策者可以在线程安全的形势下访问角色的所有内部状态,包括获得当前发生错误的孩子的引用(例如错误消息的getSender)。
默认监管策略
如果定义的策略没有覆盖抛出的异常,Escalate会被使用。当没有为一个角色定义监管策略,如下的异常会按照默认来处理:
1. ActorInitializationException会停止发生错误的子角色
2. ActorKilledException会停止发生错误的子角色
3. Exception会重启发生错误的子角色
4. 别的抛出类型会升级到父角色
如果异常升级到根监管者,会按上述的默认策略处理。
停止监管策略
跟Erlang方式类似的策略是当它们失败的时候只停止子角色,以及当DeathWatch通知丢失的子角色的时候会对监管者采取正确的动作。
记录角色失败的信息
默认SupervisorStrategy会记录失败信息除非它们被向上升级。建议在更高层次的结构中处理上升的错误,并潜在的记录下来。
在初始化的时候你可以通过设置SupervisorStrategy的loggingEnabled为false来去掉默认的日志。可以在Decider里定制日志。注意如getSender一样,当SupervisorStrategy在监管角色中描述,当前失败的子角色引用是有效的。
你可以通重写logFailure方法在你自己的SupervisorStrategy实现中定制化日志。
最高层次角色的监管
最高层次角色意味着它们是通过system.actorOf()创建的,并且它们是User Guardian的孩子。在这种情况下没有特定的规则,守护者仅仅应用配置策略。
测试应用
下面章节展示了实践中不同指令的效果,wherefor测试启动是需要的。首先,我们需要一个匹配的监管者。
01 | public class Supervisor extends UntypedActor { |
03 | private static SupervisorStrategy strategy = |
04 | new OneForOneStrategy( 10 , Duration.create( "1 minute" ), |
05 | new Function<Throwable, Directive>() { |
07 | public Directive apply(Throwable t) { |
08 | if (t instanceof ArithmeticException) { |
10 | } else if (t instanceof NullPointerException) { |
12 | } else if (t instanceof IllegalArgumentException) { |
21 | public SupervisorStrategy supervisorStrategy() { |
25 | public void onReceive(Object o) { |
26 | if (o instanceof Props) { |
27 | getSender().tell(getContext().actorOf((Props) o), getSelf()); |
这个监管者会被用于创建子角色,我们可以实验一下:
01 | public class Child extends UntypedActor { |
04 | public void onReceive(Object o) throws Exception { |
05 | if (o instanceof Exception) { |
07 | } else if (o instanceof Integer) { |
09 | } else if (o.equals( "get" )) { |
10 | getSender().tell(state, getSelf()); |
测试使用Testing Actor Systems里介绍的实用工具会比较简单,TestProbe提供一个Actor Ref用于接收和检查消息回复。
01 | import akka.actor.ActorRef; |
02 | import akka.actor.ActorSystem; |
03 | import akka.actor.SupervisorStrategy; |
04 | import static akka.actor.SupervisorStrategy.resume; |
05 | import static akka.actor.SupervisorStrategy.restart; |
06 | import static akka.actor.SupervisorStrategy.stop; |
07 | import static akka.actor.SupervisorStrategy.escalate; |
08 | import akka.actor.SupervisorStrategy.Directive; |
09 | import akka.actor.OneForOneStrategy; |
10 | import akka.actor.Props; |
11 | import akka.actor.Terminated; |
12 | import akka.actor.UntypedActor; |
13 | import scala.collection.immutable.Seq; |
14 | import scala.concurrent.Await; |
15 | import static akka.pattern.Patterns.ask; |
16 | import scala.concurrent.duration.Duration; |
17 | import akka.testkit.AkkaSpec; |
18 | import akka.testkit.TestProbe; |
20 | public class FaultHandlingTest { |
21 | static ActorSystem system; |
22 | Duration timeout = Duration.create( 5 , SECONDS); |
25 | public static void start() { |
26 | system = ActorSystem.create( "test" , AkkaSpec.testConf()); |
30 | public static void cleanup() { |
31 | JavaTestKit.shutdownActorSystem(system); |
36 | public void mustEmploySupervisorStrategy() throws Exception { |
让我们创建角色
1 | Props superprops = Props.create(Supervisor. class ); |
2 | ActorRef supervisor = system.actorOf(superprops, "supervisor" ); |
3 | ActorRef child = (ActorRef) Await.result(ask(supervisor, |
4 | Props.create(Child. class ), 5000 ), timeout); |
第一个测试会演示Resume指令,所以我们尝试通过把角色的状态设置成非初始化状态并让它失败:
1 | child.tell( 42 , ActorRef.noSender()); |
2 | assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 42 ); |
3 | child.tell( new ArithmeticException(), ActorRef.noSender()); |
4 | assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 42 ); |
你可以看到值42让错误处理指令存活下来。现在,如果我们把错误改成一个更严重的NullPointerException异常,则将不会是这种情况:
1 | child.tell( new NullPointerException(), ActorRef.noSender()); |
2 | assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 ); |
发生IllegalArgumentException致命异常的情况下,最终会导致子角色会被监管者中断:
1 | final TestProbe probe = new TestProbe(system); |
3 | child.tell( new IllegalArgumentException(), ActorRef.noSender()); |
4 | probe.expectMsgClass(Terminated. class ); |
到目前为止,监管者还没有完全受到子角色失败的影响,因为指令集会处理它。万一抛出一个异常,监管者会向上抛出错误。
1 | child = (ActorRef) Await.result(ask(supervisor, |
2 | Props.create(Child. class ), 5000 ), timeout); |
4 | assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 ); |
5 | child.tell( new Exception(), ActorRef.noSender()); |
6 | probe.expectMsgClass(Terminated. class ); |
监管者自己是由ActorSystem提供的最高等级的角色监管的,在所有的异常(注意这两个异常
ActorInitializationException和ActorKilledException)情况下,默认策略是重新启动。一旦默认的指令重启去杀死所有的孩子,我们期望我们的穷孩子不要幸存这个错误。
这不是所希望的(这依赖于用例),我们需要去用一个不同的监管者重写这个行为。
01 | public class Supervisor2 extends UntypedActor { |
03 | private static SupervisorStrategy strategy = new OneForOneStrategy( 10 , |
04 | Duration.create( "1 minute" ), |
05 | new Function<Throwable, Directive>() { |
07 | public Directive apply(Throwable t) { |
08 | if (t instanceof ArithmeticException) { |
10 | } else if (t instanceof NullPointerException) { |
12 | } else if (t instanceof IllegalArgumentException) { |
21 | public SupervisorStrategy supervisorStrategy() { |
25 | public void onReceive(Object o) { |
26 | if (o instanceof Props) { |
27 | getSender().tell(getContext().actorOf((Props) o), getSelf()); |
34 | public void preRestart(Throwable cause, Option<Object> msg) { |
通过父亲,孩子角色幸存向上升级重启,如最后一段测试代码所演示的:
1 | superprops = Props.create(Supervisor2. class ); |
2 | supervisor = system.actorOf(superprops); |
3 | child = (ActorRef) Await.result(ask(supervisor, |
4 | Props.create(Child. class ), 5000 ), timeout); |
5 | child.tell( 23 , ActorRef.noSender()); |
6 | assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 23 ); |
7 | child.tell( new Exception(), ActorRef.noSender()); |
8 | assert Await.result(ask(child, "get" , 5000 ), timeout).equals( 0 ); |