Akka 指南 之「Actors」

2019-05-26 14:27:29 浏览数 (1)

文章目录

  • Actors
    • 依赖
    • 简介
    • 创建 Actors
      • 定义 Actor 类
      • Props
      • 危险的变体
      • 推荐实践
      • 使用 Props 创建 Actors
      • 依赖注入
      • 收件箱
    • Actor API
      • Actor 生命周期
      • 生命周期监控,或称为 DeathWatch
      • Start 钩子
      • Restart 钩子
      • Stop 钩子
    • 通过 Actor Selection 识别 Actor
    • 信息和不变性
    • 发送消息
      • Tell: Fire-forget
      • Ask: Send-And-Receive-Future
      • 转发消息
    • 接收消息
    • 回复消息
    • 接收超时
    • 定时器和调度消息
    • 停止 Actor
      • PoisonPill
      • 杀死一个 Actor
      • 优雅的停止
      • 协调关闭
    • Become/Unbecome
      • 升级
      • 对 Scala Actor 嵌套接收进行编码,而不会意外泄漏内存
    • Stash
    • Actor 和异常
      • 消息发生了什么
      • 邮箱发生了什么
      • Actor 发现了什么
    • 初始化模式
      • 通过构造函数初始化
      • 通过 preStart 初始化
      • 通过消息传递初始化

Actors

依赖

为了使用 Actors,你必须在项目中添加如下依赖:

代码语言:javascript复制
<!-- Maven -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.12</artifactId>
  <version>2.5.21</version>
</dependency>

<!-- Gradle -->
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.21'
}

<!-- sbt -->
libraryDependencies  = "com.typesafe.akka" %% "akka-actor" % "2.5.21"

简介

「Actor Model」为编写并发和分布式系统提供了更高级别的抽象。它减少了开发人员必须处理显式锁和线程管理的问题,使编写正确的并发和并行系统变得更容易。1973 年卡尔·休伊特(Carl Hewitt)在论文中定义了 Actors,然后通过 Erlang 语言所普及,并且在爱立信(Ericsson)成功地建立了高度并发和可靠的电信系统。

Akka 的 Actors API 类似于 Scala Actors,它从 Erlang 中借用了一些语法。

创建 Actors

由于 Akka 实施父级(parental)监督,每个 Actor 都受到其父级的监督并且监督其子级,因此建议你熟悉「Actor Systems」和「Supervision」,它还可能有助于阅读「Actor References, Paths and Addresses」。

定义 Actor 类

Actor 类是通过继承AbstractActor类并在createReceive方法中设置“初始行为”来实现的。

createReceive方法没有参数,并返回AbstractActor.Receive。它定义了 Actor 可以处理哪些消息,以及如何处理消息的实现。可以使用名为ReceiveBuilder的生成器来构建此类行为。在AbstractActor中,有一个名为receiveBuilder的方便的工厂方法。

下面是一个例子:

代码语言:javascript复制
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class MyActor extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            s -> {
              log.info("Received String message: {}", s);
            })
        .matchAny(o -> log.info("received unknown message"))
        .build();
  }
}

请注意,Akka Actor 消息循环是彻底的(exhaustive),与 Erlang 和后 Scala Actors 相比,它是不同的。这意味着你需要为它可以接受的所有消息提供一个模式匹配,如果你希望能够处理未知消息,那么你需要有一个默认情况,如上例所示。否则,akka.actor.UnhandledMessage(message, sender, recipient)将发布到ActorSystemEventStream

请进一步注意,上面定义的行为的返回类型是Unit;如果 Actor 应回复收到的消息,则必须按照下面的说明显式完成。

createReceive方法的结果是AbstractActor.Receive,它是围绕部分 Scala 函数对象的包装。它作为其“初始行为”存储在 Actor 中,有关在 Actor 构造后更改其行为的详细信息,请参见「Become/Unbecome」。

Props

Props是一个配置类,用于指定创建 Actor 的选项,将其视为不可变的,因此可以自由共享用于创建 Actor 的方法,包括关联的部署信息(例如,要使用哪个调度程序,请参阅下面的更多内容)。下面是一些如何创建Props实例的示例。

代码语言:javascript复制
import akka.actor.Props;
Props props1 = Props.create(MyActor.class);
Props props2 =
    Props.create(ActorWithArgs.class, () -> new ActorWithArgs("arg")); // careful, see below
Props props3 = Props.create(ActorWithArgs.class, "arg");

第二个变量演示了如何将构造函数参数传递给正在创建的 Actor,但它只能在 Actor 之外使用,如下所述。

最后一行显示了传递构造函数参数的可能性,而不管它在哪个上下文中使用。在Props对象的构造过程中,会验证是否存在匹配的构造函数,如果未找到匹配的构造函数或找到多个匹配的构造函数,则会导致IllegalArgumentException

危险的变体

代码语言:javascript复制
// NOT RECOMMENDED within another actor:
// encourages to close over enclosing class
Props props7 = Props.create(ActorWithArgs.class, () -> new ActorWithArgs("arg"));

不建议在另一个 Actor 中使用此方法,因为它鼓励关闭封闭范围,从而导致不可序列化的属性和可能的竞态条件(破坏 Actor 封装)。另一方面,在 Actor 的同伴对象(companion object)中的Props工厂中使用这个变体是完全正确的,如下面的“推荐实践”中所述。

这些方法有两个用例:将构造函数参数传递给由新引入的Props.create(clazz, args)方法或下面的推荐实践解决的 Actor,并将 Actor “就地(on the spot)”创建为匿名类。后者应该通过将这些 Actor 命名为类来解决(如果它们没有在顶级object中声明,则需要将封闭实例的this引用作为第一个参数传递)。

  • 警告:在另一个 Actor 中声明一个 Actor 是非常危险的,并且会破坏 Actor 的封装。千万不要把 Actor 的this引用传给Props

推荐实践

为每个 Actor 提供静态工厂方法是一个好主意,这有助于使合适的Props创建尽可能接近 Actor 的定义。这也避免了与使用Props.create(...)方法相关联的陷阱,该方法将参数作为构造函数参数,因为在静态方法中,给定的代码块不会保留对其封闭范围的引用:

代码语言:javascript复制
static class DemoActor extends AbstractActor {
  /**
   * Create Props for an actor of this type.
   *
   * @param magicNumber The magic number to be passed to this actor’s constructor.
   * @return a Props for creating this actor, which can then be further configured (e.g. calling
   *     `.withDispatcher()` on it)
   */
  static Props props(Integer magicNumber) {
    // You need to specify the actual type of the returned actor
    // since Java 8 lambdas have some runtime type information erased
    return Props.create(DemoActor.class, () -> new DemoActor(magicNumber));
  }

  private final Integer magicNumber;

  public DemoActor(Integer magicNumber) {
    this.magicNumber = magicNumber;
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Integer.class,
            i -> {
              getSender().tell(i   magicNumber, getSelf());
            })
        .build();
  }
}

static class SomeOtherActor extends AbstractActor {
  // Props(new DemoActor(42)) would not be safe
  ActorRef demoActor = getContext().actorOf(DemoActor.props(42), "demo");
  // ...
}

