Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。系统几乎不会宕机(高可用性 99.9999999 % 一年只有 31 ms 宕机。
用akka计算pi,计算公式:
这样,我们把这个公式每连续的elements个分成一段,一共分成message段。
然后开worker个actor同时计算,把结果合并。算出最终结果和耗时。
一:计算某一段的actor
我们需要一个工作actor,用来计算某一段的和,并把结果传出去。
class Worker extends Actor { //计算从start开始,连续elements个的和 def calculatePiFor(start: Int, elements: Int): Double = { var acc = 0.0 for (i <- start until (start + elements)) acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) acc } def receive = { // sender 用来访问当前消息的发送者的引用 case Work(start, elements) => sender ! Result(calculatePiFor(start, elements)) } }
二:创建主actor
主actor的工作是把任务分配下去,并且把Worker执行的结果进行收集处理,并且把处理完的最终结果返回给监听actor
分配任务我们就需要创建一个round-robin的路由器来简化把任务平均分配给工作者的过程。
//创建了一个路由器,启动了workers个Worker val workerRouter = context.actorOf( Props[Worker].withRouter(RoundRobinRouter(workers)), name = "workerRouter")
对于主actor,会接受两种消息,一种是计算,一种是收到结果处理。
class Master(workers: Int, messages: Int, elements: Int, listener: ActorRef) extends Actor { var pi: Double = 0.0 var finish: Int = 0 val startTime: Long = System.currentTimeMillis() //创建了一个路由器,启动了workers个Worker val workerRouter = context.actorOf( Props[Worker].withRouter(RoundRobinRouter(workers)), name = "workerRouter") def receive = { //收到计算的请求,把计算的任务分配下去 case Calculate => for (i <- 0 until messages) workerRouter ! Work(i * elements, elements) //收到计算的结果,把计算的结果加到pi上,并且判断下发的任务有没有全部执行完毕 //如果全部执行完毕,那么给监听者发一个消息 case Result(value) => pi += value finish += 1 if (finish == messages) { listener ! PiApproximation(pi, duration = (System.currentTimeMillis - startTime)) context.stop(self) } } }
三:监听actor
监听actor比较简单,收到数据,直接输出就好
class Listener extends Actor { def receive = { case PiApproximation(pi, duration) => println("计算结束,结果为: " + pi + "用时 : " + duration) } }
四:最终结果
package akka.study.base import akka.actor._ import akka.routing.RoundRobinRouter import java.util.concurrent.TimeUnit import java.lang.System._ import java.time.Duration sealed trait PiMessage case object Calculate extends PiMessage case class Work(start: Int, elements: Int) extends PiMessage case class Result(value: Double) extends PiMessage case class PiApproximation(pi: Double, duration: Long) class Worker extends Actor { //计算从start开始,连续elements个的和 def calculatePiFor(start: Int, elements: Int): Double = { var acc = 0.0 for (i <- start until (start + elements)) acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) acc } def receive = { // sender 用来访问当前消息的发送者的引用 case Work(start, elements) => sender ! Result(calculatePiFor(start, elements)) } } class Master(workers: Int, messages: Int, elements: Int, listener: ActorRef) extends Actor { var pi: Double = 0.0 var finish: Int = 0 val startTime: Long = System.currentTimeMillis() //创建了一个路由器,启动了workers个Worker val workerRouter = context.actorOf( Props[Worker].withRouter(RoundRobinRouter(workers)), name = "workerRouter") def receive = { //收到计算的请求,把计算的任务分配下去 case Calculate => for (i <- 0 until messages) workerRouter ! Work(i * elements, elements) //收到计算的结果,把计算的结果加到pi上,并且判断下发的任务有没有全部执行完毕 //如果全部执行完毕,那么给监听者发一个消息 case Result(value) => pi += value finish += 1 if (finish == messages) { listener ! PiApproximation(pi, duration = (System.currentTimeMillis - startTime)) context.stop(self) } } } class Listener extends Actor { def receive = { case PiApproximation(pi, duration) => println("计算结束,结果为: " + pi + "用时 : " + duration) } } object Pi { def main(args: Array[String]) { def calculate(workers: Int, elements: Int, messages: Int) { val system = ActorSystem("PiSystem") val listener = system.actorOf(Props[Listener], name = "listener") val master = system.actorOf(Props(new Master( workers, messages, elements, listener)), name = "master") master ! Calculate } calculate(6, 10000, 10000) } }
时间: 2024-10-13 19:55:40