博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
AKKA文档(java版)—容错
阅读量:5788 次
发布时间:2019-06-18

本文共 7233 字,大约阅读时间需要 24 分钟。

正如角色系统这一章中解释的一样,每一个角色都是它孩子的监管者,并且像这样的角色都会定义错误处理监管策略。这个策略在成为角色系统结构的一个完整部分之后是不能被改变的。

错误处理实践

首先,让我们看一个处理数据存储错误的例子,它是实践应用中一个典型的错误根源。当然它基于真实应用,这个应用的数据存储有可能是无效的,但我们在这个例子中会用一个最有效的重连方法来实现。

读下面的源代码。内嵌的注释解释了错误处理的不同块和为什么要添加它们。强烈的建议去运行这个例子,这样才能更简单的去跟踪这个日志输出,来了解运行的时候发生了什么。

容错例子的图解()

容错例子的全部源代码()

创建一个监管策略

下面的章节解释了错误处理机制和更深入的选择。

根据示范的目的,让我们来考虑如下策略:

01 private static SupervisorStrategy strategy =
02 new OneForOneStrategy(10, Duration.create("1 minute"),
03 new Function<Throwable, Directive>() {
04 @Override
05 public Directive apply(Throwable t) {
06 if (t instanceof ArithmeticException) {
07 return resume();
08 else if (t instanceof NullPointerException) {
09 return restart();
10 else if (t instanceof IllegalArgumentException) {
11 return stop();
12 else {
13 return escalate();
14 }
15 }
16 });
17  
18 @Override
19 public SupervisorStrategy supervisorStrategy() {
20 return strategy;
21 }

我选择了一些大家熟知的异常类型,是为了展示在监管和监视章节中描述的错误处理指令的应用。首先,它是一个一对一的策略,意味着每一个孩子都是分开处理的(多对一的策略工作非常类似,唯一的不同就是任何决策都会应用到监管者的所有孩子,不仅仅是发生错误的那个)。在重启频率上会有一些限制,最大是每分钟重启10次。-1和Duration.Inf()意味着限制没有应用,抛开这个可能性,去指定一个绝对的上限或者去让这个重启的工作没有上限。当超出这个限制,孩子角色就被停止。

注意:如果策略在监管角色(而不是一个单独的类)中描述了,它的决策者可以在线程安全的形势下访问角色的所有内部状态,包括获得当前发生错误的孩子的引用(例如错误消息的getSender)。

默认监管策略

如果定义的策略没有覆盖抛出的异常,Escalate会被使用。当没有为一个角色定义监管策略,如下的异常会按照默认来处理:
1. ActorInitializationException会停止发生错误的子角色
2. ActorKilledException会停止发生错误的子角色
3. Exception会重启发生错误的子角色
4. 别的抛出类型会升级到父角色
如果异常升级到根监管者,会按上述的默认策略处理。

停止监管策略

跟Erlang方式类似的策略是当它们失败的时候只停止子角色,以及当DeathWatch通知丢失的子角色的时候会对监管者采取正确的动作。

记录角色失败的信息

默认SupervisorStrategy会记录失败信息除非它们被向上升级。建议在更高层次的结构中处理上升的错误,并潜在的记录下来。

在初始化的时候你可以通过设置SupervisorStrategy的loggingEnabled为false来去掉默认的日志。可以在Decider里定制日志。注意如getSender一样,当SupervisorStrategy在监管角色中描述,当前失败的子角色引用是有效的。
你可以通重写logFailure方法在你自己的SupervisorStrategy实现中定制化日志。

最高层次角色的监管

最高层次角色意味着它们是通过system.actorOf()创建的,并且它们是User Guardian的孩子。在这种情况下没有特定的规则,守护者仅仅应用配置策略。

测试应用

下面章节展示了实践中不同指令的效果,wherefor测试启动是需要的。首先,我们需要一个匹配的监管者。

01 public class Supervisor extends UntypedActor {
02  
03 private static SupervisorStrategy strategy =
04 new OneForOneStrategy(10, Duration.create("1 minute"),
05 new Function<Throwable, Directive>() {
06 @Override
07 public Directive apply(Throwable t) {
08 if (t instanceof ArithmeticException) {
09 return resume();
10 else if (t instanceof NullPointerException) {
11 return restart();
12 else if (t instanceof IllegalArgumentException) {
13 return stop();
14 else {
15 return escalate();
16 }
17 }
18 });
19  
20 @Override
21 public SupervisorStrategy supervisorStrategy() {
22 return strategy;
23 }
24  
25 public void onReceive(Object o) {
26 if (o instanceof Props) {
27 getSender().tell(getContext().actorOf((Props) o), getSelf());
28 else {
29 unhandled(o);
30 }
31 }
32 }

这个监管者会被用于创建子角色,我们可以实验一下:

01 public class Child extends UntypedActor {
02 int state = 0;
03  
04 public void onReceive(Object o) throws Exception {
05 if (o instanceof Exception) {
06 throw (Exception) o;
07 else if (o instanceof Integer) {
08 state = (Integer) o;
09 else if (o.equals("get")) {
10 getSender().tell(state, getSelf());
11 else {
12 unhandled(o);
13 }
14 }
15 }

测试使用Testing Actor Systems里介绍的实用工具会比较简单,TestProbe提供一个Actor Ref用于接收和检查消息回复。

01 import akka.actor.ActorRef;
02 import akka.actor.ActorSystem;
03 import akka.actor.SupervisorStrategy;
04 import static akka.actor.SupervisorStrategy.resume;
05 import static akka.actor.SupervisorStrategy.restart;
06 import static akka.actor.SupervisorStrategy.stop;
07 import static akka.actor.SupervisorStrategy.escalate;
08 import akka.actor.SupervisorStrategy.Directive;
09 import akka.actor.OneForOneStrategy;
10 import akka.actor.Props;
11 import akka.actor.Terminated;
12 import akka.actor.UntypedActor;
13 import scala.collection.immutable.Seq;
14 import scala.concurrent.Await;
15 import static akka.pattern.Patterns.ask;
16 import scala.concurrent.duration.Duration;
17 import akka.testkit.AkkaSpec;
18 import akka.testkit.TestProbe;
19  
20 public class FaultHandlingTest {
21 static ActorSystem system;
22 Duration timeout = Duration.create(5, SECONDS);
23  
24 @BeforeClass
25 public static void start() {
26 system = ActorSystem.create("test", AkkaSpec.testConf());
27 }
28  
29 @AfterClass
30 public static void cleanup() {
31 JavaTestKit.shutdownActorSystem(system);
32 system = null;
33 }
34  
35 @Test
36 public void mustEmploySupervisorStrategy() throws Exception {
37 // code here
38 }
39  
40 }

让我们创建角色

1 Props superprops = Props.create(Supervisor.class);
2 ActorRef supervisor = system.actorOf(superprops, "supervisor");
3 ActorRef child = (ActorRef) Await.result(ask(supervisor,
4 Props.create(Child.class), 5000), timeout);

第一个测试会演示Resume指令,所以我们尝试通过把角色的状态设置成非初始化状态并让它失败:

1 child.tell(42, ActorRef.noSender());
2 assert Await.result(ask(child, "get"5000), timeout).equals(42);
3 child.tell(new ArithmeticException(), ActorRef.noSender());
4 assert Await.result(ask(child, "get"5000), timeout).equals(42);

你可以看到值42让错误处理指令存活下来。现在,如果我们把错误改成一个更严重的NullPointerException异常,则将不会是这种情况:

1 child.tell(new NullPointerException(), ActorRef.noSender());
2 assert Await.result(ask(child, "get"5000), timeout).equals(0);

发生IllegalArgumentException致命异常的情况下,最终会导致子角色会被监管者中断:

1 final TestProbe probe = new TestProbe(system);
2 probe.watch(child);
3 child.tell(new IllegalArgumentException(), ActorRef.noSender());
4 probe.expectMsgClass(Terminated.class);

到目前为止,监管者还没有完全受到子角色失败的影响,因为指令集会处理它。万一抛出一个异常,监管者会向上抛出错误。

1 child = (ActorRef) Await.result(ask(supervisor,
2 Props.create(Child.class), 5000), timeout);
3 probe.watch(child);
4 assert Await.result(ask(child, "get"5000), timeout).equals(0);
5 child.tell(new Exception(), ActorRef.noSender());
6 probe.expectMsgClass(Terminated.class);

监管者自己是由ActorSystem提供的最高等级的角色监管的,在所有的异常(注意这两个异常

ActorInitializationException和ActorKilledException)情况下,默认策略是重新启动。一旦默认的指令重启去杀死所有的孩子,我们期望我们的穷孩子不要幸存这个错误。

这不是所希望的(这依赖于用例),我们需要去用一个不同的监管者重写这个行为。

01 public class Supervisor2 extends UntypedActor {
02  
03 private static SupervisorStrategy strategy = new OneForOneStrategy(10,
04 Duration.create("1 minute"),
05 new Function<Throwable, Directive>() {
06 @Override
07 public Directive apply(Throwable t) {
08 if (t instanceof ArithmeticException) {
09 return resume();
10 else if (t instanceof NullPointerException) {
11 return restart();
12 else if (t instanceof IllegalArgumentException) {
13 return stop();
14 else {
15 return escalate();
16 }
17 }
18 });
19  
20 @Override
21 public SupervisorStrategy supervisorStrategy() {
22 return strategy;
23 }
24  
25 public void onReceive(Object o) {
26 if (o instanceof Props) {
27 getSender().tell(getContext().actorOf((Props) o), getSelf());
28 else {
29 unhandled(o);
30 }
31 }
32  
33 @Override
34 public void preRestart(Throwable cause, Option<Object> msg) {
35 // do not kill all children, which is the default here
36 }
37 }

通过父亲,孩子角色幸存向上升级重启,如最后一段测试代码所演示的:

1 superprops = Props.create(Supervisor2.class);
2 supervisor = system.actorOf(superprops);
3 child = (ActorRef) Await.result(ask(supervisor,
4 Props.create(Child.class), 5000), timeout);
5 child.tell(23, ActorRef.noSender());
6 assert Await.result(ask(child, "get"5000), timeout).equals(23);
7 child.tell(new Exception(), ActorRef.noSender());
8 assert Await.result(ask(child, "get"5000), timeout).equals(0);
  • 转载自 
你可能感兴趣的文章
慎用!BLEU评价NLP文本输出质量存在严重问题
查看>>
JAVA的优势就是劣势啊!
查看>>
ELK实战之logstash部署及基本语法
查看>>
帧中继环境下ospf的使用(点到点模式)
查看>>
BeanShell变量和方法的作用域
查看>>
LINUX下防恶意扫描软件PortSentry
查看>>
由数据库对sql的执行说JDBC的Statement和PreparedStatement
查看>>
springmvc+swagger2
查看>>
软件评测-信息安全-应用安全-资源控制-用户登录限制(上)
查看>>
我的友情链接
查看>>
Java Web Application 自架构 一 注解化配置
查看>>
如何 debug Proxy.pac文件
查看>>
Python 学习笔记 - 面向对象(特殊成员)
查看>>
Kubernetes 1.11 手动安装并启用ipvs
查看>>
Puppet 配置管理工具安装
查看>>
Bug多,也别乱来,别被Bug主导了开发
查看>>
sed 替换基础使用
查看>>
高性能的MySQL(5)创建高性能的索引一B-Tree索引
查看>>
oracle备份与恢复--rman
查看>>
图片变形的抗锯齿处理方法
查看>>