另一个好的做法是声明 Actor 可以接收的消息尽可能接近 Actor 的定义(例如,作为 Actor 内部的静态类或使用其他合适的类),这使得更容易知道它可以接收到什么:

代码语言:javascript复制
static class DemoMessagesActor extends AbstractLoggingActor {

  public static class Greeting {
    private final String from;

    public Greeting(String from) {
      this.from = from;
    }

    public String getGreeter() {
      return from;
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Greeting.class,
            g -> {
              log().info("I was greeted by {}", g.getGreeter());
            })
        .build();
  }
}

使用 Props 创建 Actors

Actors 是通过将Props实例传递到actorOf工厂方法来创建的,该方法在ActorSystemActorContext上可用。

代码语言:javascript复制
import akka.actor.ActorRef;
import akka.actor.ActorSystem;

使用ActorSystem将创建顶级 Actor,由ActorSystem提供的守护者 Actor 进行监督,而使用ActorContext将创建子 Actor。

代码语言:javascript复制
static class FirstActor extends AbstractActor {
  final ActorRef child = getContext().actorOf(Props.create(MyActor.class), "myChild");

  @Override
  public Receive createReceive() {
    return receiveBuilder().matchAny(x -> getSender().tell(x, getSelf())).build();
  }
}

建议创建一个子级、孙子(grand-children)级这样的层次结构,以便它适合应用程序的逻辑故障处理结构,详见「Actor Systems」。

actorOf的调用返回ActorRef的实例。这是 Actor 实例的句柄,也是与之交互的唯一方法。ActorRef是不可变的,并且与它所表示的 Actor 有一对一的关系。ActorRef也是可序列化的,并且具有网络意识(network-aware)。这意味着你可以序列化它,通过网络发送它,并在远程主机上使用它,并且它仍然在网络上表示原始节点上的同一个 Actor。

name参数是可选的,但你最好为 Actor 命名,因为它用于日志消息和标识 Actor。名称不能为空或以$开头,但可以包含 URL 编码字符(例如,空格为 )。如果给定的名称已被同一父级的另一个子级使用,则会引发InvalidActorNameException

Actor 在创建时自动异步启动。

依赖注入

如果你的 Actor 有一个接受参数的构造函数,那么这些参数也需要成为Props的一部分,如上所述。但在某些情况下,必须使用工厂方法,例如,当依赖项注入(dependency injection)框架确定实际的构造函数参数时。

代码语言:javascript复制
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
class DependencyInjector implements IndirectActorProducer {
  final Object applicationContext;
  final String beanName;

  public DependencyInjector(Object applicationContext, String beanName) {
    this.applicationContext = applicationContext;
    this.beanName = beanName;
  }

  @Override
  public Class<? extends Actor> actorClass() {
    return TheActor.class;
  }

  @Override
  public TheActor produce() {
    TheActor result;
    result = new TheActor((String) applicationContext);
    return result;
  }
}

  final ActorRef myActor =
      getContext()
          .actorOf(
              Props.create(DependencyInjector.class, applicationContext, "TheActor"), "TheActor");
  • 警告:有时,你可能会试图提供始终返回同一实例的IndirectActorProducer,例如使用静态字段。这是不支持的,因为它违背了 Actor 重新启动的含义,这里描述了:「重新启动意味着什么」。

当使用依赖注入框架时,Actor bean不能有singleton作用域。

在「Using Akka with Dependency Injection」指南和「Akka Java Spring」教程中,有关于依赖注入的更深层次的描述。

收件箱

当在与 Actor 通信的 Actor 外部编写代码时,ask模式可以是一个解决方案(见下文),但它不能做两件事:接收多个回复(例如,通过向通知服务订阅ActorRef)和观察其他 Actor 的生命周期。出于这些目的,有一个Inbox类:

代码语言:javascript复制
final Inbox inbox = Inbox.create(system);
inbox.send(target, "hello");
try {
  assert inbox.receive(Duration.ofSeconds(1)).equals("world");
} catch (java.util.concurrent.TimeoutException e) {
  // timeout
}

send方法包装了一个普通的tell,并将内部 Actor 的引用作为发送者提供。这允许在最后一行接收回复。监视(Watching)Actor 也很简单:

代码语言:javascript复制
final Inbox inbox = Inbox.create(system);
inbox.watch(target);
target.tell(PoisonPill.getInstance(), ActorRef.noSender());
try {
  assert inbox.receive(Duration.ofSeconds(1)) instanceof Terminated;
} catch (java.util.concurrent.TimeoutException e) {
  // timeout
}

Actor API

AbstractActor类定义了一个名为createReceive的方法,该方法用于设置 Actor 的“初始行为”。

如果当前的 Actor 行为与接收到的消息不匹配,则调用unhandled,默认情况下,它在Actor 系统的事件流上发布akka.actor.UnhandledMessage(message, sender, recipient)(将配置项akka.actor.debug.unhandled设置为on,以便将其转换为实际的Debug消息)。

此外,它还提供:

  • getSelf(),对 Actor 的ActorRef的引用
  • getSender(),前一次接收到的消息的发送方 Actor 的引用
  • supervisorStrategy(),用户可重写定义用于监视子 Actor 的策略

