Actor 模型中的通信模式

在 Actor 模型中所有的 Actor 之间有且只有一种通信模式,那就是 tell 的方式,也就是 fire and forget 的方式。但是在实际的开发过程中工程师们逐渐总结出了一些常用的通信模式。本文以 akka-typed(2.6.0-M8) 框架为例,介绍存在于 actor 模型中最基本的一些通信模式,讨论消息是如何在 actor 之间流动的。

Fire and Forget

我们首先看到最基础的模式 fire and forget 即发送后遗忘。这种模式是直观地尝试向一个 actor 发送一条消息后就不再关心这次通信。此处的发送消息仅保证到 fire 的发送,即消息以发出,但是可能由于网络原因被丢失。这种通信模式使用的是所谓的 at most once 送达语义。

在给出一个可运行的代码例子之前,我们简单介绍一下 akka-typed 框架这个 actor 模型的特定实现。在 akka-typed 中,消息的传输是通过 ActorRef<T> 的引用来进行的,ActorRef 是为位置透明的 actor 的引用,T 表示了 actor 能接收的消息类型。这点跟典型的 actor 模型有所不同。这是因为典型的 actor 模型是非确定性的,实现为 akka untyped 那样的框架,即任意 actor 都可以接受任意类型的信息,并且因此能够通过 become/unbecome 来改变自己的行为(behavior)。在 akka-typed 中通过带有类型标签的引用,保证调用 actorRef.tell(message) 的时候不会意外的传递错误的信息,同时类型信息有助于推理代码,而不是在一堆任意引用任意类型的 ActorRef 和 Object 类型的消息中费力的分析。一个对应的缺点就是无法实现任意灵活度的 become/unbecome,如果需要实现类似状态机的功能,需要加入额外的域并且在 Behaviors 中针对状态分别讨论,但所有能接受的消息都必定是 T 的类型。

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import java.util.concurrent.CountDownLatch;

public class FireAndForget {
    public static final CountDownLatch LATCH = new CountDownLatch(2);

    public static class PrintMe {
        public final String message;
        public PrintMe(String message) {
            this.message = message;
        }
    }

    public static final Behavior<PrintMe> printerBehavior =
        Behaviors.receive((ctx, printMe) -> {
           ctx.getLog().info(printMe.message);
           LATCH.countDown();
           return Behaviors.same();
        });

    public static void main(String[] args) throws Exception {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.setup(ctx -> {
            ActorRef<PrintMe> printer = ctx.spawn(printerBehavior, "printer");
            printer.tell(new PrintMe("message 1"));
            printer.tell(new PrintMe("message 2"));
            return Behaviors.ignore();
        }), "fire-and-forget");

        LATCH.await();

        system.terminate();
    }
}

这里需要用 CountDownLatch 来同步保证消息送达后再关闭 ActorSystem,否则由于执行顺序没有保证,很有可能在看到 log 打印之前由于 ActorSystem 的关闭 printer actor 被停止,从而后续被调度的消息传递失败。由此也可以看出 fire and forget 的方式仅保证在 tell 的调用点发出消息,至于消息能不能送到,会被怎么处理,发送方就不管了。

Request-Response

请求-响应模式是另一种非常常见的通信模式,即 actor-1 向 actor-2 发送消息,actor-2 收到消息后向 actor-1 返回相应的消息。注意这里我们只提到了消息的发送,两个方向均是 fire and forget 的,actor-1 收到的 actor-2 的消息跟它收到的任何一条其他信息没有任何区别。除非 actor-2 发回的消息中携带有跟回应相关的信息,否则 actor-1 并不知道这条消息就是回应刚才发送的某一条信息的。此外,特化到 akka-typed 中,甚至由于消息中不再天然包括 sender 信息,actor-1 并不知道这条消息是 actor-2 发过来的,必须在消息中显式的包含所有需要的信息。

