基于Actor的并发方案

  • 共享可变状态的问题
  • Actor模型
  • Actor系统
  • 定义Actor
  • 消息处理
    • 副作用
    • 非类型化
    • 异步和非阻塞
  • 创建一个Actor
  • 发送消息
  • 消息应答
  • 问询Ask机制
  • 有状态的Actor
  • 小结

译者注: 本文原文标题:《The Neophyte’s Guide to Scala Part 14: The Actor Approach to Concurrency》,作者:Daniel Westheide, 原文链接:http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html, 本文是作者撰写的《Scala初学者指南》系列文章里的第14篇,是对Scala的Actor编程模型和Akka的一个入门介绍,在征得作者授权之后,翻译为中文发表在我的技术博客上。本文原文出处: http://blog.csdn.net/bluishglc/article/details/53155454 严禁任何形式的转载,否则将委托CSDN官方维护权益!



在前面几篇关于如何利用Scala类型系统大幅提升灵活性和编译期安全性的文章之后,我们现在重新回到这个系列早期探讨过的一个话题:Scala如何处理并发。

我们前面介绍过组合使用Future进行异步处理的方案,这个方案能很好地应对诸多问题,但是,它不是Scala提供的唯一方案,Scala处理并发的另一块基石是“Actor模型”,它提供了一种基于进程间消息传递的并发方案。

Actor并不是新概念,最知名的实现是在Erlang上,Scala核心库很早就有自己的Actor实现了,但是在Scala2.11版本之后,它面临被废弃的命运,因为它将被Akka提供的Actor实现所取代,Akka作为Scala事实上的Actor标准已经很久了。

本文你将了解到Akka Actor模型的理念并学习如何使用Akka工具箱进行基础编程,我们不打算深入地讨论Akka Actor的所有内容,所以,和本系列前面的多数文章不同,本文是为了让你了解一下Akka的理念,激发你对它的兴趣。

共享可变状态的问题