该策略通常在 Actor 内部声明,以便访问决策函数中 Actor 的内部状态:由于故障作为消息发送给其监督者并像其他消息一样进行处理(尽管不属于正常行为),因此 Actor 内的所有值和变量都可用,就像sender引用一样(报告失败的是直接子级;如果原始失败发生在一个遥远的后代中,则每次仍向上一级报告)。

  • getContext()公开 Actor 和当前消息的上下文信息,例如:
    • 创建子 Actor 的工厂方法(actorOf
    • Actor 所属的系统
    • 父级监督者
    • 受监督的子级
    • 生命周期监控
    • Become/Unbecome中所述的热交换行为栈(hotswap behavior stack

其余可见的方法是用户可重写的生命周期钩子方法,如下所述:

代码语言:javascript复制
public void preStart() {}

public void preRestart(Throwable reason, Optional<Object> message) {
  for (ActorRef each : getContext().getChildren()) {
    getContext().unwatch(each);
    getContext().stop(each);
  }
  postStop();
}

public void postRestart(Throwable reason) {
  preStart();
}

public void postStop() {}

上面显示的实现是AbstractActor类提供的默认值。

Actor 生命周期

Actor 系统中的一条路径表示一个“地方”,可能被一个活着的 Actor 占据。最初(除了系统初始化的 Actor 之外)路径是空的,当调用actorOf()时,它将通过传递的Props描述的 Actor 的化身(incarnation)分配给给定的路径,Actor 的化身由路径(path)和UID标识。

值得注意的是:

  • restart
  • stop,然后重新创建 Actor

如下所述。

重新启动(restart)只交换由Props定义的Actor实例,因此UID保持不变。只要化身是相同的,你可以继续使用相同的ActorRef。重启是通过 Actor 的父 Actor 的「Supervision Strategy」来处理的,关于重启的含义还有「更多的讨论」。

当 Actor 停止时,化身的生命周期就结束了。此时将调用适当的生命周期事件,并将终止通知观察 Actor。当化身停止后,可以通过使用actorOf()创建 Actor 来再次使用路径。在这种情况下,新化身的名称将与前一个相同,但UID将不同。Actor 可以由 Actor 本身、另一个 Actor 或 Actor 系统停止,详见「停止 Actor」。

  • 注释:重要的是要注意,Actor 不再被引用时不会自动停止,创建的每个 Actor 也必须显式销毁。唯一的简化是,停止父 Actor 也将递归地停止此父 Actor 创建的所有子 Actor。

ActorRef总是代表一个化身(路径和UID),而不仅仅是一个给定的路径。因此,如果一个 Actor 被停止,一个同名的新 Actor 被创造出来,旧化身的 Actor 引用就不会指向新的 Actor。

另一方面,ActorSelection指向路径(或者如果使用通配符,则指向多个路径),并且完全忽略了具体化当前正在占用的路径。由于这个原因,ActorSelection不能被观看。可以通过向ActorSelection发送Identify消息来获取(resolve)当前化身的ActorRef,该消息将以包含正确引用的ActorIdentity回复,详见ActorSelection」。这也可以通过ActorSelectionresolveOne方法来实现,该方法返回匹配ActorRefFuture

生命周期监控,或称为 DeathWatch

为了在另一个 Actor 终止时得到通知(即永久停止,而不是临时失败和重新启动),Actor 可以注册(register)自己,以便在终止时接收另一个 Actor 发送的Terminated消息,详见「停止 Actor」。此服务由 Actor 系统的DeathWatch组件提供。

注册监视器(monitor)很容易:

代码语言:javascript复制
import akka.actor.Terminated;
static class WatchActor extends AbstractActor {
  private final ActorRef child = getContext().actorOf(Props.empty(), "target");
  private ActorRef lastSender = system.deadLetters();

  public WatchActor() {
    getContext().watch(child); // <-- this is the only call needed for registration
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(
            "kill",
            s -> {
              getContext().stop(child);
              lastSender = getSender();
            })
        .match(
            Terminated.class,
            t -> t.actor().equals(child),
            t -> {
              lastSender.tell("finished", getSelf());
            })
        .build();
  }
}

在这里,有一点需要我们注意:Terminated消息的生成与注册和终止发生的顺序无关。特别是,即使被监视的 Actor 在注册时已经被终止,监视的 Actor 也将收到一条Terminated消息。

多次注册并不一定会导致生成多条消息,但不能保证只接收到一条这样的消息:如果被监视的 Actor 的终止消息已经生成并将消息排队,并且在处理此消息之前完成了另一个注册,则第二条消息也将进入消息队列。因为注册监视已经终止的 Actor 会导致立即生成Terminated消息。

也可以通过context.unwatch(target)取消监视另一个 Actor 的存活情况。即使Terminated消息已经在邮箱中排队,也可以这样做;在调用unwatch之后,将不再处理该 Actor 的Terminated消息。

Start 钩子

启动 Actor 之后,立即调用其preStart方法。

代码语言:javascript复制
@Override
public void preStart() {
  target = getContext().actorOf(Props.create(MyActor.class, "target"));
}

首次创建 Actor 时调用此方法。在重新启动期间,它由postRestart的默认实现调用,这意味着通过重写该方法,你可以选择是否只为此 Actor 或每次重新启动时调用一次此方法中的初始化代码。当创建 Actor 类的实例时,总是会调用作为 Actor 构造函数一部分的初始化代码,该实例在每次重新启动时都会发生。

Restart 钩子

所有 Actor 都受到监督,即通过故障处理策略链接到另一个 Actor。如果在处理消息时引发异常,则可以重新启动 Actor(详见「supervision」)。重新启动涉及上述挂钩:

  1. 通过调用导致preRestart的异常和触发该异常的消息来通知旧 Actor ;如果重新启动不是由处理消息引起的,则后者可能为None,例如,当监督者不捕获异常并由其监督者依次重新启动时,或者某个 Actor 由于其同级 Actor 的失败而导致重新启动时。如果消息可用,那么该消息的发送者也可以通过常规方式访问,即调用sender。此方法是清理、准备移交给新的 Actor 实例等的最佳位置。默认情况下,它会停止所有子级并调用postStop
  2. 来自actorOf调用的初始工厂用于生成新实例。
  3. 调用新 Actor 的postRestart方法时出现导致重新启动的异常。默认情况下,会调用preStart,就像在正常启动情况下一样。

Actor 重新启动仅替换实际的 Actor 对象;邮箱的内容不受重新启动的影响,因此在postRestart钩子返回后,将继续处理消息,而且将不再接收触发异常的消息。重新启动时发送给 Actor 的任何消息都将像往常一样排队进入其邮箱。

  • 警告:请注意,与用户消息相关的失败通知的顺序是不确定的。特别是,父级可以在处理子级在失败之前发送的最后一条消息之前重新启动其子级。有关详细信息,请参阅「讨论:消息排序」。

Stop 钩子

停止某个 Actor 后,将调用其postStop钩子,该钩子可用于将该 Actor 从其他服务中注销。此钩子保证在禁用此 Actor 的消息队列后运行,即发送到已停止 Actor 的消息将被重定向到ActorSystemdeadLetters

通过 Actor Selection 识别 Actor

如「Actor References, Paths and Addresses」中所述,每个 Actor 都有唯一的逻辑路径,该路径通过从子级到父级的 Actor 链获得,直到到达 Actor 系统的根,并且它有一个物理路径,如果监督链(supervision chain)包含任何远程监管者,则该路径可能有所不同。系统使用这些路径来查找 Actor,例如,当接收到远程消息并搜索收件人时,它们很有用:Actor 可以通过指定逻辑或物理的绝对或相对路径来查找其他 Actor,并接收带有结果的ActorSelection

代码语言:javascript复制
// will look up this absolute path
getContext().actorSelection("/user/serviceA/actor");
// will look up sibling beneath same supervisor
getContext().actorSelection("../joe");
  • 注释:与其他 Actor 交流时,最好使用他们的ActorRef,而不是依靠ActorSelection。但也有例外,如
    • 使用「至少一次传递」能力发送消息
    • 启动与远程系统的第一次连接

在所有其他情况下,可以在 Actor 创建或初始化期间提供ActorRef,将其从父级传递到子级,或者通过将其ActorRef发送到其他 Actor 来引出 Actor。

提供的路径被解析为java.net.URI,这意味着它在路径元素上被/拆分。如果路径以/开头,则为绝对路径,查找从根守护者(它是/user的父级)开始;否则,查找从当前 Actor 开始。如果路径元素等于..,则查找将向当前遍历的 Actor 的监督者“向上”一步,否则将向命名的子级“向下”一步。应该注意的是..在 Actor 路径中,总是指逻辑结构,即监督者。

Actor 选择(selection)的路径元素可以包含允许向该部分广播(broadcasting)消息的通配符模式:

代码语言:javascript复制
// will look all children to serviceB with names starting with worker
getContext().actorSelection("/user/serviceB/worker*");
// will look up all siblings beneath same supervisor
getContext().actorSelection("../*");

消息可以通过ActorSelection发送,并且在传递每个消息时查找ActorSelection的路径。如果选择(selection)与任何 Actor 都不匹配,则消息将被删除。

要获取ActorRef以进行ActorSelection,你需要向选择发送消息,并使用来自 Actor 的答复的getSender()引用。有一个内置的Identify消息,所有 Actor 都将理解该消息,并使用包含ActorRefActorIdentity消息自动回复。此消息由 Actor 专门处理,如果具体的名称查找失败(即非通配符路径元素与存活的 Actor 不对应),则会生成负(negative)结果。请注意,这并不意味着保证回复的传递(delivery of that reply is guaranteed),它仍然是正常的消息。

代码语言:javascript复制
import akka.actor.ActorIdentity;
import akka.actor.ActorSelection;
import akka.actor.Identify;
static class Follower extends AbstractActor {
  final Integer identifyId = 1;

  public Follower() {
    ActorSelection selection = getContext().actorSelection("/user/another");
    selection.tell(new Identify(identifyId), getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            ActorIdentity.class,
            id -> id.getActorRef().isPresent(),
            id -> {
              ActorRef ref = id.getActorRef().get();
              getContext().watch(ref);
              getContext().become(active(ref));
            })
        .match(
            ActorIdentity.class,
            id -> !id.getActorRef().isPresent(),
            id -> {
              getContext().stop(getSelf());
            })
        .build();
  }

  final AbstractActor.Receive active(final ActorRef another) {
    return receiveBuilder()
        .match(
            Terminated.class, t -> t.actor().equals(another), t -> getContext().stop(getSelf()))
        .build();
  }
}

你还可以使用ActorSelectionresolveOne方法获取ActorRef以进行actorselection。如果存在这样的 Actor,它将返回匹配的ActorRefFuture,可参阅「 Java 8 Compatibility for Java compatibility」。如果不存在这样的 Actor 或标识在提供的timeout内未完成,则完成此操作并抛出akka.actor.ActorNotFound异常。

如果启用「remoting」,也可以查找远程 Actor 的地址:

代码语言:javascript复制
getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");

「Remoting Sample」中给出了一个在启用远程处理(remoting)的情况下演示 Actor 查找的例子。

信息和不变性

  • 重要的:消息可以是任何类型的对象,但必须是不可变的。Akka 还不能强制执行不可变性,所以必须按惯例执行。

以下是不可变消息的示例:

代码语言:javascript复制
public class ImmutableMessage {
  private final int sequenceNumber;
  private final List<String> values;

  public ImmutableMessage(int sequenceNumber, List<String> values) {
    this.sequenceNumber = sequenceNumber;
    this.values = Collections.unmodifiableList(new ArrayList<String>(values));
  }

  public int getSequenceNumber() {
    return sequenceNumber;
  }

  public List<String> getValues() {
    return values;
  }
}

发送消息

消息通过以下方法之一发送给 Actor。

  • tell的意思是“发送并忘记(fire-and-forget)”,例如异步发送消息并立即返回。
  • ask异步发送消息,并返回一个表示可能的答复。

每一个发送者都有消息顺序的保证。

  • 注释:使用ask会带来性能方面的影响,因为有些东西需要跟踪它何时超时,需要有一些东西将一个Promise连接到ActorRef中,并且还需要通过远程处理实现它。所以,我们更倾向于使用tell,只有当你有足够的理由时才应该使用ask

在所有这些方法中,你可以选择传递自己的ActorRef。将其作为一种实践,因为这样做将允许接收者 Actor 能够响应你的消息,因为发送者引用与消息一起发送。

Tell: Fire-forget

这是发送消息的首选方式,它不用等待消息返回,因此不是阻塞的。这提供了最佳的并发性和可伸缩性的特性。

代码语言:javascript复制
// don’t forget to think about who is the sender (2nd argument)
target.tell(message, getSelf());

发送方引用与消息一起传递,并在处理此消息时通过getSender()方法在接收 Actor 中使用。在一个 Actor 内部,通常是getSelf()作为发送者,但在某些情况下,回复(replies)应该路由到另一个 Actor,例如,父对象,其中tell的第二个参数将是另一个 Actor。在 Actor 外部,如果不需要回复,则第二个参数可以为null;如果在 Actor 外部需要回复,则可以使用下面描述的ask模式。

Ask: Send-And-Receive-Future

ask模式涉及 Actor 和Future,因此它是作为一种使用模式而不是ActorRef上的一种方法提供的:

代码语言:javascript复制
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;

import java.util.concurrent.CompletableFuture;
final Duration t = Duration.ofSeconds(5);

// using 1000ms timeout
CompletableFuture<Object> future1 =
    ask(actorA, "request", Duration.ofMillis(1000)).toCompletableFuture();

// using timeout from above
CompletableFuture<Object> future2 = ask(actorB, "another request", t).toCompletableFuture();

CompletableFuture<Result> transformed =
    CompletableFuture.allOf(future1, future2)
        .thenApply(
            v -> {
              String x = (String) future1.join();
              String s = (String) future2.join();
              return new Result(x, s);
            });

pipe(transformed, system.dispatcher()).to(actorC);

这个例子演示了askpipeTo模式在Future上的结合,因为这可能是一个常见的组合。请注意,以上所有内容都是完全非阻塞和异步的:ask生成一个,其中两个使用CompletableFuture.allOfthenApply方法组合成新的Future,然后pipeCompletionStage上安装一个处理程序,以将聚合Result提交给另一个 Actor。

使用ask会像使用tell一样向接收 Actor 发送消息,并且接收 Actor 必须使用getSender().tell(reply, getSelf())才能完成返回的值。ask操作涉及创建一个用于处理此回复的内部 Actor,该 Actor 需要有一个超时,在该超时之后才能将其销毁,以便不泄漏资源;具体请参阅下面更多的内容。

  • 警告:要完成带异常的,你需要向发件人发送akka.actor.Status.Failure消息。当 Actor 在处理消息时抛出异常,不会自动执行此操作。
代码语言:javascript复制
try {
  String result = operation();
  getSender().tell(result, getSelf());
} catch (Exception e) {
  getSender().tell(new akka.actor.Status.Failure(e), getSelf());
  throw e;
}

如果 Actor 未完成,则它将在超时期限(指定为ask方法的参数)之后过期;这将使用AskTimeoutException完成CompletionStage

这可以用于注册回调以便在完成时获取通知,从而提供避免阻塞的方法。

  • 警告:当使用Future的回调时,内部 Actor 需要小心避免关闭包含 Actor 的引用,即不要从回调中调用方法或访问封闭 Actor 的可变状态。这将破坏 Actor 的封装,并可能引入同步错误和竞态条件,因为回调将被同时调度到封闭 Actor。不幸的是,目前还没有一种方法可以在编译时检测到这些非法访问。另见「Actors 和共享可变状态」。

转发消息

你可以将消息从一个 Actor 转发到另一个 Actor。这意味着即使消息通过“中介器(mediator)”,原始发送方地址/引用也会得到维护。这在编写充当路由器(routers)、负载平衡器(load-balancers)、复制器(replicators)等的 Actor 时很有用。

代码语言:javascript复制
target.forward(result, getContext());

接收消息

Actor 必须通过在AbstractActor中实现createReceive方法来定义其初始接收行为:

代码语言:javascript复制
@Override
public Receive createReceive() {
  return receiveBuilder().match(String.class, s -> System.out.println(s.toLowerCase())).build();
}

返回类型是AbstractActor.Receive,它定义了 Actor 可以处理哪些消息,以及如何处理这些消息的实现。可以使用名为ReceiveBuilder的生成器来构建此类行为。下面是一个例子:

代码语言:javascript复制
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class MyActor extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            s -> {
              log.info("Received String message: {}", s);
            })
        .matchAny(o -> log.info("received unknown message"))
        .build();
  }
}