由于 request 和 response 是松耦合的,这和我们下面要讲到的使用 ask pattern 的模式不同,不能作为面向对象编程在 actor 模型中的投影。

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.Terminated;
import akka.actor.typed.javadsl.Behaviors;
import scala.runtime.Nothing$;

@SuppressWarnings("WeakerAccess")
public class RequestResponseTypedActors {

    public static final class Request {
        public final String message;
        public final ActorRef<Response> replyTo;

        public Request(String message, ActorRef<Response> replyTo) {
            this.message = message;
            this.replyTo = replyTo;
        }
    }

    public static final class Response {
        public final String message;

        public Response(String message) {
            this.message = message;
        }
    }

    private static final Behavior<Request> responder = Behaviors
            .receiveMessage(request -> {
                System.out.println("got request: " + request.message);
                request.replyTo.tell(new Response("got it!"));
                return Behaviors.same();
            });

    private static Behavior<Response> requester(ActorRef<Request> responder) {
        return Behaviors
                .setup(ctx -> {
                    responder.tell(new Request("hello", ctx.getSelf()));
                    return Behaviors.receiveMessage(response -> {
                        System.out.println("got message " + response.message);
                        return Behaviors.stopped();
                    });
                });
    }

    public static void main(String[] args) {
        ActorSystem.create(Behaviors.setup(ctx -> {
            ActorRef<Request> res = ctx.spawn(responder, "responder");
            ActorRef<Response> req = ctx.spawn(requester(res), "requester");
            ctx.watch(req);

            return Behaviors.receiveSignal((context, sig) -> {
                if (sig instanceof Terminated) {
                    return Behaviors.stopped();
                } else {
                    return Behaviors.unhandled();
                }
            });
        }), "ReqResTyped");
    }
}

可以看到,responder 收到 requester 的消息中包含了 requester 的地址,因此可以发送回传信息。requester 之所以知道 responder 的地址是由开发者在创建 requester 的时候直接告诉它的。这样一个显然的问题就是 requester 是怎么天生地知道 responder 的地址的呢?在分布式系统中,这可以通过预配置固定地址或名称服务来达到这个目的。在 akka-typed 中,通过 ActorSelection 使用 actor 地址寻址的方式已经很难做到的(除非先用 untyped 强行获取再转回去,非常复杂而且 hacky),主要是通过 Receptionist 的名称服务来实现的。这个服务在 cluster 模式下要求必须使用 akka-cluster-typed,实在颇有些臃肿。

Query

这里提到的 Query 模式即是所谓的 ask 模式,也即发送一个消息后等待一个绑定到该消息上的回应。由于网络的滞后性和不确定性,阻塞地等待这个回应不仅会造成性能问题,更有可能由于回应消息的丢失或目标机器宕机而使得当前系统不可用。为此,我们需要解耦请求和响应的过程,将本地返回的类型从直接的结果值变为包裹在 Future 中的值。然后,请求方的业务流程提升到 Future 域中,使用变换组合字来组合后续步骤。由于 Future 盒子是立即返回的,当前线程可以继续其他工作,在 Future 值被填充后根据此前组合的步骤执行相应的逻辑。这里顺带提一句,Future 也是函数式编程中著名的概念 Monad 的一个实例,上面这种域的提升和组合的方式是函数式编程中应对状态变化的一种常用手法。

由于返回的是一个 Future,我们需要仔细的讨论这个 Future 完成的条件。首先,抽象地说,这个 Future 会在收到回复的时候完成。但是这个请求回复的双方如果是两个业务 actor 之间完成的话,业务 actor 就需要有一张表来缓存所有的 Future,并且在请求和回复中都带有 Future 的唯一标识符,以在接受到回复时自动的完成 Future。由于 Query 模式不是天生就伴随在 actor 模型里的,而是一个后期总结的设计模式,这么做会导致所有 actor 都承担这一不必要的开销。

