Scala akka

package com.vdncloud.las.da.akka

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorRef, ActorSystem, Props}import akka.routing.RoundRobinRouterimport junit.framework.TestCase

import scala.concurrent.duration._

/**  *  * Calculate – 发送给 主 actor 来启动计算。  * Work – 从 主 actor 发送给各 工作 actor,包含工作分配的内容。  * Result – 从 工作 actors 发送给 主 actor,包含工作actor的计算结果。  * PiApproximation – 从 主 actor发送给 监听器 actor,包含pi的最终计算结果和整个计算耗费的时间。  *  * 发送给actor的消息应该永远是不可变的,以避免共享可变状态。  * 在scala里我们有 ‘case classes’ 来构造完美的消息。  * 现在让我们用case class创建3种消息。  * 我们还为消息们创建一个通用的基础trait(定义为sealed以防止在我们不可控的地方创建消息):  */sealed trait PiMessage

case object Calculate extends PiMessage

case class Work(start: Int, nrOfElements: Int) extends PiMessage

case class Result(value: Double) extends PiMessage

case class PiApproximation(pi: Double, duration: Duration) {  override def toString: String = "Pi approximation: \t\t%s\tCalculation time: \t%s".format(pi, duration)}

/**  * 现在创建工作 actor。 方法是混入 Actor trait 并定义其中的 receive 方法.  * receive 方法定义我们的消息处理器。我们让它能够处理 Work 消息,所以添加一个针对这种消息的处理器:  */class Worker extends Actor {  /**    * 可以看到我们现在创建了一个 Actor 和一个 receive 方法作为 Work 消息的处理器.    * 在这个处理器中我们调用calculatePiFor(..) 方法, 将结果包在 Result 消息里并使用sender异步发送回消息的原始发送者。    * 在Akka里,sender引用是与消息一起隐式发送的,这样接收者可以随时回复或将sender引用保存起来以备将来使用。    *    * 现在在我们的 Worker actor 中唯一缺少的就是实现 calculatePiFor(..) 方法。    * 虽然在Scala里我们可以有很多方法来实现这个算法,在这个入门指南中我们选择了一种命令式的风格,使用了for写法和一个累加器:    */  def calculatePiFor(start: Int, nrOfElements: Int): Double = {    var acc = 0.0    for (i ← start until (start + nrOfElements))      acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)    acc  }

def receive = {    case Work(start, nrOfElements) ?      sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work  }}