如果你希望提供许多match案例,但希望避免创建长调用跟踪(a long call trail),可以将生成器的创建拆分为多个语句,如示例中所示:

代码语言:javascript复制
import akka.actor.AbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;

public class GraduallyBuiltActor extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  @Override
  public Receive createReceive() {
    ReceiveBuilder builder = ReceiveBuilder.create();

    builder.match(
        String.class,
        s -> {
          log.info("Received String message: {}", s);
        });

    // do some other stuff in between

    builder.matchAny(o -> log.info("received unknown message"));

    return builder.build();
  }
}

在 Actor 中,使用小方法也是一种很好的做法。建议将消息处理的实际工作委托给方法,而不是在每个lambda中定义具有大量代码的大型ReceiveBuilder。一个结构良好的 Actor 可以是这样的:

代码语言:javascript复制
static class WellStructuredActor extends AbstractActor {

  public static class Msg1 {}

  public static class Msg2 {}

  public static class Msg3 {}

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Msg1.class, this::receiveMsg1)
        .match(Msg2.class, this::receiveMsg2)
        .match(Msg3.class, this::receiveMsg3)
        .build();
  }

  private void receiveMsg1(Msg1 msg) {
    // actual work
  }

  private void receiveMsg2(Msg2 msg) {
    // actual work
  }

  private void receiveMsg3(Msg3 msg) {
    // actual work
  }
}

