Akka并发编程——第七节:Actor模型(六)

主要内容:

1. Typed Actor定义

2. Typed Actor创建

3. 消息发送

1. Typed Actor定义

Akka中的Typed Actor是Active Objects设计模式的实现,Active Objects模式将方法的执行和方法的调用进行解耦合,从而为程序引入并发性。Typed Actor由公用的接口和对应实现两部分构成,其后面深层次的实现使用的是代理模式,即通过使用JDK中的动态代理来实现,在调用接口的方法时自动分发到实现接口的对象上。Typed Actor的定义[ ]如下所示。

trait Squarer {
    //fire-and-forget消息
    def squareDontCare(i: Int): Unit
    //非阻塞send-request-reply消息
    def square(i: Int): Future[Int]
    //阻塞式的send-request-reply消息
    def squareNowPlease(i: Int): Option[Int]
    //阻塞式的send-request-reply消息
    def squareNow(i: Int): Int
  }

  class SquarerImpl(val name: String) extends Squarer {
    def this() = this("SquarerImpl")

    def squareDontCare(i: Int): Unit = i * i
    def square(i: Int): Future[Int] = Promise.successful(i * i).future
    def squareNowPlease(i: Int): Option[Int] = Some(i * i)
    def squareNow(i: Int): Int = i * i
  }

trait Squarer中定义了4个方法:

(1)def squareDontCare(i: Int): Unit方法:返回值类型为Unit,它类似于Untyped Actor中的fire-and-forget消息发送模型,即!和tell方法调用。

(2)def square(i: Int): Future[Int]:返回值类型为Future[Int],它类似于Untyped Actor中的send-request-reply消息发送模型,即?和ask方法调用,此种调用是非阻塞的。

(3)def squareNowPlease(i: Int): Option[Int]:返回值类型为Option[Int](Option类可以是scala.Option[_]也可以是akka.japi.Option

2. 创建Typed Actor

通过下列代码创建Typed Actor实例。

//直接通过默认的构造函数创建Typed Actor
val mySquarer: Squarer =TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())
//直接通过默认的构造函数创建Typed Actor并指定Typed Actor名称
val mySquarer: Squarer =TypedActor(system).typedActorOf(TypedProps[SquarerImpl](),"mySquarer")
//通过非默认的构造函数创建Typed Actor并指定Typed Actor名称
val otherSquarer: Squarer = TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],new SquarerImpl("SquarerImpl")), "otherSquarer")

上面代码演示的是使用构造函数和非默认构造函数创建Typed Actor,其中Squarer为代理的类型,SquarerImpl为具体实现的类型。

3. 消息发送

//fire-forget消息发送
  mySquarer.squareDontCare(10)

  //send-request-reply消息发送
  val oSquare = mySquarer.squareNowPlease(10)

  val iSquare = mySquarer.squareNow(10)

  //Request-reply-with-future 消息发送
  val fSquare = mySquarer.square(10)
  val result = Await.result(fSquare, 5 second)

代码mySquarer.squareDontCare(10)是单向消息发送,方法将在另外一个线程上异步地执行;val oSquare = mySquarer.squareNowPlease(10)、val iSquare = mySquarer.squareNow(10)为Request-reply消息发送,在特定时间内以阻塞的方式执行,对于.squareNowPlease(10)方法如果在对应时间内没有返回结果则返回值为None,否则返回值为Option[Int]类型,对于squareNow(10)方法如果在对应时间内无返回值则会抛出异常java.util.concurrent.TimeoutException,否则返回Int类型值;val fSquare = mySquarer.square(10)为Request-reply-with-future式的消息发送,以非阻塞的方式执行,可以通过val result = Await.result(fSquare, 5 second)获取执行结果。完整代码如下所示。

/*
 * Typed Actor
 */
object Example_01 extends  App {

  import akka.event.Logging
  import scala.concurrent.{ Promise, Future }
  import akka.actor.{ TypedActor, TypedProps }
  import scala.concurrent.duration._

  trait Squarer {
    //fire-and-forget消息
    def squareDontCare(i: Int): Unit
    //非阻塞send-request-reply消息
    def square(i: Int): Future[Int]
    //阻塞式的send-request-reply消息
    def squareNowPlease(i: Int): Option[Int]
    //阻塞式的send-request-reply消息
    def squareNow(i: Int): Int
  }

  class SquarerImpl(val name: String) extends Squarer {
    def this() = this("SquarerImpl")

    def squareDontCare(i: Int): Unit = i * i
    def square(i: Int): Future[Int] = Promise.successful(i * i).future
    def squareNowPlease(i: Int): Option[Int] = Some(i * i)
    def squareNow(i: Int): Int = i * i
  }

  val system = ActorSystem("TypedActorSystem")
  val log = Logging(system, this.getClass)

  //使用默认构造函数创建Typed Actor
  val mySquarer: Squarer =
    TypedActor(system).typedActorOf(TypedProps[SquarerImpl](),"mySquarer")

  //使用非默认构造函数创建Typed Actor
    val otherSquarer: Squarer =
      TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],
        new SquarerImpl("SquarerImpl")), "otherSquarer")

  //fire-forget消息发送
  mySquarer.squareDontCare(10)

  //send-request-reply消息发送
  val oSquare = mySquarer.squareNowPlease(10)

  log.info("oSquare="+oSquare)

  val iSquare = mySquarer.squareNow(10)
  log.info("iSquare="+iSquare)

  //Request-reply-with-future 消息发送
  val fSquare = mySquarer.square(10)
  val result = Await.result(fSquare, 5 second)

  log.info("fSquare="+result)

  system.shutdown()
}