当前主流的并发解决方案是“共享可变状态”( shared mutable ,即:大量有状态的对象,它们的状态可以被应用程序的很多地方改变,每处修改都是在自己的线程上进行的。这类方案下代码通常会遍布读/写锁,防止多个线程同时修改,以保证对象状态的改变是可控的。同时,我们还要尽量避免锁住太大的代码块,因为这会大幅度的削弱程序的性能。

Actor模型

广泛使用的“共享可变状态”方案需要你时刻谨记你的代码要在并发场景下运行,你必须从头到尾用并发的方式去设计和编写你的应用,以后你将很难再为其添加支持。Actor编程模型致力于避免上述的所有问题,它允许你编写易读的高性能并发代码。

Actor模型的思想是:把你的应用程序看作是由许多轻量的被称之为“Actor”的实体组成的,每个Actor只负责一个很小的任务,职责单一且清晰,复杂的业务逻辑会通过多个Actor之间相互协作来完成,比如委派任务给其他的Actor或者传递消息给协作者。

Actor系统

Actor是种可怜的“生物”,它们不能独自存活。Akka中的每一个Actor都是由一个Actor系统(Actor System)来创建和维护的。一个Actor系统会提供一整套辅助功能,但是现在我们先不用关心这些。

让我们从示例代码开始,要运行它们需要在基于Scala 2.10的SBT项目里添加如下的resolver和dependency:

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3"

现在,让我们来创建一个ActorSystem,它将是所有用户定义的Actor的宿主环境。

import akka.actor.ActorSystem
object Barista extends App {
  val system = ActorSystem("Barista")
  system.shutdown()
}

我们创建了一个ActorSystem实例,把它命名为“Barista”(咖啡师),我们将以”购买和制作咖啡”这个场景来讲解,这是我们在本系列较早一篇文章 composable futures中提到过的例子。

程序的最后,作为一个好市民,当我们不再使用这个ActorSystem时,我们要记得关闭它。

定义Actor

你的应用程序是由几十个还是几百万个Actor来组成取决于你的用例,但是Akka完全有能力处理百万级别数量的Actor,你可能觉得创建这么多Actor一定是疯了,但实际上,Actor和线程之间并不是一一对应的关系,这一点非常重要,否则系统的内存很快就会被耗光的。由于Actor的非阻塞特性,一个线程可以为不同的Actor服务,Akka(译者注:具体地说是Dispatcher)会让线程在不同的Actor之间切换,依据是当前谁有消息需要被处理就分配线程给它。

为了了解实际发生了什么,让我们首先创建一个简单的Actor:Barista,它负责接收咖啡订单制作咖啡,我们只是简单地让它打印一条消息来表示它完成了咖啡订单的处理:

sealed trait CoffeeRequest
case object CappuccinoRequest extends CoffeeRequest
case object EspressoRequest extends CoffeeRequest

import akka.actor.Actor
class Barista extends Actor {
  def receive = {
    case CappuccinoRequest => println("I have to prepare a cappuccino!")
    case EspressoRequest => println("Let‘s prepare an espresso.")
  }
}

首先,我们要定义Actor能理解的消息类型。如果消息有各种参数,通常我们会使用case class来封装消息并在Actor之间传递它们。如果消息没有参数,就简单地使用case object就可以了,就像我们这里写的一样。

任何情况下,要确保你的消息是不可变的,否则出现糟糕的后果。

接下来,我们看一下Barista类,它是一个具体类,继承自Actor特质,这个特质定义了一个方法receive,它返回一个Receive类型的值。Receive是类型PartialFunction[Any, Unit]的一个别名。

消息处理

receive方法的含义是什么呢?它的返回类型PartialFunction[Any, Unit]可能会让你觉得古怪。

简单地说,receive方法返回的这个偏函数代表着这个Actor对传递给它的所有消息的处理逻辑。无论什么时候,当你系统的其他部分(另外一个Actor或者其他什么东西)给这个Actor发送消息时,Akka最终都会通过调用这个Actor的receive方法所返回的这个偏函数来处理消息,在调用时会把这条消息作为这个偏函数的参数传递它。

副作用

在处理消息的时候,你可以让Actor做任何你想做的事情,但是,它就是不能有返回值

为什么?

因为偏函数限定了返回类型只能是Unit,我们曾经强调在函数式编程里应该总是尽量地使用纯函数,所以这里可能会让你感到有些意外。对于一个并发编程模型来说,这其实是合理的,Actor接收的每一条消息都是隔离处理的,是一条一条进行的,并不需要同步或锁机制。那些可能出现副作用的地方都会以一种可控的方式去处理。

非类型化

偏函数的另一个副作用是它所期望的参数类型Any,也就是消息是非类型化的,这在拥有如此强大类型系统的Scala里看上去让人困惑。

非类型化配合一些重要的设计决策可以让我们做很多事情,比如转发消息给其他的Actor,负载均衡或者代理Actor从而避免发送者了解过多的细节。

从实践来看,偏函数里代表消息的参数没有类型化通常不是问题,如果你使用的是强类型化的消息,你就使用模式匹配处理对应类型的消息就可以了,就像我们前面代码里做的那样。

但是有些时候,弱类型的Actor确实会导致糟糕的Bug。如果你习惯并高度依赖强类型系统,你可以看一下Akka还处于实验阶段的特性:Typed Channels。

异步和非阻塞

我前面提到Akka“最终”会让你的Actor去处理发送给它的消息。这句话的意思是:消息的发送和处理完成是一个异步的非阻塞的过程。发送方不会一直被阻塞到接收方处理完消息,它会直接继续它自己的工作,发送方可能会期望从处理方那里得到一个反馈消息,但也许它根本不关心消息的处理结果。

当某些组件发送一条消息给一个Actor时,真正发生的事情是:这个消息被投递给了这个Actor的Mailbox, 我们基本上可以把Mailbox当成一个对列。把一条消息放到一个Actor的Mailbox的过程也是非阻塞的,比如:发送方不会一直等待消息进入接收方的Mailbox对列。

Dispatcher会通知Actor它的Mailbox有一条新消息,如果这个Actor正在处理着手头上的消息,Dispatcher会从当前的执行上下文里选一个可用的线程,一旦Actor处理完前面的消息,它会让Actor在这个准备好的线程上从Mailbox里取出这条消息去处理。

Actor会阻塞分配给它的线程直到它开始处理一条消息,但这并不会阻塞消息的发送方,这意味着一个耗时较长的操作会影响整体的性能,因为所有其他的Actor不得不被安排从剩余线程中选取一个去处理消息(译者注:增加了线程调度的开销)。

因此,你的Receive偏函数要遵从一个核心的准则:尽可能地缩短它的执行时间(译者注:可以把任务分解为更小的单元委派更粒度更细的Actor去执行)。更重要的是,在你的消息处理代码里尽量地避免调用会造成阻塞的代码。

当然,有些事情是你无法完全避免的,比如,当今主流的数据库驱动基本都是阻塞的,如果你想让你的程序基于Actor去持久化或查询数据时,你就会面临这样的问题。现在已经有一些应对这类问题的方案了,但是作为一偏介绍性的文章,本文先不涉及。

创建一个Actor

定义好Actor之后,我们如何实际地使用我们的Barista呢?为此,我需要为创建Barista的一个实例。你可能会用下面这种常规的做法,即通过调用构造函数来实例化一个Actor:

val barista = new Barista // will throw exception

这样不行!Akka会用一个ActorInitializationException异常回敬你。事情是这样的:为了能让Actor良好地工作,你的Actor必须交给ActorSystem和它的组件来托管。因此,你需要通过ActorSystem来帮你创建这个实例:

import akka.actor.{ActorRef, Props}
val barista: ActorRef = system.actorOf(Props[Barista], "Barista")

定义在AcotorSysytem上actorOf方法期望一个Props实例,针对新建的Actor的配置信息都会封装到这个Props实例里,另外,方法也给这个Actor起了一个名称。我们使用的是创建Props实例的最简单的方式,也就是通过调用Props的伴生对象的apply方法来实现的,同时指定它的类型参数。Akka随后将根据给定的类型调用它的构造函数来创建一个新的Actor实例。

注意:actorOf返回的对象类型不是Barista而是ActorRef(译者注:Akka对Actor的托管体现在很多地方,前面提到的你不能直接创建一个Actor实例是一方面,此处,创建之后你得到的也不是Actor实例本身而是一个ActorRef)。Actor从不会与其他Actor直接通信,因此没有必要获取一个Actor实例的直接引用,而是让Actor或其他组件获取那些需要发送消息的Actor的ActorRef

因此,ActorRef扮演了Actor实例代理的角色,这样做会带来很多好处,因为一个ActorRef可以被序列化,我们可以让它代理一个远程机器上的Actor,至于ActorRef后面的这个Actor是本地JVM里的还是一个远程机器上的,这对使用者来说都是透明的,我们把这种特性称作“位置透明”。

请记住,ActorRef不是类型参数化的,任何ActorRef都可以互相交换,以便可以让你发送任意的消息给任意的ActorRef。这种设计我们前面也提到了,它让你可以简单地修改你的Actor系统的网络拓扑而不需要对发送方做任何修改。

发送消息

现在我们已经创建了一个Barista的actor实例并获得了指向它的引用ActorRef。我们现在可以给它发送消息了。这是通过调用ActorRef!方法实现的:

barista ! CappuccinoRequest
barista ! EspressoRequest
println("I ordered a cappuccino and an espresso")

调用!方法是一个“调用之后不管”的操作:你告诉Barista你想要一杯卡布奇诺,但是你不会一直等着它响应。这是Akka里面Actor最常见的交互方式。通过调用!,你告诉Akka把你的消息加入到收信人的邮箱队列里,如前面所述,这是非阻塞的,作为接收方的Actor最终一定会处理你的消息。

由于异步的特性,上述代码的执行循序是不确定的,它可能会是这样一种结果:

I have to prepare a cappuccino!
I ordered a cappuccino and an espresso
Let‘s prepare an espresso.

即使我们是先给Barista的Mailbox连续发送了两条消息,但是描述顾客点完咖啡信息去先于espresso的制作完成前打印到了控制台。

消息应答

有时候,你可能想通过给消息发送者发送一个应答消息。为了让你能那样做,Actor有一个叫sender的方法,它返回最后一条消息的发送方的ActorRef

但是它是怎么知道发送者是谁的呢?答案可以从!方法的函数签名看出来,它的第二个参数是一个隐式参数:

def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

在调用某个Actor的!方法时,这个Actor的ActorRef会作为隐士参数sender传递过去。

让我们改变一下Barista以便于它在工作前先发送一条账单消息BillCoffeeRequest的发送者:

case class Bill(cents: Int)
case object ClosingTime
class Barista extends Actor {
  def receive = {
    case CappuccinoRequest =>
      sender ! Bill(250)
      println("I have to prepare a cappuccino!")
    case EspressoRequest =>
      sender ! Bill(200)
      println("Let‘s prepare an espresso.")
    case ClosingTime => context.system.shutdown()
  }
}

上述代码里我们引入了一个新的消息ClosingTime, Barista应对这条消息的做法就是关闭ActorSystem,所有的Actor都可以从ActorContext获取ActorSystem。

现在,让我们引入第二个Actor,它代表“顾客”。

case object CaffeineWithdrawalWarning
class Customer(caffeineSource: ActorRef) extends Actor {
  def receive = {
    case CaffeineWithdrawalWarning => caffeineSource ! EspressoRequest
    case Bill(cents) => println(s"I have to pay $cents cents, or else!")
  }
}

这是一个咖啡成瘾的顾客,一旦咖啡因摄入量减少,他就要买咖啡喝。我们给Customer的构造函数传递一个ActorRef,即caffeineSourceCustomer并不知道这个ActorRef指向一个Barista,但它知道它可以发送一个CoffeeRequest消息给它,仅此而已。

最后,让我们实例化这两个Actor,并给Customer发送一个CaffeineWithdrawalWarning消息(译者注:这条消息的意思是:咖啡因摄入量减少,发出警告,言下之意就是顾客的身体告诉顾客:“你该喝咖啡了!”),让程序跑起来:

val barista = system.actorOf(Props[Barista], "Barista")
val customer = system.actorOf(Props(classOf[Customer], barista), "Customer")
customer ! CaffeineWithdrawalWarning
barista ! ClosingTime

在这里,对于这个Customer,我们使用了一个不同的工厂方法来创建一个Props实例:我们传入Actor的类型以及它的构造函数所需要的参数。我们这样做是因为我们想把BaristaActorRef传递给Customer 构造函数。

发送CaffeineWithdrawalWarning消息给咖啡因成瘾的顾客,顾客的反应是立即买一杯浓咖啡,即发送一条EspressoRequest消息给咖啡师,后者则会发送一个账单消息给顾客。相应的输出可能是这样的:

Let‘s prepare an espresso.
I have to pay 200 cents, or else!

首先,在处理EspressoRequest消息的时候,Barista会发一个消息给sender,也就是Customer,但是,这个操作并不会阻塞它后面的操作。Barista可以继续处理EspressoRequest,就如像控制台打印的那样。很快, Customer会处理Bill消息,然后打印到控制台。

问询(Ask)机制

有时候,给一个Actor发送消息然后期待返回一个响应消息这种模式并不适用于某些场景,最常见的例子是当某些组件并不是Actor但又需要和Actor交互时,它们就无法接收来自Actor的消息。

对于这种情况,Akka有一种Ask机制,它在基于Actor的并发和基于Future的并发之间提供一座桥梁,从客户端的角度看,它是这样工作的:

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(2.second)
implicit val ec = system.dispatcher
val f: Future[Any] = barista2 ? CappuccinoRequest
f.onSuccess {
  case Bill(cents) => println(s"Will pay $cents cents for a cappuccino")
}

首先,你要引入对ask语法的支持(即import akka.pattern.ask),同时针对?方法返回的Future创建一个隐式变量timeout,Future还需要一个ExecutionContext。这里我们只简单地使用ActorSystem默认的dispatcher,它同时也是一个ExecutionContext

如你所见,返回的Future是非类型化的,它是一个Future[Any]。这也不奇怪,既然它接收的是来自某个Actor的消息,这些Actor(ActorRef)尚未类型化,返回的Future又怎么能类型化呢?

对于被问询的Actor来说,这和发送消息给被处理的消息的发送方并没有什么不同,所以我们在使用Ask机制从Barista处获取答复时,Barista本身不需要做任何改动。

一旦被询问的Actor发送了反馈消息给消息的发送方,返回的FuturePromise就完成了。

总的来说,主动告知要比问询好,因为它耗费的资源更少,Akka不是给“礼貌”的人准备的。但是,确实有些场景你只能使用问询的方式,但那也没有什么,Akka同样可以工作地很好。

有状态的Actor

每个Actor都可以维护一个内部的状态,但不是必须的。有时候,系统的整体状态很大一部分是由那些在Actor之间传递的不可变的消息所携带的信息组成的。

一个Actor一次只处理一条消息,这个过程中它可能会修改它的内部状态,这意味着Actor的内部状态是可变的,但是既然每一条消息是互相隔离处理的,那么Actor的内部状态就不会因为并发而被搞乱。

为了说明这一点,让我们把无状态的Barista改造成有状态的,我们给它添加一个订单计数器:

class Barista extends Actor {
  var cappuccinoCount = 0
  var espressoCount = 0
  def receive = {
    case CappuccinoRequest =>
      sender ! Bill(250)
      cappuccinoCount += 1
      println(s"I have to prepare cappuccino #$cappuccinoCount")
    case EspressoRequest =>
      sender ! Bill(200)
      espressoCount += 1
      println(s"Let‘s prepare espresso #$espressoCount.")
    case ClosingTime => context.system.shutdown()
  }
}

我们引入了两个变量,cappuccinoCountexpressoCount分别代表两类咖啡的订单数量。这是我们在本系列文章里第一次使用var,虽然在函数式编程里我们应该尽量地避免使用var,但这是唯一让你的Actor携带”状态”的做法。既然每条消息都是隔离处理的,上述的代码和在非Actor环境里使用AtomicInteger效果上是一样的。

小结

到这里,我们关于使用Actor模型进行并发编程以及如何在Akka中使用这种编程范式的介绍就要结束了。我们只是泛泛地介绍了一下,忽略了Akka里一些重要的概念,我希望这能让你对这种并发编程方法先有一个初步的了解,并激发出你的学习兴趣。

在下一篇文章里,我会增强我们的例子,给它添加一些有意义的行为以便介绍更多关于Akka Actor的东西,比如在一个Actor系统里是如何进行错误处理的。

时间: 2024-08-11 07:48:51

基于Actor的并发方案的相关文章

初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,

Scala深入浅出实战中级--进阶经典(第66讲:Scala并发编程实战初体验及其在Spark源码中应用解析)内容介绍和视频链接 2015-07-24 DT大数据梦工厂 从明天起,做一个勤奋的人 看视频.下视频,分享视频 DT大数据梦工厂-Scala深入浅出实战中级--进阶经典:第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析 本期视频通过代码实战详解了Java语言基于加锁的并发编程模型的弊端以及Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Sc

再谈AbstractQueuedSynchronizer:基于AbstractQueuedSynchronizer的并发类实现

公平模式ReentrantLock实现原理 前面的文章研究了AbstractQueuedSynchronizer的独占锁和共享锁,有了前两篇文章的基础,就可以乘胜追击,看一下基于AbstractQueuedSynchronizer的并发类是如何实现的. ReentrantLock显然是一种独占锁,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基础类,继承自AbstractQueuedSynchronizer,看一下代码实现: 1 abstract stati

利用基于控制器的加密方案进行数据保护(三)

利用基于控制器的加密方案进行数据保护(三) 提交加密模块进行FIPS验证须知 本系列文章中第一篇简要介绍了FIPS验证流程:第二篇则探究了FIPS 140-2的验证级别及其要求. 本篇是此系列博文的最后一篇,讲解提交加密模块进行FIPS验证时的其他注意事项. 客户须知 开发加密模块进行FIPS验证时需要完成若干步骤,高级别的认证包括如下步骤: 1.    与一家FIPS140加密及安全测试(CST)实验室签订合同. 2.   定义加密边界,并确保其满足FIPS 140-2要求. 3.   采取措

利用基于控制器的加密方案进行数据保护(二)

利用基于控制器的加密方案进行数据保护(二) FIPS 140-2验证级别及需求 本加密系列的第一篇博文解释了基于控制器的加密(CBE),并概要介绍了FIPS验证流程.现在来探讨一下Federal Information Processing Standards 140 (FIPS 140-2,联邦信息处理标准)的验证级别及其需求. FIPS 140-2验证级别 与加密模块的设计与实现相关的领域共有十一个,其中每个领域的安全级别可以划分为1(最低)到4(最高)不等. 加密模块还有一个总安全级别的评

HttpClient基于HTTP协议认证方案

简介 HttpClient支持三种不同形式的HTTP协议验证发难:Basic,Digest和NTLM.这些都可以解决同http服务器或是代理服务器之间的认证. 认证服务器 HttpClient处理服务器认证几乎是透明的,发开者只需要做一件事情,提供一个合法login证书.此证书被保存到HttpState实例中并且可调用方法setCredentials(AuthScopeauthscope, Credentials cred)和getCredentials(AuthScope authscope)

27 Apr 18 GIL 多进程多线程使用场景 线程互斥锁与GIL对比 基于多线程实现并发的套接字通信 进程池与线程池 同步、异步、阻塞、非阻塞

27 Apr 18 一.全局解释器锁 (GIL) 运行test.py的流程: a.将python解释器的代码从硬盘读入内存 b.将test.py的代码从硬盘读入内存  (一个进程内装有两份代码) c.将test.py中的代码像字符串一样读入python解释器中解析执行 1 .GIL:全局解释器锁 (CPython解释器的特性) In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple na

并发编程(6)基于锁的并发数据结构设计

主要内容: 并发数据结构设计的意义 指导如何设计 实现为并发设计的数据结构 如果一种数据结构可以被多个线程所访问,其要不就是绝对不变的(其值不会发生变化,并且不需同步),要不程序就要对数据结构进行正确的设计,以确保其能在多线程环境下能够(正确的)同步.一种选择是使用独立的互斥量,其可以锁住需要保护的数据,另一种选择是设计一种能够并发访问的数据结构.第一种使用互斥量,在同一时间只有一个线程可以访问数据,实际是一种串行的序列化访问.显示的组织了多线程对数据结构的并发访问. 所以,缩小保护区域,减少序

基于UDP协议的socket套接字编程 基于socketserver实现并发的socket编程

基于UDP协议 的socket套接字编程 1.UDP套接字简单示例 1.1服务端 import socket server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 建立一个UDP协议的服务器 server.bind(("127.0.0.1",8080)) while True: data,addr = server.recvfrom(1024) server.sendto(data.upper(),addr) server

第五十个知识点:什么是BLS基于对的签名方案?

第五十个知识点:什么是BLS基于对的签名方案? BLS签名方案使用了椭圆曲线上了Weil对,本质上是一个在曲线上除n划分的双线性形式,使用 \(n^{th}\) 个单位根. 假设我们有一个椭圆曲线\(E/F_{3^l}\),根据原始论文中的记号,方案如下描述: 密钥生成:让\(E/F_{3^l}\)是一个椭圆曲线,\(q\)是这个曲线阶数的最大因数.让\(P\)是其中的一个阶数是\(q\)的点,然后随机的选择\(x \in Z_q^*\).最后让\(R = x \cdot P\).那么输出\((