这样做有以下好处:

  • 更容易看到 Actor 能处理什么样的信息
  • 异常情况下的可读堆栈跟踪
  • 更好地使用性能分析工具
  • Java HotSpot 有更好的机会进行优化

Receive可以通过其他方式实现,而不是使用ReceiveBuilder,因为它最终只是一个 Scala PartialFunction的包装器。在 Java 中,可以通过扩展AbstractPartialFunction实现PartialFunction。例如,可以实现「与 DSL 匹配的 Vavr 模式适配器」,有关更多详细信息,请参阅「Akka Vavr 示例项目」。

如果对某些 Actor 来说,验证ReceiveBuilder匹配逻辑是一个瓶颈,那么你可以考虑通过扩展UntypedAbstractActor而不是AbstractActor来在较低的级别实现它。ReceiveBuilder创建的分部函数由每个match语句的多个lambda表达式组成,其中每个lambda都引用要运行的代码。这是 JVM 在优化时可能会遇到的问题,并且产生的代码的性能可能不如非类型化版本。当扩展UntypedAbstractActor时,每个消息都作为非类型化Object接收,你必须以其他方式检查并转换为实际的消息类型,如下所示:

代码语言:javascript复制
static class OptimizedActor extends UntypedAbstractActor {

  public static class Msg1 {}

  public static class Msg2 {}

  public static class Msg3 {}

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof Msg1) receiveMsg1((Msg1) msg);
    else if (msg instanceof Msg2) receiveMsg2((Msg2) msg);
    else if (msg instanceof Msg3) receiveMsg3((Msg3) msg);
    else unhandled(msg);
  }

  private void receiveMsg1(Msg1 msg) {
    // actual work
  }

  private void receiveMsg2(Msg2 msg) {
    // actual work
  }

  private void receiveMsg3(Msg3 msg) {
    // actual work
  }
}

回复消息

如果你想有一个回复消息的句柄,可以使用getSender(),它会给你一个ActorRef。你可以通过使用getSender().tell(replyMsg, getSelf())发送ActorRef来进行回复。你还可以存储ActorRef以供稍后回复或传递给其他 Actor。如果没有发送者(发送的消息没有 Actor 或Future上下文),那么发送者默认为死信(dead-letter)Actor 引用。

代码语言:javascript复制
getSender().tell(s, getSelf());

接收超时

ActorContext setReceiveTimeout定义了非活动超时,在该超时之后,将触发发送ReceiveTimeout消息。指定超时时间后,接收函数应该能够处理akka.actor.ReceiveTimeout消息。1毫秒是支持的最小超时时间。

请注意,接收超时(receive timeout)可能会在另一条消息排队后立即触发并排队ReceiveTimeout消息;因此,不保证在接收超时,如通过此方法配置的那样,事先必须有空闲时间。

设置后,接收超时将保持有效(即在非活动期后继续重复触发),可以通过传入Duration.Undefined消息来关闭此功能。

代码语言:javascript复制
static class ReceiveTimeoutActor extends AbstractActor {
  public ReceiveTimeoutActor() {
    // To set an initial delay
    getContext().setReceiveTimeout(Duration.ofSeconds(10));
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(
            "Hello",
            s -> {
              // To set in a response to a message
              getContext().setReceiveTimeout(Duration.ofSeconds(1));
            })
        .match(
            ReceiveTimeout.class,
            r -> {
              // To turn it off
              getContext().cancelReceiveTimeout();
            })
        .build();
  }
}

标记为NotInfluenceReceiveTimeout的消息将不会重置计时器。当ReceiveTimeout受外部不活动而不受内部活动(如定时勾选消息)影响时,这可能会很有用(This can be useful when ReceiveTimeout should be fired by external inactivity but not influenced by internal activity, e.g. scheduled tick messages.)。

定时器和调度消息

通过直接使用「Scheduler」,可以将消息安排在以后的时间点发送,但是在将 Actor 中的定期或单个消息安排到自身时,使用对命名定时器(named timers)的支持更为方便和安全。当 Actor 重新启动并由定时器处理时,调度(scheduled)消息的生命周期可能难以管理。

代码语言:javascript复制
import java.time.Duration;
import akka.actor.AbstractActorWithTimers;

static class MyActor extends AbstractActorWithTimers {

  private static Object TICK_KEY = "TickKey";

  private static final class FirstTick {}

  private static final class Tick {}

  public MyActor() {
    getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500));
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            FirstTick.class,
            message -> {
              // do something useful here
              getTimers().startPeriodicTimer(TICK_KEY, new Tick(), Duration.ofSeconds(1));
            })
        .match(
            Tick.class,
            message -> {
              // do something useful here
            })
        .build();
  }
}

每个定时器都有一个键,可以更换或取消。它保证不会收到来自具有相同密钥的定时器的前一个实例的消息,即使当它被取消或新定时器启动时,它可能已经在邮箱中排队。

定时器绑定到拥有它的 Actor 的生命周期,因此当它重新启动或停止时自动取消。请注意,TimerScheduler不是线程安全的,即它只能在拥有它的 Actor 中使用。

停止 Actor