因此在 akka 当中采用的方法是,对于一个请求回复周期,使用一个专门的 PromiseActor 来负责和远端的 actor 交互,即请求消息由 PromiseActor 发出,收到回复时也由 PromiseActor 完成其中唯一的专用的 Future。在 akka-typed 中,还额外加入了 createRequestapplyToResponse 参数,前者用于将 PromiseActor 的引用传入发送的消息中,后者用于接收回复后适配为上层 actor 能接受的消息类型。

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class Ask {
    public static class Pong { }
    public static class Ping {
        public final ActorRef<Pong> replyTo;
        public Ping(ActorRef<Pong> replyTo) {
            this.replyTo = replyTo;
        }
    }

    public static final Behavior<Ping> pongBehavior =
        Behaviors.receiveMessage(ping -> {
           ping.replyTo.tell(new Pong());
           return Behaviors.stopped();
        });

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create(Behaviors.setup(ctx -> {
            ActorRef<Ping> ref = ctx.spawn(pongBehavior, "pong");
            CompletableFuture<Pong> future = AskPattern.ask(
                ref,
                Ping::new,
                Duration.ofMillis(2000L),
                ctx.getSystem().scheduler()).toCompletableFuture();
            future.whenComplete((pong, throwable) -> {
                ctx.getLog().info("pong={}, throwable={}", pong, throwable);
                ctx.getSystem().terminate();
            });
            return Behaviors.ignore();
        }), "ask");
    }
}

关于 ask 模式还有一个特别需要注意的点,即如果在 actor 中使用 AskSupport 的 ask,实际上这个 ask 中会有一个 spawn 出来的 PromiseActor。这一点在上面已经提过了,但是绝对值得再强调一遍。因为当你在 actor-1 中使用 AskSupport 应用 ask 模式和 actor-2 通信时,实际通信的是 PromiseActor。因此,akka 提供的关于两个 actor 之间消息的有序性对于这第三个 actor PromiseActor 是不成立的。这有可能导致非常微妙的并发竞争。

Pipe 和 Aggregate

最后简单提及两种消息流模式 Pipe 和 Aggregate,这两种模式经常配合 ask 模式使用。

Pipe 模式将一个 Future 在完成时发送到另一个 actor 上,其基本形式为 pipe(future, replyTo),经常用于 actor-1 向 actor-2 请求,actor-2 进而向 actor-3 请求,随后在 actor-2 中将向 actor-3 请求的 future pipe 到 actor-1 的回复中。举个例子,actor-1 是 client,actor-2 是数据库前端,actor-3 是数据库后端。

Aggregate 模式可以视为 Ask 模式的一个自然扩展。Ask 模式一次只能处理一个请求及其回应,而 Aggregate 模式简单的扩展到一个显式创建的子 actor 来处理多个请求响应,非常直观。

原文地址:https://www.cnblogs.com/tisonkun/p/11622304.html

时间: 2024-11-08 17:34:01

Actor 模型中的通信模式的相关文章

ActorLite: 一个轻量级Actor模型实现

Actor模型 Actor模型为并行而生,具Wikipedia中的描述,它原本是为大量独立的微型处理器所构建的高性能网络而设计的模型.而目前,单台机器也有了多个独立的计算单元,这就是为什么在并行程序愈演愈烈的今天,Actor模型又重新回到了人们的视线之中了.Actor模型的理念非常简单:天下万物皆为Actor,Actor之间通过发送消息进行通信.Actor模型的执行方式有两个特点: 每个Actor,单线程地依次执行发送给它的消息. 不同的Actor可以同时执行它们的消息. 对于第1点至今还有一些

Actor模型-Akka

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 写过多线程的人都不会否认,多线程应用的维护是件多么困难和痛苦的事.我说的是维护,这是因为开始的时候还很简单,一旦你看到性能得到提升就会欢呼雀跃.然而,当你发现很难从子任务的错误中恢复或者有些僵尸BUG很难复现再或者你的分析器显示你的线程在写入一个共享状态前大部分时间都浪费在阻塞上面的时候,痛苦降临了. 我刻意没提Java的并发API,以及它里面的集合类使得多线程编程变得多么轻松简单,因为我相信既然你们点进了这篇文章,