/**  * 现在我们有了一个路由,可以在一个单一的抽象中表达所有的工作actor。现在让我们创建主actor. 传递给它三个整数变量:  *  * 我们还缺少 主 actor的消息处理器. 这个处理器需要能够对两种消息进行响应:  * Calculate – 用来启动计算过程  * Result – 用来汇总不同的计算结果  *  * @param nrOfWorkers  – 定义我们会启动多少工作actor  * @param nrOfMessages – 定义会有多少整数段发送给工作actor  * @param nrOfElements – 定义发送给工作actor的每个整数段的大小  * @param listener     – 用来向外界报告最终的计算结果。  */class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)  extends Actor {

var pi: Double = _  var nrOfResults: Int = _  val start: Long = System.currentTimeMillis

/**    * 主actor会稍微复杂一些。    * 在它的构造方法里我们创建一个round-robin的路由器来简化将工作平均地分配给工作actor们的过程,先做这个:    */

val workerRouter = context.actorOf(    Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

//  val workerRouter = context.actorOf(Props[Worker].withRouter(FromConfig()), "workerRouter")

/**    * Calculate 处理器会通过其路由器向所有的工作 actor 发送工作内容.    * Result 处理器从 Result 消息中获取值并汇总到我们的 pi 成员变量中.    *    * 我们还会记录已经接收的结果数据的数量,它是否与发送出去的任务数量一致 .    * 主 actor 发现计算完成了,会将最终结果发送给监听者.    * 当整个过程都完成了,它会调用 context.stop(self) 方法来终止自己 和 它所监管的所有actor.    * 在本例中,主actor监管一个actor,我们的路由器,而路由器监管着所有 nrOfWorkers 个工作actors.    * 所有的actor都会在其监管者的stop方法被调用时自动终止,并会传递给所有它监管的子actor。    */  def receive = {    case Calculate ?      for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)    case Result(value) ?      pi += value      nrOfResults += 1      if (nrOfResults == nrOfMessages) {        // Send the result to the listener        val pa = PiApproximation(pi, duration = Duration(System.currentTimeMillis - start, TimeUnit.MILLISECONDS))        listener ! pa        sender ! pa        // Stops this actor and all its supervised children        context.stop(self)      }  }}

/**  * 创建计算结果监听者  * 监听者很简单,当它接收到从 Master发来的PiApproximation ,就将结果打印出来并关闭整个 Actor系统。  */class Listener extends Actor {  def receive = {    //    PiApproximation(pi, duration)    case a: PiApproximation ?      println("Listener " + a)    //      context.system.terminate()          context.system.shutdown()  }}

/**  * Created by hdfs on 17-2-21.  * ref:http://blog.csdn.net/beijicy/article/details/50587180  *  */class TestAkka extends TestCase {

/**    * 现在只剩下实现启动和运行计算的执行者了。    * 我们创建一个调用 Pi的对象, 这里我们可以继承Scala中的 Apptrait, 这个trait使我们能够在命令行上直接运行这个应用.    */  def testAkkConcurrent(): Unit = {    //Pi 对象是我们的actor和消息的很好的容器。    // 所以我们把它们都放在这儿。我们还创建一个 calculate 方法来启动 主 actor 并等待它结束:

calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)

}

/**    * calculate 方法创建一个 Actor系统,这是包括所有创建出的actor的 “上下文”。    * 如何在容器中创建actor的例子在calculate方法的 ‘system.actorOf(...)’ 这一行。    * 这里我们创建两个顶级actor. 如果你是在一个actor上下文(i.e. 在一个创建其它actor的actor中),    * 你应该使用 context.actorOf(...). 这在以上的主actor代码中有所体现。    *    * @param nrOfWorkers    * @param nrOfElements    * @param nrOfMessages    */  def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {    // Create an Akka system    val system = ActorSystem("PiSystem")

//    val system = ActorSystem("MasterApp", ConfigFactory.load.getConfig("multiThread"))

// create the result listener, which will print the result and shutdown the system    val listener = system.actorOf(Props[Listener], name = "listener")

// create the master    val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener)),      name = "master")

master ! Calculate  }}
时间: 2024-12-04 16:39:12

Scala akka的相关文章

scala akka 修炼之路6(scala函数式柯里化风格应用场景分析)

胜败兵家事不期,包羞忍耻是男儿--斗牛士fighting,fighting,fighting... 小象学习和使用scala也一段时间了,最初小象学习scala主要为了学习spark生态,但是深入学习scala的一些特性后,深深被scala函数式和面向对象的风格所折服,不得不赞美设计这门语言的设计者.小象大学阶段在使用MATLAB做数据分析和自动化设计时,就非常喜欢使用MATLAB的命令行和面向矩阵运算的风格编写分析代码:喜欢使用java编写层次化和清晰的模块接口,而这些Scala语言设计中都有

scala akka Future 顺序执行 sequential execution

对于 A => B => C 这种 future 之间的操作,akka 默认会自动的按照顺序执行,但对于数据库操作来说,我们希望几个操作顺序执行,就需要使用语法来声明 有两种声明 future 先后关系的方法,第一种是 flatMap,第二种是 for import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking

scala akka 修炼之路5(scala特质应用场景分析)

scala中特质定义:包括一些字段,行为(方法/函数/动作)和一些未实现的功能接口的集合,能够方便的实现扩展或混入到已有类或抽象类中. scala中特质(trait)是一个非常实用的特性,在程序设计中能够 更好的抽象现实.使程序更关注各自功能和更好的将程序拆分成多个特质模块,使程序具有更强的扩展性.熟悉java的同学.能够将特质理解为抽象类.可是scala中能够在一个类中同一时候混入多个特质(使用extends 或with).而java中一个类仅仅能继承一个抽象类,假设要实现多个抽象类就必需使用

[scala] akka actor编程

Akka基础 Akka笔记之Actor简介  Akka中的Actor遵循Actor模型.你可以把Actor当作是人.这些人不会亲自去和别人交谈.他们只通过邮件来交流.  1. 消息传递 2. 并发 3. 异常处理 4. 多任务 5. 消息链 Akka笔记之消息传递 消息发送给actor代理: 消息是不可变对象(可带有属性的case class): 分发器dispatcher和邮箱: dispatcher从actorRef取出一条消息放在目标actor邮箱中,然后放mailbox放在一个Threa

Spark架构开发 大数据视频教程 SQL Streaming Scala Akka Hadoop

培训Spark架构开发! 从基础到高级,一对一培训![技术QQ:2937765541] --------------------------------------------------------------------------------------------------------------------------------------- 课程体系: 获取视频资料和培训解答技术支持地址 课程展示(大数据技术很广,一直在线为你培训解答!): 获取视频资料和培训解答技术支持地址

Akka FSM 源代码分析

Akka FSM 源代码分析 萧猛 <[email protected]> 啰嗦几句 有限状态机本身不是啥新奇东西,在GoF的设计模式一书中就有状态模式, 也给出了实现的建议.各种语言对状态机模式都有非常多种实现的方式.我自己以前用C++和java实现过,也以前把 apache mina 源代码中的一个状态机实现抠出来单独使用. 但Akka的状态机是我见过的最简洁美丽实现.充分利用了Scala的很多先进的语言机制让代码更加简洁清晰.利用了Akka Actor实现并发.用户基本不用考虑线程安全的

Akka(36): Http:Client-side-Api,Client-Connections

Akka-http的客户端Api应该是以HttpRequest操作为主轴的网上消息交换模式编程工具.我们知道:Akka-http是搭建在Akka-stream之上的.所以,Akka-http在客户端构建与服务器的连接通道也可以用Akka-stream的Flow来表示.这个Flow可以通过调用Http.outgoingConnection来获取: /** * Creates a [[akka.stream.scaladsl.Flow]] representing a prospective HTT

Scala笔记整理(一):scala基本知识

[TOC] Scala简介 Scala是一门多范式(multi-paradigm)的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性. Scala运行在Java虚拟机上,并兼容现有的Java程序. Scala源代码被编译成Java字节码,所以它可以运行于JVM之上,并可以调用现有的Java类库. 函数编程范式更适合用于Map/Reduce和大数据模型,它摒弃了数据与状态的计算模型,着眼于函数本身,而非执行的过程的数据和状态的处理.函数范式逻辑清晰.简单,非常适合用于处理基于不变数据的

.NET的Actor模型:Orleans

Orleans是微软推出的类似Scala Akka的Actor模型,Orleans是一个建立在.NET之上的,设计的目标是为了方便程序员开发需要大规模扩展的云服务, 可用于实现DDD+EventSourcing/CQRS系统. 传统的三层体系结构包括无状态的前端,无状态的中间层和存储层在可伸缩性方面是有限制的,由于存储层在延迟和吞吐量方面的限制,这对于每个用户请求都有影响.通常办法是在中间层和存储层之间添加缓存层来提高性能.然而,缓存会失去了大部分的并发性和底层存储层的语义保证.为了防止缓存和存