通过调用ActorRefFactorystop方法(即ActorContextActorSystem)来停止 Actor。通常,上下文用于停止 Actor 本身或子 Actor,以及停止顶级 Actor 的系统。Actor 的实际终止是异步执行的,也就是说,stop可能会在 Actor 停止之前返回。

代码语言:javascript复制
import akka.actor.ActorRef;
import akka.actor.AbstractActor;

public class MyStoppingActor extends AbstractActor {

  ActorRef child = null;

  // ... creation of child ...

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("interrupt-child", m -> getContext().stop(child))
        .matchEquals("done", m -> getContext().stop(getSelf()))
        .build();
  }
}

当前邮件(如果有)的处理将在 Actor 停止之前继续,但不会处理邮箱中的其他邮件。默认情况下,这些消息将发送到ActorSystemdeadLetters,但这取决于邮箱的实现。

一个 Actor 的终止分两步进行:首先,Actor 暂停其邮箱处理并向其所有子级发送停止命令,然后继续处理其子级的内部终止通知,直到最后一个终止,最后终止其自身(调用postStop、转储邮箱、在DeathWatch上发布Terminated、通知其监督者)。此过程确保 Actor 系统子树以有序的方式终止,将stop命令传播到叶,并将其确认信息收集回已停止的监督者。如果其中一个 Actor 没有响应(即长时间处理消息,因此不接收stop命令),那么整个过程将被阻塞。

ActorSystem.terminate()之后,系统守护者 Actor 将被停止,上述过程将确保整个系统的正确终止。

postStop()钩子在 Actor 完全停止后调用。这样可以清理资源:

代码语言:javascript复制
@Override
public void postStop() {
  final String message = "stopped";
  // don’t forget to think about who is the sender (2nd argument)
  target.tell(message, getSelf());
  final Object result = "";
  target.forward(result, getContext());
  target = null;
}
  • 注释:由于停止 Actor 是异步的,因此不能立即重用刚刚停止的子级的名称;这将导致InvalidActorNameException。相反,watch()终止的 Actor,并创建其替换(replacement),以响应最终到达的Terminated消息。

PoisonPill

你还可以向 Actor 发送akka.actor.PoisonPill消息,该消息将在处理消息时停止 Actor。PoisonPill作为普通消息排队,并将在邮箱中已排队的消息之后处理。

代码语言:javascript复制
victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());

杀死一个 Actor

你也可以通过发送一条Kill消息来“杀死”一个 Actor。与PoisonPill不同的是,这可能会使 Actor 抛出ActorKilledException,并触发失败。Actor 将暂停操作,并询问其监督者如何处理故障,这可能意味着恢复 Actor、重新启动或完全终止 Actor。更多信息,请参阅「What Supervision Means」。

像这样使用Kill

代码语言:javascript复制
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());

// expecting the actor to indeed terminate:
expectTerminated(Duration.ofSeconds(3), victim);

一般来说,虽然在设计 Actor 交互时不建议过分依赖于PoisonPillKill,但通常情况下,鼓励使用诸如PleaseCleanupAndStop之类的协议级(protocol-level)消息,因为 Actor 知道如何处理这些消息。像PoisonPillKill这样的信息是为了能够停止那些你无法控制的 Actor 的。

优雅的停止

如果你需要等待终止或组合多个 Actor 的有序终止,则gracefulStop非常有用:

代码语言:javascript复制
import static akka.pattern.Patterns.gracefulStop;
import akka.pattern.AskTimeoutException;
import java.util.concurrent.CompletionStage;

try {
  CompletionStage<Boolean> stopped =
      gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN);
  stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);
  // the actor has been stopped
} catch (AskTimeoutException e) {
  // the actor wasn't stopped within 5 seconds
}

gracefulStop()成功返回时,Actor 的postStop()钩子将被执行:在postStop()结尾和gracefulStop()返回之间存在一个“发生在边缘之前(happens-before edge)”的关系。

在上面的例子中,一个定制的Manager.Shutdown消息被发送到目标 Actor,以启动停止 Actor 的过程。你可以为此使用PoisonPill,但在停止目标 Actor 之前,你与其他 Actor 进行交互的可能性很有限。可以在postStop中处理简单的清理任务。

  • 警告:请记住,停止的 Actor 和取消注册的 Actor 是彼此异步发生的独立事件。因此,在gracefulStop()返回后,你可能会发现该名称仍在使用中。为了保证正确的注销,只能重用你控制的监督者中的名称,并且只响应Terminated消息,即不用于顶级 Actor。

协调关闭

有一个名为CoordinatedShutdown的扩展,它将按特定顺序停止某些 Actor 和服务,并在关闭过程中执行注册的任务。

停机阶段(shutdown phases)的顺序在配置akka.coordinated-shutdown.phases中定义。默认阶段定义为:

代码语言:javascript复制
# CoordinatedShutdown is enabled by default and will run the tasks that
# are added to these phases by individual Akka modules and user logic.
#
# The phases are ordered as a DAG by defining the dependencies between the phases
# to make sure shutdown tasks are run in the right order.
#
# In general user tasks belong in the first few phases, but there may be use
# cases where you would want to hook in new phases or register tasks later in
# the DAG.
#
# Each phase is defined as a named config section with the
# following optional properties:
# - timeout=15s: Override the default-phase-timeout for this phase.
# - recover=off: If the phase fails the shutdown is aborted
#                and depending phases will not be executed.
# - enabled=off: Skip all tasks registered in this phase. DO NOT use
#                this to disable phases unless you are absolutely sure what the
#                consequences are. Many of the built in tasks depend on other tasks
#                having been executed in earlier phases and may break if those are disabled.
# depends-on=[]: Run the phase after the given phases
phases {

  # The first pre-defined phase that applications can add tasks to.
  # Note that more phases can be added in the application's
  # configuration by overriding this phase with an additional
  # depends-on.
  before-service-unbind {
  }

  # Stop accepting new incoming connections.
  # This is where you can register tasks that makes a server stop accepting new connections. Already
  # established connections should be allowed to continue and complete if possible.
  service-unbind {
    depends-on = [before-service-unbind]
  }

  # Wait for requests that are in progress to be completed.
  # This is where you register tasks that will wait for already established connections to complete, potentially
  # also first telling them that it is time to close down.
  service-requests-done {
    depends-on = [service-unbind]
  }

  # Final shutdown of service endpoints.
  # This is where you would add tasks that forcefully kill connections that are still around.
  service-stop {
    depends-on = [service-requests-done]
  }

  # Phase for custom application tasks that are to be run
  # after service shutdown and before cluster shutdown.
  before-cluster-shutdown {
    depends-on = [service-stop]
  }

  # Graceful shutdown of the Cluster Sharding regions.
  # This phase is not meant for users to add tasks to.
  cluster-sharding-shutdown-region {
    timeout = 10 s
    depends-on = [before-cluster-shutdown]
  }

  # Emit the leave command for the node that is shutting down.
  # This phase is not meant for users to add tasks to.
  cluster-leave {
    depends-on = [cluster-sharding-shutdown-region]
  }

  # Shutdown cluster singletons
  # This is done as late as possible to allow the shard region shutdown triggered in
  # the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down.
  # This phase is not meant for users to add tasks to.
  cluster-exiting {
    timeout = 10 s
    depends-on = [cluster-leave]
  }

  # Wait until exiting has been completed
  # This phase is not meant for users to add tasks to.
  cluster-exiting-done {
    depends-on = [cluster-exiting]
  }

  # Shutdown the cluster extension
  # This phase is not meant for users to add tasks to.
  cluster-shutdown {
    depends-on = [cluster-exiting-done]
  }

  # Phase for custom application tasks that are to be run
  # after cluster shutdown and before ActorSystem termination.
  before-actor-system-terminate {
    depends-on = [cluster-shutdown]
  }

  # Last phase. See terminate-actor-system and exit-jvm above.
  # Don't add phases that depends on this phase because the
  # dispatcher and scheduler of the ActorSystem have been shutdown.
  # This phase is not meant for users to add tasks to.
  actor-system-terminate {
    timeout = 10 s
    depends-on = [before-actor-system-terminate]
  }
}

