package com.vdncloud.las.da.akka import java.util.concurrent.TimeUnit import akka.actor.{Actor, ActorRef, ActorSystem, Props}import akka.routing.RoundRobinRouterimport junit.framework.TestCase import scala.concurrent.duration._ /** * * Calculate – 发送给 主 actor 来启动计算。 * Work – 从 主 actor 发送给各 工作 actor,包含工作分配的内容。 * Result – 从 工作 actors 发送给 主 actor,包含工作actor的计算结果。 * PiApproximation – 从 主 actor发送给 监听器 actor,包含pi的最终计算结果和整个计算耗费的时间。 * * 发送给actor的消息应该永远是不可变的,以避免共享可变状态。 * 在scala里我们有 ‘case classes’ 来构造完美的消息。 * 现在让我们用case class创建3种消息。 * 我们还为消息们创建一个通用的基础trait(定义为sealed以防止在我们不可控的地方创建消息): */sealed trait PiMessage case object Calculate extends PiMessage case class Work(start: Int, nrOfElements: Int) extends PiMessage case class Result(value: Double) extends PiMessage case class PiApproximation(pi: Double, duration: Duration) { override def toString: String = "Pi approximation: \t\t%s\tCalculation time: \t%s".format(pi, duration)} /** * 现在创建工作 actor。 方法是混入 Actor trait 并定义其中的 receive 方法. * receive 方法定义我们的消息处理器。我们让它能够处理 Work 消息,所以添加一个针对这种消息的处理器: */class Worker extends Actor { /** * 可以看到我们现在创建了一个 Actor 和一个 receive 方法作为 Work 消息的处理器. * 在这个处理器中我们调用calculatePiFor(..) 方法, 将结果包在 Result 消息里并使用sender异步发送回消息的原始发送者。 * 在Akka里,sender引用是与消息一起隐式发送的,这样接收者可以随时回复或将sender引用保存起来以备将来使用。 * * 现在在我们的 Worker actor 中唯一缺少的就是实现 calculatePiFor(..) 方法。 * 虽然在Scala里我们可以有很多方法来实现这个算法,在这个入门指南中我们选择了一种命令式的风格,使用了for写法和一个累加器: */ def calculatePiFor(start: Int, nrOfElements: Int): Double = { var acc = 0.0 for (i ← start until (start + nrOfElements)) acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) acc } def receive = { case Work(start, nrOfElements) ? sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work }} /** * 现在我们有了一个路由,可以在一个单一的抽象中表达所有的工作actor。现在让我们创建主actor. 传递给它三个整数变量: * * 我们还缺少 主 actor的消息处理器. 这个处理器需要能够对两种消息进行响应: * Calculate – 用来启动计算过程 * Result – 用来汇总不同的计算结果 * * @param nrOfWorkers – 定义我们会启动多少工作actor * @param nrOfMessages – 定义会有多少整数段发送给工作actor * @param nrOfElements – 定义发送给工作actor的每个整数段的大小 * @param listener – 用来向外界报告最终的计算结果。 */class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef) extends Actor { var pi: Double = _ var nrOfResults: Int = _ val start: Long = System.currentTimeMillis /** * 主actor会稍微复杂一些。 * 在它的构造方法里我们创建一个round-robin的路由器来简化将工作平均地分配给工作actor们的过程,先做这个: */ val workerRouter = context.actorOf( Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter") // val workerRouter = context.actorOf(Props[Worker].withRouter(FromConfig()), "workerRouter") /** * Calculate 处理器会通过其路由器向所有的工作 actor 发送工作内容. * Result 处理器从 Result 消息中获取值并汇总到我们的 pi 成员变量中. * * 我们还会记录已经接收的结果数据的数量,它是否与发送出去的任务数量一致 . * 主 actor 发现计算完成了,会将最终结果发送给监听者. * 当整个过程都完成了,它会调用 context.stop(self) 方法来终止自己 和 它所监管的所有actor. * 在本例中,主actor监管一个actor,我们的路由器,而路由器监管着所有 nrOfWorkers 个工作actors. * 所有的actor都会在其监管者的stop方法被调用时自动终止,并会传递给所有它监管的子actor。 */ def receive = { case Calculate ? for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements) case Result(value) ? pi += value nrOfResults += 1 if (nrOfResults == nrOfMessages) { // Send the result to the listener val pa = PiApproximation(pi, duration = Duration(System.currentTimeMillis - start, TimeUnit.MILLISECONDS)) listener ! pa sender ! pa // Stops this actor and all its supervised children context.stop(self) } }} /** * 创建计算结果监听者 * 监听者很简单,当它接收到从 Master发来的PiApproximation ,就将结果打印出来并关闭整个 Actor系统。 */class Listener extends Actor { def receive = { // PiApproximation(pi, duration) case a: PiApproximation ? println("Listener " + a) // context.system.terminate() context.system.shutdown() }} /** * Created by hdfs on 17-2-21. * ref:http://blog.csdn.net/beijicy/article/details/50587180 * */class TestAkka extends TestCase { /** * 现在只剩下实现启动和运行计算的执行者了。 * 我们创建一个调用 Pi的对象, 这里我们可以继承Scala中的 Apptrait, 这个trait使我们能够在命令行上直接运行这个应用. */ def testAkkConcurrent(): Unit = { //Pi 对象是我们的actor和消息的很好的容器。 // 所以我们把它们都放在这儿。我们还创建一个 calculate 方法来启动 主 actor 并等待它结束: calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) } /** * calculate 方法创建一个 Actor系统,这是包括所有创建出的actor的 “上下文”。 * 如何在容器中创建actor的例子在calculate方法的 ‘system.actorOf(...)’ 这一行。 * 这里我们创建两个顶级actor. 如果你是在一个actor上下文(i.e. 在一个创建其它actor的actor中), * 你应该使用 context.actorOf(...). 这在以上的主actor代码中有所体现。 * * @param nrOfWorkers * @param nrOfElements * @param nrOfMessages */ def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { // Create an Akka system val system = ActorSystem("PiSystem") // val system = ActorSystem("MasterApp", ConfigFactory.load.getConfig("multiThread")) // create the result listener, which will print the result and shutdown the system val listener = system.actorOf(Props[Listener], name = "listener") // create the master val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener)), name = "master") master ! Calculate }}
时间: 2024-12-04 16:39:12