【Scala篇】--Scala中Trait、模式匹配、样例类、Actor模型

一.前述 Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大. 模式匹配机制相当于java中的switch-case. 使用了case关键字的类定义就是样例类(case classes),样例类是种特殊的类. Actor相当于Java中的多线程. 二.具体阐述 trait特性 1.概念理解 Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大. 与接口不同的是,它还可以定义属性和方法的实现. 一般情况下Scala的类可以继承多个Tra

在.NET中实现Actor模型的不同方式

上周,<实现领域驱动设计>(Implementing Domain-Driven Design)一书的作者Vaughn Vernon,发布了Dotsero,这是一个使用C#编写的.基于.NET的Actor模型工具包,它的实现参考了Akka API.Akka工具包是对Actor模型的一种实现,目前为止已经有对应Java和Scala版本的API. 今年早些时候,微软Research部门也发布了一个基于Actor模型的框架,Orleans框架的预览版.这个框架采用了云端编程模型,编写这个框架的目的在

java中多线程通信实例:生产者消费者模式

线程间的通信: 其实就是多个线程再操作同一个资源,但是操作的动作不同   当某个线程进入synchronized块后,共享数据的状态不一定满足该线程的需要,需要其他线程改变共享数据的状态后才能运行,而由于当时线程对共享资源时独占的,它必须解除对共享资源的锁定的状态,通知其他线程可以使用该共享资源. Java中的 wait(),notify(),notifyAll()可以实现线程间的通信. 生产者--消费者问题是典型的线程同步和通信问题 /** * 生产者和消费者问题,生产者生成出产品,消费者去购

分布式高并发下Actor模型

写在开始 一般来说有两种策略用来在并发线程中进行通信:共享数据和消息传递.使用共享数据方式的并发编程面临的最大的一个问题就是数据条件竞争.处理各种锁的问题是让人十分头痛的一件事. 传统多数流行的语言并发是基于多线程之间的共享内存,使用同步方法防止写争夺,Actors使用消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actor,保证了单独写原则.从而巧妙避免了多线程写争夺.和共享数据方式相比,消息传递机制最大的优点就是不会产生数据竞争状态.实现消息传递有两种常见的类型:基于

Actor模型[转]

原文链接:http://blog.jeoygin.org/archives/477 Actor这个模型由Carl Hewitt在1973年提出,Gul Agha在1986年发表技术报告“Actors: A Model of Concurrent Computation in Distributed Systems”,至今已有不少年头了.在计算机科学中,它是一个并行计算的数学模型,最初为由大量独立的微处理器组成的高并行计算机所开发,Actor模型的理念非常简单:天下万物皆为Actor. Actor

Orleans框架------基于Actor模型生成分布式Id

一.Actor简介 actor模型是一种并行计算的数学模型. 响应于收到的消息,演员可以:做出决定,创建更多Actor,发送更多消息,并确定如何响应接收到的下一条消息. 演员可以修改自己的状态,但只能通过消息相互影响(避免需要任何锁). actor是一个计算实体,当其收到消息时,可以并发执行如下操作: 1. 发送有限数量的消息给其他actor 2. 创建有限数量的新actor 3. 指定收到下一消息时的行为 在Orleans中使用的是虚拟Actor方式,详细:http://dotnet.gith

稍微谈一下 javascript 开发中的 MVC 模式

随着前台开发日益受到重视,客户端代码比重日益增加的今天,如何在javascript开发里应用MVC模式,这个问题似乎会一直被提到,所以偶在这里粗略的谈一下自己的看法吧. MVC模式的基本理念,是通过把一个application封装成model, view和controller三个部分达到降低耦合,简化开发的目的.这么说很空洞,大家可以实际看个例子: 1<select id="selAnimal"> 2    <option value="cat"&