如果需要,可以在应用程序的配置中添加更多的阶段(phases),方法是使用附加的depends-on覆盖阶段。尤其是在before-service-unbindbefore-cluster-shutdownbefore-actor-system-terminate的阶段,是针对特定于应用程序的阶段或任务的。

默认阶段是以单个线性顺序定义的,但是通过定义阶段之间的依赖关系,可以将阶段排序为有向非循环图(DAG)。阶段是按 DAG 的拓扑排序的。

可以将任务添加到具有以下内容的阶段:

代码语言:javascript复制
CoordinatedShutdown.get(system)
    .addTask(
        CoordinatedShutdown.PhaseBeforeServiceUnbind(),
        "someTaskName",
        () -> {
          return akka.pattern.Patterns.ask(someActor, "stop", Duration.ofSeconds(5))
              .thenApply(reply -> Done.getInstance());
        });

在任务完成后,应返回CompletionStage<Done>。任务名称参数仅用于调试或日志记录。

添加到同一阶段的任务是并行执行的,没有任何排序假设。在完成前一阶段的所有任务之前,下一阶段将不会启动。

如果任务没有在配置的超时内完成(请参见「reference.conf」),则下一个阶段无论如何都会启动。如果任务失败或未在超时内完成,则可以为一个阶段配置recover=off以中止关闭过程的其余部分。

任务通常应在系统启动后尽早注册。运行时,将执行已注册的协调关闭任务,但不会运行添加得太晚的任务。

要启动协调关闭进程,可以对CoordinatedShutdown扩展调用runAll

代码语言:javascript复制
CompletionStage<Done> done =
    CoordinatedShutdown.get(system).runAll(CoordinatedShutdown.unknownReason());

多次调用runAll方法是安全的,它只能运行一次。

这也意味着ActorSystem将在最后一个阶段终止。默认情况下,不会强制停止 JVM(如果终止了所有非守护进程线程,则会停止 JVM)。要启用硬System.exit作为最终操作,可以配置:

代码语言:javascript复制
akka.coordinated-shutdown.exit-jvm = on

当使用「Akka 集群」时,当集群节点将自己视为Exiting时,CoordinatedShutdown将自动运行,即从另一个节点离开将触发离开节点上的关闭过程。当使用 Akka 集群时,会自动添加集群的优雅离开任务,包括集群单例的优雅关闭和集群分片,即运行关闭过程也会触发尚未进行的优雅离开。

默认情况下,当 JVM 进程退出时,例如通过kill SIGTERM信号(SIGINTCtrl-C不起作用),将运行CoordinatedShutdown。此行为可以通过以下方式禁用:

代码语言:javascript复制
akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off

如果你有特定于应用程序的 JVM 关闭钩子,建议你通过CoordinatedShutdown对它们进行注册,以便它们在 Akka 内部关闭钩子之前运行,例如关闭 Akka 远程处理。

代码语言:javascript复制
CoordinatedShutdown.get(system)
    .addJvmShutdownHook(() -> System.out.println("custom JVM shutdown hook..."));

对于某些测试,可能不希望通过CoordinatedShutdown来终止ActorSystem。你可以通过将以下内容添加到测试时使用的ActorSystem的配置中来禁用此功能:

代码语言:javascript复制
# Don't terminate ActorSystem via CoordinatedShutdown in tests
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.cluster.run-coordinated-shutdown-when-down = off

Become/Unbecome

升级

Akka 支持在运行时对 Actor 的消息循环(例如其实现)进行热交换:从 Actor 内部调用context.become方法。become采用实现新消息处理程序的PartialFunction<Object, BoxedUnit>。热交换代码(hotswapped code)保存在一个Stack中,可以推送和弹出。

  • 警告:请注意,Actor 在被其监督者重新启动时将恢复其原始行为。

要使用become热交换 Actor 的行为,可以参考以下操作:

代码语言:javascript复制
static class HotSwapActor extends AbstractActor {
  private AbstractActor.Receive angry;
  private AbstractActor.Receive happy;

  public HotSwapActor() {
    angry =
        receiveBuilder()
            .matchEquals(
                "foo",
                s -> {
                  getSender().tell("I am already angry?", getSelf());
                })
            .matchEquals(
                "bar",
                s -> {
                  getContext().become(happy);
                })
            .build();

    happy =
        receiveBuilder()
            .matchEquals(
                "bar",
                s -> {
                  getSender().tell("I am already happy :-)", getSelf());
                })
            .matchEquals(
                "foo",
                s -> {
                  getContext().become(angry);
                })
            .build();
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("foo", s -> getContext().become(angry))
        .matchEquals("bar", s -> getContext().become(happy))
        .build();
  }
}

become方法的这种变体对于许多不同的事情都很有用,例如实现有限状态机(FSM,例如「Dining Hakkers」)。它将替换当前行为(即行为堆栈的顶部),这意味着你不使用unbecome,而是始终显式安装(installed)下一个行为。

另一种使用become的方法不是替换而是添加到行为堆栈的顶部。在这种情况下,必须注意确保pop操作的数量(即unbecome)与push操作的数量在长期内匹配,否则这将导致内存泄漏,这就是为什么此行为不是默认行为。

代码语言:javascript复制
static class Swapper extends AbstractLoggingActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(
            Swap,
            s -> {
              log().info("Hi");
              getContext()
                  .become(
                      receiveBuilder()
                          .matchEquals(
                              Swap,
                              x -> {
                                log().info("Ho");
                                getContext()
                                    .unbecome(); // resets the latest 'become' (just for fun)
                              })
                          .build(),
                      false); // push on top instead of replace
            })
        .build();
  }
}

static class SwapperApp {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("SwapperSystem");
    ActorRef swapper = system.actorOf(Props.create(Swapper.class), "swapper");
    swapper.tell(Swap, ActorRef.noSender()); // logs Hi
    swapper.tell(Swap, ActorRef.noSender()); // logs Ho
    swapper.tell(Swap, ActorRef.noSender()); // logs Hi
    swapper.tell(Swap, ActorRef.noSender()); // logs Ho
    swapper.tell(Swap, ActorRef.noSender()); // logs Hi
    swapper.tell(Swap, ActorRef.noSender()); // logs Ho
    system.terminate();
  }
}