代码运行结果如下:

[INFO] [03/21/2016 21:15:50.592] [main] [Example12_9(akka://TypedActorSystem)]oSquare=Some(100)[INFO][03/21/201621:15:50.649][main][Example129(akka://TypedActorSystem)] iSquare=100

[INFO] [03/21/2016 21:15:50.649] [main] [Example12_9$(akka://TypedActorSystem)] fSquare=100

时间: 2024-11-08 16:38:51

Akka并发编程——第七节:Actor模型(六)的相关文章

Akka并发编程——第八节:Actor模型(七)

本节主要内容 停止运行Typed Actor 当Typed Actor不再需要时要将其停止,有3种方法停止Typed Actor的运行: (1)通过system.shutdown()停止ActorSystem中所有的Typed Actor: (2)调用TypedActor(system).stop(mySquarer)停止指定的Typed Actor: (3)调用TypedActor(system).poisonPill(otherSquarer)停止指定的Typed Actor. 具体使用代码

JAVA并发编程2_线程安全&内存模型

"你永远都不知道一个线程何时在运行!" 在上一篇博客JAVA并发编程1_多线程的实现方式中后面看到多线程中程序运行结果往往不确定,和我们预期结果不一致.这就是线程的不安全.线程的安全性是非常复杂的,没有任何同步的情况下,多线程的执行顺序是不可预测的.当多个线程访问同一个资源时就会出现线程安全问题.例如有一个银行账户,一个线程往里面打钱,一个线程取钱,要是得到不确定的结果那是多么可怕的事情. 引入: 例如下面的程序,在单线程下,执行两次i++理论上i的最终值是12,但是在多线程环境下则不

并发编程(七)——AbstractQueuedSynchronizer 之 CountDownLatch、CyclicBarrier、Semaphore 源码分析

这篇,我们的关注点是 AQS 最后的部分,共享模式的使用.本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier.Semaphore 的源码一起过一下. CountDownLatch CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类.使用方法在前面一篇文章中有介绍 并发编程(二)—— CountDownLatch.CyclicBarrier和Semaphore 使用例子 我们看下 Do

(更新中)谈谈个人对java并发编程中(管程模型,死锁,线程生命周期等问题) 见解

之前未曾接触过多线程编程  公司的项目开始用到多线程,所以自己谈谈个人对于并发编程的见解. 并发编程会导致线程不安全,常说的线程不安全指的是  多个线程操作一个共享数据,导致线程之间的读取到的数据不一致. 并发编程导致线程不安全的根源   可见性  原子性    有序性 1 .可见性     cpu缓存导致. 一般cpu缓存中进行操作之后再将数据写到内存,在多核服务器中  每个线程都会分配一个cpu  都会在各自的cpu中进行处理再将数据统一写到内存中.每个cpu缓存中的数据都是不可见的.导致最

java并发编程(七)synchronized详解

Java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码.     一.当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行.另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块.      二.然而,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的非synchronized(thi

[Java 并发] Java并发编程实践 思维导图 - 第六章 任务执行

根据<Java并发编程实践>一书整理的思维导图.希望能够有所帮助. 第一部分: 第二部分: 第三部分:

探索并发编程(七)------分布式环境中并发问题

在分布式环境中,处理并发问题就没办法通过操作系统和JVM的工具来解决,那么在分布式环境中,可以采取一下策略和方式来处理: 避免并发 时间戳 串行化 数据库 行锁 统一触发途径 避免并发 在分布式环境中,如果存在并发问题,那么很难通过技术去解决,或者解决的代价很大,所以我们首先要想想是不是可以通过某些策略和业务设计来避免并 发.比如通过合理的时间调度,避开共享资源的存取冲突.另外,在并行任务设计上可以通过适当的策略,保证任务与任务之间不存在共享资源,比如在以前博文中 提到的例子,我们需要用多线程或

&lt;&lt;java 并发编程&gt;&gt;第七章:取消和关闭

Java没有提供任何机制来安全地终止线程,虽然Thread.stop和suspend等方法提供了这样的机制,但是存在严重的缺陷,应该避免使用这些方法.但是Java提供了中断Interruption机制,这是一种协作机制,能够使一个线程终止另一个线程的当前工作. 这种协作方式是必要的,我们很少希望某个任务线程或者服务立即停止,因为这种立即停止会时某个共享的数据结构处于不一致的状态.相反,在编写任务和服务的时候可以使用一种协作方式:当需要停止的时候,它们会先清除当前正在执行的工作,然后再结束. 7.

Java并发编程原理与实战二十六:闭锁 CountDownLatch

关于闭锁 CountDownLatch 之前在网上看到过一篇举例非常形象的例子,但不记得是出自哪里了,所以这里就当自己再重新写一篇吧: 例子如下: 我们每天起早贪黑的上班,父母每天也要上班,有一天定了一个饭店,一家人一起吃个饭,通知大家下班去饭店集合. 假设:3个人在不同的地方上班,必须等到3个人到场才能吃饭,用程序如何实现呢? 方式一: public class Test1 { /** * 模拟爸爸去饭店 */ public static void fatherToRes() { System