对 Scala Actor 嵌套接收进行编码,而不会意外泄漏内存

请参阅「Unnested receive example」。

Stash

AbstractActorWithStash类使 Actor 能够临时存储"不能"或"不应该"使用 Actor 当前行为处理的消息。更改 Actor 的消息处理程序后,即在调用getContext().become()getContext().unbecome()之前,所有隐藏的消息都可以“unstashed”,从而将它们预存到 Actor 的邮箱中。这样,可以按照与最初接收到的消息相同的顺序处理隐藏的消息。扩展AbstractActorWithStash的 Actor 将自动获得基于deque的邮箱。

  • 注释:抽象类AbstractActorWithStash实现了标记接口RequiresMessageQueue<DequeBasedMessageQueueSemantics>,如果希望对邮箱进行更多控制,请参阅有关邮箱的文档:「Mailboxes」。

下面是AbstractActorWithStash类的一个示例:

代码语言:javascript复制
static class ActorWithProtocol extends AbstractActorWithStash {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(
            "open",
            s -> {
              getContext()
                  .become(
                      receiveBuilder()
                          .matchEquals(
                              "write",
                              ws -> {
                                /* do writing */
                              })
                          .matchEquals(
                              "close",
                              cs -> {
                                unstashAll();
                                getContext().unbecome();
                              })
                          .matchAny(msg -> stash())
                          .build(),
                      false);
            })
        .matchAny(msg -> stash())
        .build();
  }
}

调用stash()会将当前消息(Actor 最后收到的消息)添加到 Actor 的stash中。它通常在处理 Actor 消息处理程序中的默认情况时调用,以存储其他情况未处理的消息。将同一条消息存储两次是非法的;这样做会导致IllegalStateExceptionstash也可以是有界的,在这种情况下,调用stash()可能导致容量冲突(capacity violation),从而导致StashOverflowException。可以使用邮箱配置的stash-capacity设置(一个int值)存储容量。

调用unstashAll()将消息从stash排队到 Actor 的邮箱,直到达到邮箱的容量(如果有),请注意,stash中的消息是预先发送到邮箱的。如果有界邮箱溢出,将引发MessageQueueAppendFailedException。调用unstashAll()后,stash保证为空。

stashscala.collection.immutable.Vector支持。因此,即使是非常大量的消息也可能被存储起来,而不会对性能产生重大影响。

请注意,与邮箱不同,stash是短暂的 Actor 状态的一部分。因此,它应该像 Actor 状态中具有相同属性的其他部分一样进行管理。preRestartAbstractActorWithStash实现将调用unstashAll(),这通常是需要的行为。

  • 注释:如果你想强制你的 Actor 只能使用无界的stash,那么你应该使用AbstractActorWithUnboundedStash类。

Actor 和异常

当 Actor 处理消息时,可能会引发某种异常,例如数据库异常。

消息发生了什么

如果在处理邮件时引发异常(即从邮箱中取出并移交给当前行为),则此邮件将丢失。重要的是要知道它不会放回邮箱。因此,如果你想重试处理消息,你需要自己处理它,捕获异常并重试处理流程(retry your flow)。确保对重试次数进行了限制,因为你不希望系统进行livelock,否则的话,这会在程序没有进展的情况下消耗大量 CPU 周期。

邮箱发生了什么

如果在处理邮件时引发异常,则邮箱不会发生任何异常。如果 Actor 重新启动,则会出现相同的邮箱。因此,该邮箱上的所有邮件也将在那里。

Actor 发现了什么

如果 Actor 内的代码抛出异常,则该 Actor 将被挂起,并且监控过程将启动。根据监督者的决定,Actor 被恢复(好像什么都没有发生)、重新启动(清除其内部状态并从头开始)或终止。

初始化模式

Actor 的丰富生命周期钩子提供了一个有用的工具箱来实现各种初始化模式(initialization patterns)。在ActorRef的生命周期中,Actor 可能会经历多次重新启动,旧实例被新实例替换,外部观察者看不见内部的变化,外部观察者只看到ActorRef引用。

每次实例化一个 Actor 时,可能都需要初始化,但有时只需要在创建ActorRef时对第一个实例进行初始化。以下部分提供了不同初始化需求的模式。

通过构造函数初始化

使用构造函数进行初始化有很多好处。首先,它让使用val字段存储在 Actor 实例的生命周期中不发生更改的任何状态成为可能,从而使 Actor 的实现更加健壮。当创建一个调用actorOf的 Actor 实例时,也会在重新启动时调用构造函数,因此 Actor 的内部始终可以假定发生了正确的初始化。这也是这种方法的缺点,因为在某些情况下,人们希望避免在重新启动时重新初始化内部信息。例如,在重新启动时保护子 Actor 通常很有用。下面的部分提供了这个案例的模式。

通过 preStart 初始化

在第一个实例的初始化过程中,即在创建ActorRef时,只直接调用一次 Actor 的preStart()方法。在重新启动的情况下,postRestart()调用preStart(),因此如果不重写,则在每次重新启动时都会调用preStart()。但是,通过重写postRestart(),可以禁用此行为,并确保只有一个对preStart()的调用。

此模式的一个有用用法是在重新启动期间禁用为子级创建新的ActorRef。这可以通过重写preRestart()来实现。以下是这些生命周期挂钩的默认实现:

代码语言:javascript复制
@Override
public void preStart() {
  // Initialize children here
}

// Overriding postRestart to disable the call to preStart()
// after restarts
@Override
public void postRestart(Throwable reason) {}

// The default implementation of preRestart() stops all the children
// of the actor. To opt-out from stopping the children, we
// have to override preRestart()
@Override
public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
  // Keep the call to postStop(), but no stopping of children
  postStop();
}

请注意,子 Actor 仍然重新启动,但没有创建新的ActorRef。可以递归地为子级应用相同的原则,确保只在创建引用时调用它们的preStart()方法。

有关更多信息,请参阅「What Restarting Means」。

通过消息传递初始化

有些情况下,在构造函数中无法传递 Actor 初始化所需的所有信息,例如在存在循环依赖项的情况下。在这种情况下,Actor 应该监听初始化消息,并使用become()或有限状态机(finite state-machine)状态转换来编码 Actor 的初始化和未初始化状态。

代码语言:javascript复制
@Override
public Receive createReceive() {
  return receiveBuilder()
      .matchEquals(
          "init",
          m1 -> {
            initializeMe = "Up and running";
            getContext()
                .become(
                    receiveBuilder()
                        .matchEquals(
                            "U OK?",
                            m2 -> {
                              getSender().tell(initializeMe, getSelf());
                            })
                        .build());
          })
      .build();
}

如果 Actor 可能在初始化消息之前收到消息,那么一个有用的工具可以是Stash存储消息,直到初始化完成,然后在 Actor 初始化之后重放消息。

  • 警告:此模式应小心使用,并且仅当上述模式均不适用时才应用。其中一个潜在的问题是,消息在发送到远程 Actor 时可能会丢失。此外,在未初始化状态下发布ActorRef可能会导致在初始化完成之前接收到用户消息的情况。

0 人点赞